Posted to commits@nutch.apache.org by le...@apache.org on 2015/01/29 06:39:03 UTC
svn commit: r1655526 [5/26] - in /nutch/trunk: ./
src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/
src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/metadata/
src/java/org/apache/nutch/net/ src/java/org/apache/nutch/net/pr...
Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/OldFetcher.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/OldFetcher.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/OldFetcher.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/OldFetcher.java Thu Jan 29 05:38:59 2015
@@ -43,28 +43,28 @@ import org.apache.nutch.parse.*;
import org.apache.nutch.scoring.ScoringFilters;
import org.apache.nutch.util.*;
-
/** The fetcher. Most of the work is done by plugins. */
-public class OldFetcher extends Configured implements Tool, MapRunnable<WritableComparable<?>, Writable, Text, NutchWritable> {
+public class OldFetcher extends Configured implements Tool,
+ MapRunnable<WritableComparable<?>, Writable, Text, NutchWritable> {
public static final Logger LOG = LoggerFactory.getLogger(OldFetcher.class);
-
+
public static final int PERM_REFRESH_TIME = 5;
public static final String CONTENT_REDIR = "content";
public static final String PROTOCOL_REDIR = "protocol";
- public static class InputFormat extends SequenceFileInputFormat<WritableComparable<?>, Writable> {
+ public static class InputFormat extends
+ SequenceFileInputFormat<WritableComparable<?>, Writable> {
/** Don't split inputs, to keep things polite. */
- public InputSplit[] getSplits(JobConf job, int nSplits)
- throws IOException {
+ public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException {
FileStatus[] files = listStatus(job);
InputSplit[] splits = new InputSplit[files.length];
for (int i = 0; i < files.length; i++) {
FileStatus cur = files[i];
- splits[i] = new FileSplit(cur.getPath(), 0,
- cur.getLen(), (String[])null);
+ splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(),
+ (String[]) null);
}
return splits;
}
@@ -81,9 +81,9 @@ public class OldFetcher extends Configur
private long start = System.currentTimeMillis(); // start time of fetcher run
private long lastRequestStart = start;
- private long bytes; // total bytes fetched
- private int pages; // total pages fetched
- private int errors; // total pages errored
+ private long bytes; // total bytes fetched
+ private int pages; // total pages fetched
+ private int errors; // total pages errored
private boolean storingContent;
private boolean parsing;
@@ -100,8 +100,8 @@ public class OldFetcher extends Configur
private String reprUrl;
public FetcherThread(Configuration conf) {
- this.setDaemon(true); // don't hang JVM on exit
- this.setName("FetcherThread"); // use an informative name
+ this.setDaemon(true); // don't hang JVM on exit
+ this.setName("FetcherThread"); // use an informative name
this.conf = conf;
this.urlFilters = new URLFilters(conf);
this.scfilters = new ScoringFilters(conf);
@@ -112,26 +112,28 @@ public class OldFetcher extends Configur
@SuppressWarnings("fallthrough")
public void run() {
- synchronized (OldFetcher.this) {activeThreads++;} // count threads
-
+ synchronized (OldFetcher.this) {
+ activeThreads++;
+ } // count threads
+
try {
Text key = new Text();
CrawlDatum datum = new CrawlDatum();
-
+
while (true) {
// TODO : NUTCH-258 ...
// If something bad happened, then exit
// if (conf.getBoolean("fetcher.exit", false)) {
- // break;
+ // break;
// }
-
- try { // get next entry from input
+
+ try { // get next entry from input
if (!input.next(key, datum)) {
- break; // at eof, exit
+ break; // at eof, exit
}
} catch (IOException e) {
if (LOG.isErrorEnabled()) {
- LOG.error("fetcher caught:"+e.toString());
+ LOG.error("fetcher caught:" + e.toString());
}
break;
}
@@ -143,8 +145,8 @@ public class OldFetcher extends Configur
// url may be changed through redirects.
Text url = new Text(key);
- Text reprUrlWritable =
- (Text) datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
+ Text reprUrlWritable = (Text) datum.getMetaData().get(
+ Nutch.WRITABLE_REPR_URL_KEY);
if (reprUrlWritable == null) {
reprUrl = key.toString();
} else {
@@ -152,7 +154,9 @@ public class OldFetcher extends Configur
}
try {
- if (LOG.isInfoEnabled()) { LOG.info("fetching " + url); }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("fetching " + url);
+ }
// fetch the page
redirectCount = 0;
@@ -161,7 +165,8 @@ public class OldFetcher extends Configur
LOG.debug("redirectCount=" + redirectCount);
}
redirecting = false;
- Protocol protocol = this.protocolFactory.getProtocol(url.toString());
+ Protocol protocol = this.protocolFactory.getProtocol(url
+ .toString());
ProtocolOutput output = protocol.getProtocolOutput(url, datum);
ProtocolStatus status = output.getStatus();
Content content = output.getContent();
@@ -173,22 +178,22 @@ public class OldFetcher extends Configur
new Text(reprUrl));
}
- switch(status.getCode()) {
+ switch (status.getCode()) {
- case ProtocolStatus.SUCCESS: // got a page
- pstatus = output(url, datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
+ case ProtocolStatus.SUCCESS: // got a page
+ pstatus = output(url, datum, content, status,
+ CrawlDatum.STATUS_FETCH_SUCCESS);
updateStatus(content.getContent().length);
- if (pstatus != null && pstatus.isSuccess() &&
- pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+ if (pstatus != null && pstatus.isSuccess()
+ && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
String newUrl = pstatus.getMessage();
int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
url = handleRedirect(url, datum, urlString, newUrl,
- refreshTime < PERM_REFRESH_TIME,
- CONTENT_REDIR);
+ refreshTime < PERM_REFRESH_TIME, CONTENT_REDIR);
}
break;
- case ProtocolStatus.MOVED: // redirect
+ case ProtocolStatus.MOVED: // redirect
case ProtocolStatus.TEMP_MOVED:
int code;
boolean temp;
@@ -201,22 +206,22 @@ public class OldFetcher extends Configur
}
output(url, datum, content, status, code);
String newUrl = status.getMessage();
- url = handleRedirect(url, datum, urlString, newUrl,
- temp, PROTOCOL_REDIR);
+ url = handleRedirect(url, datum, urlString, newUrl, temp,
+ PROTOCOL_REDIR);
break;
// failures - increase the retry counter
case ProtocolStatus.EXCEPTION:
logError(url, status.getMessage());
- /* FALLTHROUGH */
- case ProtocolStatus.RETRY: // retry
+ /* FALLTHROUGH */
+ case ProtocolStatus.RETRY: // retry
case ProtocolStatus.WOULDBLOCK:
case ProtocolStatus.BLOCKED:
output(url, datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
break;
-
+
// permanent failures
- case ProtocolStatus.GONE: // gone
+ case ProtocolStatus.GONE: // gone
case ProtocolStatus.NOTFOUND:
case ProtocolStatus.ACCESS_DENIED:
case ProtocolStatus.ROBOTS_DENIED:
@@ -224,9 +229,10 @@ public class OldFetcher extends Configur
break;
case ProtocolStatus.NOTMODIFIED:
- output(url, datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
+ output(url, datum, null, status,
+ CrawlDatum.STATUS_FETCH_NOTMODIFIED);
break;
-
+
default:
if (LOG.isWarnEnabled()) {
LOG.warn("Unknown ProtocolStatus: " + status.getCode());
@@ -243,27 +249,27 @@ public class OldFetcher extends Configur
} while (redirecting && (redirectCount < maxRedirect));
-
- } catch (Throwable t) { // unexpected exception
+ } catch (Throwable t) { // unexpected exception
logError(url, t.toString());
output(url, datum, null, null, CrawlDatum.STATUS_FETCH_RETRY);
-
+
}
}
} catch (Throwable e) {
if (LOG.isErrorEnabled()) {
- LOG.error("fetcher caught:"+e.toString());
+ LOG.error("fetcher caught:" + e.toString());
}
} finally {
- synchronized (OldFetcher.this) {activeThreads--;} // count threads
+ synchronized (OldFetcher.this) {
+ activeThreads--;
+ } // count threads
}
}
- private Text handleRedirect(Text url, CrawlDatum datum,
- String urlString, String newUrl,
- boolean temp, String redirType)
- throws MalformedURLException, URLFilterException {
+ private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
+ String newUrl, boolean temp, String redirType)
+ throws MalformedURLException, URLFilterException {
newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
newUrl = urlFilters.filter(newUrl);
if (newUrl != null && !newUrl.equals(urlString)) {
@@ -273,8 +279,8 @@ public class OldFetcher extends Configur
redirecting = true;
redirectCount++;
if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect to " +
- url + " (fetching now)");
+ LOG.debug(" - " + redirType + " redirect to " + url
+ + " (fetching now)");
}
return url;
} else {
@@ -285,15 +291,15 @@ public class OldFetcher extends Configur
}
output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect to " +
- url + " (fetching later)");
+ LOG.debug(" - " + redirType + " redirect to " + url
+ + " (fetching later)");
}
return null;
}
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug(" - " + redirType + " redirect skipped: " +
- (newUrl != null ? "to same url" : "filtered"));
+ LOG.debug(" - " + redirType + " redirect skipped: "
+ + (newUrl != null ? "to same url" : "filtered"));
}
return null;
}
@@ -303,17 +309,18 @@ public class OldFetcher extends Configur
if (LOG.isInfoEnabled()) {
LOG.info("fetch of " + url + " failed with: " + message);
}
- synchronized (OldFetcher.this) { // record failure
+ synchronized (OldFetcher.this) { // record failure
errors++;
}
}
- private ParseStatus output(Text key, CrawlDatum datum,
- Content content, ProtocolStatus pstatus, int status) {
+ private ParseStatus output(Text key, CrawlDatum datum, Content content,
+ ProtocolStatus pstatus, int status) {
datum.setStatus(status);
datum.setFetchTime(System.currentTimeMillis());
- if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+ if (pstatus != null)
+ datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
ParseResult parseResult = null;
if (content != null) {
@@ -328,27 +335,31 @@ public class OldFetcher extends Configur
LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
}
}
- /* Note: Fetcher will only follow meta-redirects coming from the
- * original URL. */
+ /*
+ * Note: Fetcher will only follow meta-redirects coming from the
+ * original URL.
+ */
if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
try {
parseResult = this.parseUtil.parse(content);
} catch (Exception e) {
- LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
+ LOG.warn("Error parsing: " + key + ": "
+ + StringUtils.stringifyException(e));
}
if (parseResult == null) {
- byte[] signature =
- SignatureFactory.getSignature(getConf()).calculate(content,
- new ParseStatus().getEmptyParse(conf));
+ byte[] signature = SignatureFactory.getSignature(getConf())
+ .calculate(content, new ParseStatus().getEmptyParse(conf));
datum.setSignature(signature);
}
}
-
- /* Store status code in content So we can read this value during
- * parsing (as a separate job) and decide to parse or not.
+
+ /*
+ * Store status code in content So we can read this value during parsing
+ * (as a separate job) and decide to parse or not.
*/
- content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
+ content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
+ Integer.toString(status));
}
try {
@@ -360,7 +371,7 @@ public class OldFetcher extends Configur
Text url = entry.getKey();
Parse parse = entry.getValue();
ParseStatus parseStatus = parse.getData().getStatus();
-
+
if (!parseStatus.isSuccess()) {
LOG.warn("Error parsing: " + key + ": " + parseStatus);
parse = parseStatus.getEmptyParse(getConf());
@@ -368,16 +379,16 @@ public class OldFetcher extends Configur
// Calculate page signature. For non-parsing fetchers this will
// be done in ParseSegment
- byte[] signature =
- SignatureFactory.getSignature(getConf()).calculate(content, parse);
+ byte[] signature = SignatureFactory.getSignature(getConf())
+ .calculate(content, parse);
// Ensure segment name and score are in parseData metadata
- parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY,
- segmentName);
- parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY,
- StringUtil.toHexString(signature));
+ parse.getData().getContentMeta()
+ .set(Nutch.SEGMENT_NAME_KEY, segmentName);
+ parse.getData().getContentMeta()
+ .set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
// Pass fetch time to content meta
- parse.getData().getContentMeta().set(Nutch.FETCH_TIME_KEY,
- Long.toString(datum.getFetchTime()));
+ parse.getData().getContentMeta()
+ .set(Nutch.FETCH_TIME_KEY, Long.toString(datum.getFetchTime()));
if (url.equals(key))
datum.setSignature(signature);
try {
@@ -387,14 +398,13 @@ public class OldFetcher extends Configur
LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
}
}
- output.collect(url, new NutchWritable(
- new ParseImpl(new ParseText(parse.getText()),
- parse.getData(), parse.isCanonical())));
+ output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+ parse.getText()), parse.getData(), parse.isCanonical())));
}
}
} catch (IOException e) {
if (LOG.isErrorEnabled()) {
- LOG.error("fetcher caught:"+e.toString());
+ LOG.error("fetcher caught:" + e.toString());
}
}
@@ -404,10 +414,10 @@ public class OldFetcher extends Configur
if (p != null) {
return p.getData().getStatus();
}
- }
+ }
return null;
}
-
+
}
private synchronized void updateStatus(int bytesInPage) throws IOException {
@@ -418,23 +428,22 @@ public class OldFetcher extends Configur
private void reportStatus() throws IOException {
String status;
synchronized (this) {
- long elapsed = (System.currentTimeMillis() - start)/1000;
- status =
- pages+" pages, "+errors+" errors, "
- + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
- + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ";
+ long elapsed = (System.currentTimeMillis() - start) / 1000;
+ status = pages + " pages, " + errors + " errors, "
+ + Math.round(((float) pages * 10) / elapsed) / 10.0 + " pages/s, "
+ + Math.round(((((float) bytes) * 8) / 1024) / elapsed) + " kb/s, ";
}
reporter.setStatus(status);
}
public OldFetcher() {
-
+
}
-
+
public OldFetcher(Configuration conf) {
setConf(conf);
}
-
+
public void configure(JobConf job) {
setConf(job);
@@ -442,12 +451,13 @@ public class OldFetcher extends Configur
this.storingContent = isStoringContent(job);
this.parsing = isParsing(job);
-// if (job.getBoolean("fetcher.verbose", false)) {
-// LOG.setLevel(Level.FINE);
-// }
+ // if (job.getBoolean("fetcher.verbose", false)) {
+ // LOG.setLevel(Level.FINE);
+ // }
}
- public void close() {}
+ public void close() {
+ }
public static boolean isParsing(Configuration conf) {
return conf.getBoolean("fetcher.parse", true);
@@ -457,29 +467,33 @@ public class OldFetcher extends Configur
return conf.getBoolean("fetcher.store.content", true);
}
- public void run(RecordReader<WritableComparable<?>, Writable> input, OutputCollector<Text, NutchWritable> output,
- Reporter reporter) throws IOException {
+ public void run(RecordReader<WritableComparable<?>, Writable> input,
+ OutputCollector<Text, NutchWritable> output, Reporter reporter)
+ throws IOException {
this.input = input;
this.output = output;
this.reporter = reporter;
this.maxRedirect = getConf().getInt("http.redirect.max", 3);
-
+
int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
- if (LOG.isInfoEnabled()) { LOG.info("OldFetcher: threads: " + threadCount); }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("OldFetcher: threads: " + threadCount);
+ }
- for (int i = 0; i < threadCount; i++) { // spawn threads
+ for (int i = 0; i < threadCount; i++) { // spawn threads
new FetcherThread(getConf()).start();
}
// select a timeout that avoids a task timeout
- long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
+ long timeout = getConf().getInt("mapred.task.timeout", 10 * 60 * 1000) / 2;
- do { // wait for threads to exit
+ do { // wait for threads to exit
try {
Thread.sleep(1000);
- } catch (InterruptedException e) {}
+ } catch (InterruptedException e) {
+ }
reportStatus();
@@ -487,18 +501,17 @@ public class OldFetcher extends Configur
synchronized (this) {
if ((System.currentTimeMillis() - lastRequestStart) > timeout) {
if (LOG.isWarnEnabled()) {
- LOG.warn("Aborting with "+activeThreads+" hung threads.");
+ LOG.warn("Aborting with " + activeThreads + " hung threads.");
}
return;
}
}
} while (activeThreads > 0);
-
+
}
- public void fetch(Path segment, int threads)
- throws IOException {
+ public void fetch(Path segment, int threads) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
@@ -516,7 +529,8 @@ public class OldFetcher extends Configur
// for politeness, don't permit parallel execution of a single task
job.setSpeculativeExecution(false);
- FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.GENERATE_DIR_NAME));
job.setInputFormat(InputFormat.class);
job.setMapRunnerClass(OldFetcher.class);
@@ -528,16 +542,17 @@ public class OldFetcher extends Configur
JobClient.runJob(job);
long end = System.currentTimeMillis();
- LOG.info("OldFetcher: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+ LOG.info("OldFetcher: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
}
-
/** Run the fetcher. */
public static void main(String[] args) throws Exception {
- int res = ToolRunner.run(NutchConfiguration.create(), new OldFetcher(), args);
+ int res = ToolRunner.run(NutchConfiguration.create(), new OldFetcher(),
+ args);
System.exit(res);
}
-
+
public int run(String[] args) throws Exception {
String usage = "Usage: OldFetcher <segment> [-threads n] [-noParsing]";
@@ -546,15 +561,16 @@ public class OldFetcher extends Configur
System.err.println(usage);
return -1;
}
-
+
Path segment = new Path(args[0]);
int threads = getConf().getInt("fetcher.threads.fetch", 10);
boolean parsing = true;
- for (int i = 1; i < args.length; i++) { // parse command line
- if (args[i].equals("-threads")) { // found -threads option
- threads = Integer.parseInt(args[++i]);
- } else if (args[i].equals("-noParsing")) parsing = false;
+ for (int i = 1; i < args.length; i++) { // parse command line
+ if (args[i].equals("-threads")) { // found -threads option
+ threads = Integer.parseInt(args[++i]);
+ } else if (args[i].equals("-noParsing"))
+ parsing = false;
}
getConf().setInt("fetcher.threads.fetch", threads);
@@ -562,7 +578,7 @@ public class OldFetcher extends Configur
getConf().setBoolean("fetcher.parse", parsing);
}
try {
- fetch(segment, threads); // run the Fetcher
+ fetch(segment, threads); // run the Fetcher
return 0;
} catch (Exception e) {
LOG.error("OldFetcher: " + StringUtils.stringifyException(e));
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java Thu Jan 29 05:38:59 2015
@@ -45,169 +45,166 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * The class scans CrawlDB looking for entries with status DB_GONE (404) or
- * DB_DUPLICATE and
- * sends delete requests to indexers for those documents.
+ * The class scans CrawlDB looking for entries with status DB_GONE (404) or
+ * DB_DUPLICATE and sends delete requests to indexers for those documents.
*/
public class CleaningJob implements Tool {
- public static final Logger LOG = LoggerFactory.getLogger(CleaningJob.class);
- private Configuration conf;
+ public static final Logger LOG = LoggerFactory.getLogger(CleaningJob.class);
+ private Configuration conf;
+
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public static class DBFilter implements
+ Mapper<Text, CrawlDatum, ByteWritable, Text> {
+ private ByteWritable OUT = new ByteWritable(CrawlDatum.STATUS_DB_GONE);
+
+ @Override
+ public void configure(JobConf arg0) {
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
@Override
- public Configuration getConf() {
- return conf;
+ public void map(Text key, CrawlDatum value,
+ OutputCollector<ByteWritable, Text> output, Reporter reporter)
+ throws IOException {
+
+ if (value.getStatus() == CrawlDatum.STATUS_DB_GONE
+ || value.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+ output.collect(OUT, key);
+ }
+ }
+ }
+
+ public static class DeleterReducer implements
+ Reducer<ByteWritable, Text, Text, ByteWritable> {
+ private static final int NUM_MAX_DELETE_REQUEST = 1000;
+ private int numDeletes = 0;
+ private int totalDeleted = 0;
+
+ private boolean noCommit = false;
+
+ IndexWriters writers = null;
+
+ @Override
+ public void configure(JobConf job) {
+ writers = new IndexWriters(job);
+ try {
+ writers.open(job, "Deletion");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ noCommit = job.getBoolean("noCommit", false);
}
@Override
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public static class DBFilter implements
- Mapper<Text, CrawlDatum, ByteWritable, Text> {
- private ByteWritable OUT = new ByteWritable(CrawlDatum.STATUS_DB_GONE);
-
- @Override
- public void configure(JobConf arg0) {
- }
-
- @Override
- public void close() throws IOException {
- }
-
- @Override
- public void map(Text key, CrawlDatum value,
- OutputCollector<ByteWritable, Text> output, Reporter reporter)
- throws IOException {
-
- if (value.getStatus() == CrawlDatum.STATUS_DB_GONE || value.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
- output.collect(OUT, key);
- }
- }
- }
-
- public static class DeleterReducer implements
- Reducer<ByteWritable, Text, Text, ByteWritable> {
- private static final int NUM_MAX_DELETE_REQUEST = 1000;
- private int numDeletes = 0;
- private int totalDeleted = 0;
-
- private boolean noCommit = false;
-
- IndexWriters writers = null;
-
- @Override
- public void configure(JobConf job) {
- writers = new IndexWriters(job);
- try {
- writers.open(job, "Deletion");
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- noCommit = job.getBoolean("noCommit", false);
- }
-
- @Override
- public void close() throws IOException {
- // BUFFERING OF CALLS TO INDEXER SHOULD BE HANDLED AT INDEXER LEVEL
- // if (numDeletes > 0) {
- // LOG.info("CleaningJob: deleting " + numDeletes + " documents");
- // // TODO updateRequest.process(solr);
- // totalDeleted += numDeletes;
- // }
-
- writers.close();
-
- if (totalDeleted > 0 && !noCommit) {
- writers.commit();
- }
-
- LOG.info("CleaningJob: deleted a total of " + totalDeleted
- + " documents");
- }
-
- @Override
- public void reduce(ByteWritable key, Iterator<Text> values,
- OutputCollector<Text, ByteWritable> output, Reporter reporter)
- throws IOException {
- while (values.hasNext()) {
- Text document = values.next();
- writers.delete(document.toString());
- totalDeleted++;
- reporter.incrCounter("CleaningJobStatus", "Deleted documents",
- 1);
- // if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
- // LOG.info("CleaningJob: deleting " + numDeletes
- // + " documents");
- // // TODO updateRequest.process(solr);
- // // TODO updateRequest = new UpdateRequest();
- // writers.delete(key.toString());
- // totalDeleted += numDeletes;
- // numDeletes = 0;
- // }
- }
- }
- }
-
- public void delete(String crawldb, boolean noCommit) throws IOException {
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- long start = System.currentTimeMillis();
- LOG.info("CleaningJob: starting at " + sdf.format(start));
-
- JobConf job = new NutchJob(getConf());
-
- FileInputFormat.addInputPath(job, new Path(crawldb,
- CrawlDb.CURRENT_NAME));
- job.setBoolean("noCommit", noCommit);
- job.setInputFormat(SequenceFileInputFormat.class);
- job.setOutputFormat(NullOutputFormat.class);
- job.setMapOutputKeyClass(ByteWritable.class);
- job.setMapOutputValueClass(Text.class);
- job.setMapperClass(DBFilter.class);
- job.setReducerClass(DeleterReducer.class);
-
- job.setJobName("CleaningJob");
-
-    // need to explicitly allow deletions
- job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
-
- JobClient.runJob(job);
-
- long end = System.currentTimeMillis();
- LOG.info("CleaningJob: finished at " + sdf.format(end) + ", elapsed: "
- + TimingUtil.elapsedTime(start, end));
- }
-
- public int run(String[] args) throws IOException {
- if (args.length < 1) {
- String usage = "Usage: CleaningJob <crawldb> [-noCommit]";
- LOG.error("Missing crawldb. "+usage);
- System.err.println(usage);
- IndexWriters writers = new IndexWriters(getConf());
- System.err.println(writers.describe());
- return 1;
- }
-
- boolean noCommit = false;
- if (args.length == 2 && args[1].equals("-noCommit")) {
- noCommit = true;
- }
-
- try {
- delete(args[0], noCommit);
- } catch (final Exception e) {
- LOG.error("CleaningJob: " + StringUtils.stringifyException(e));
- System.err.println("ERROR CleaningJob: "
- + StringUtils.stringifyException(e));
- return -1;
- }
- return 0;
- }
-
- public static void main(String[] args) throws Exception {
- int result = ToolRunner.run(NutchConfiguration.create(),
- new CleaningJob(), args);
- System.exit(result);
+ public void close() throws IOException {
+ // BUFFERING OF CALLS TO INDEXER SHOULD BE HANDLED AT INDEXER LEVEL
+ // if (numDeletes > 0) {
+ // LOG.info("CleaningJob: deleting " + numDeletes + " documents");
+ // // TODO updateRequest.process(solr);
+ // totalDeleted += numDeletes;
+ // }
+
+ writers.close();
+
+ if (totalDeleted > 0 && !noCommit) {
+ writers.commit();
+ }
+
+ LOG.info("CleaningJob: deleted a total of " + totalDeleted + " documents");
}
+
+ @Override
+ public void reduce(ByteWritable key, Iterator<Text> values,
+ OutputCollector<Text, ByteWritable> output, Reporter reporter)
+ throws IOException {
+ while (values.hasNext()) {
+ Text document = values.next();
+ writers.delete(document.toString());
+ totalDeleted++;
+ reporter.incrCounter("CleaningJobStatus", "Deleted documents", 1);
+ // if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
+ // LOG.info("CleaningJob: deleting " + numDeletes
+ // + " documents");
+ // // TODO updateRequest.process(solr);
+ // // TODO updateRequest = new UpdateRequest();
+ // writers.delete(key.toString());
+ // totalDeleted += numDeletes;
+ // numDeletes = 0;
+ // }
+ }
+ }
+ }
+
+ public void delete(String crawldb, boolean noCommit) throws IOException {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("CleaningJob: starting at " + sdf.format(start));
+
+ JobConf job = new NutchJob(getConf());
+
+ FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
+ job.setBoolean("noCommit", noCommit);
+ job.setInputFormat(SequenceFileInputFormat.class);
+ job.setOutputFormat(NullOutputFormat.class);
+ job.setMapOutputKeyClass(ByteWritable.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setMapperClass(DBFilter.class);
+ job.setReducerClass(DeleterReducer.class);
+
+ job.setJobName("CleaningJob");
+
+    // need to explicitly allow deletions
+ job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
+
+ JobClient.runJob(job);
+
+ long end = System.currentTimeMillis();
+ LOG.info("CleaningJob: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ }
+
+ public int run(String[] args) throws IOException {
+ if (args.length < 1) {
+ String usage = "Usage: CleaningJob <crawldb> [-noCommit]";
+ LOG.error("Missing crawldb. " + usage);
+ System.err.println(usage);
+ IndexWriters writers = new IndexWriters(getConf());
+ System.err.println(writers.describe());
+ return 1;
+ }
+
+ boolean noCommit = false;
+ if (args.length == 2 && args[1].equals("-noCommit")) {
+ noCommit = true;
+ }
+
+ try {
+ delete(args[0], noCommit);
+ } catch (final Exception e) {
+ LOG.error("CleaningJob: " + StringUtils.stringifyException(e));
+ System.err.println("ERROR CleaningJob: "
+ + StringUtils.stringifyException(e));
+ return -1;
+ }
+ return 0;
+ }
+
+ public static void main(String[] args) throws Exception {
+ int result = ToolRunner.run(NutchConfiguration.create(), new CleaningJob(),
+ args);
+ System.exit(result);
+ }
}
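
CleaningJob is likewise driven through ToolRunner; per the usage string in run() above ("Usage: CleaningJob <crawldb> [-noCommit]"), a hedged invocation sketch with a placeholder crawldb path:

    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.indexer.CleaningJob;
    import org.apache.nutch.util.NutchConfiguration;

    public class CleaningJobDriver {
      public static void main(String[] args) throws Exception {
        // Scans crawl/crawldb for DB_GONE and DB_DUPLICATE entries and sends
        // delete requests to the configured index writers; -noCommit skips
        // the final commit.
        int res = ToolRunner.run(NutchConfiguration.create(), new CleaningJob(),
            new String[] { "crawl/crawldb", "-noCommit" });
        System.exit(res);
      }
    }
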
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriter.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriter.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriter.java Thu Jan 29 05:38:59 2015
@@ -24,22 +24,24 @@ import org.apache.nutch.indexer.NutchDoc
import org.apache.nutch.plugin.Pluggable;
public interface IndexWriter extends Pluggable, Configurable {
- /** The name of the extension point. */
- final static String X_POINT_ID = IndexWriter.class.getName();
-
- public void open(JobConf job, String name) throws IOException;
-
- public void write(NutchDocument doc) throws IOException;
-
- public void delete(String key) throws IOException;
-
- public void update(NutchDocument doc) throws IOException;
-
- public void commit() throws IOException;
-
- public void close() throws IOException;
+ /** The name of the extension point. */
+ final static String X_POINT_ID = IndexWriter.class.getName();
- /** Returns a String describing the IndexWriter instance and the specific parameters it can take */
- public String describe();
-}
+ public void open(JobConf job, String name) throws IOException;
+
+ public void write(NutchDocument doc) throws IOException;
+
+ public void delete(String key) throws IOException;
+
+ public void update(NutchDocument doc) throws IOException;
+ public void commit() throws IOException;
+
+ public void close() throws IOException;
+
+ /**
+ * Returns a String describing the IndexWriter instance and the specific
+ * parameters it can take
+ */
+ public String describe();
+}
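
To make the reformatted interface concrete, here is a minimal hypothetical implementation; the class name and output are illustrative only, not part of this commit:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.indexer.IndexWriter;
    import org.apache.nutch.indexer.NutchDocument;

    /** Hypothetical no-op writer showing the interface contract. */
    public class LoggingIndexWriter implements IndexWriter {
      private Configuration conf;

      public void open(JobConf job, String name) throws IOException {
        // A real plugin would connect to its backend here.
      }

      public void write(NutchDocument doc) throws IOException {
        System.out.println("add: " + doc.getFieldNames());
      }

      public void delete(String key) throws IOException {
        System.out.println("delete: " + key);
      }

      public void update(NutchDocument doc) throws IOException {
        write(doc); // naive update-as-rewrite, fine for a sketch
      }

      public void commit() throws IOException {
      }

      public void close() throws IOException {
      }

      public String describe() {
        return "LoggingIndexWriter (no parameters)";
      }

      public Configuration getConf() {
        return conf;
      }

      public void setConf(Configuration conf) {
        this.conf = conf;
      }
    }

Actually using such a writer would additionally require a plugin descriptor registering it against IndexWriter.X_POINT_ID, which is beyond this sketch.
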
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriters.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriters.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriters.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriters.java Thu Jan 29 05:38:59 2015
@@ -33,116 +33,113 @@ import org.slf4j.LoggerFactory;
/** Creates and caches {@link IndexWriter} implementing plugins. */
public class IndexWriters {
- public final static Logger LOG = LoggerFactory
- .getLogger(IndexWriters.class);
+ public final static Logger LOG = LoggerFactory.getLogger(IndexWriters.class);
- private IndexWriter[] indexWriters;
+ private IndexWriter[] indexWriters;
- public IndexWriters(Configuration conf) {
- ObjectCache objectCache = ObjectCache.get(conf);
- synchronized (objectCache) {
- this.indexWriters = (IndexWriter[]) objectCache
- .getObject(IndexWriter.class.getName());
- if (this.indexWriters == null) {
- try {
- ExtensionPoint point = PluginRepository.get(conf)
- .getExtensionPoint(IndexWriter.X_POINT_ID);
- if (point == null)
- throw new RuntimeException(IndexWriter.X_POINT_ID
- + " not found.");
- Extension[] extensions = point.getExtensions();
- HashMap<String, IndexWriter> indexerMap = new HashMap<String, IndexWriter>();
- for (int i = 0; i < extensions.length; i++) {
- Extension extension = extensions[i];
- IndexWriter writer = (IndexWriter) extension
- .getExtensionInstance();
- LOG.info("Adding " + writer.getClass().getName());
- if (!indexerMap.containsKey(writer.getClass().getName())) {
- indexerMap.put(writer.getClass().getName(), writer);
- }
- }
- objectCache.setObject(IndexWriter.class.getName(), indexerMap
- .values().toArray(new IndexWriter[0]));
- } catch (PluginRuntimeException e) {
- throw new RuntimeException(e);
- }
- this.indexWriters = (IndexWriter[]) objectCache
- .getObject(IndexWriter.class.getName());
- }
- }
- }
-
- public void open(JobConf job, String name) throws IOException {
- for (int i = 0; i < this.indexWriters.length; i++) {
- try {
- this.indexWriters[i].open(job, name);
- } catch (IOException ioe) {
- throw ioe;
- }
- }
- }
-
- public void write(NutchDocument doc) throws IOException {
- for (int i = 0; i < this.indexWriters.length; i++) {
- try {
- this.indexWriters[i].write(doc);
- } catch (IOException ioe) {
- throw ioe;
- }
- }
- }
-
- public void update(NutchDocument doc) throws IOException {
- for (int i = 0; i < this.indexWriters.length; i++) {
- try {
- this.indexWriters[i].update(doc);
- } catch (IOException ioe) {
- throw ioe;
- }
- }
- }
-
- public void delete(String key) throws IOException {
- for (int i = 0; i < this.indexWriters.length; i++) {
- try {
- this.indexWriters[i].delete(key);
- } catch (IOException ioe) {
- throw ioe;
- }
- }
- }
-
- public void close() throws IOException {
- for (int i = 0; i < this.indexWriters.length; i++) {
- try {
- this.indexWriters[i].close();
- } catch (IOException ioe) {
- throw ioe;
- }
- }
- }
-
- public void commit() throws IOException {
- for (int i = 0; i < this.indexWriters.length; i++) {
- try {
- this.indexWriters[i].commit();
- } catch (IOException ioe) {
- throw ioe;
- }
- }
- }
-
- // lists the active IndexWriters and their configuration
- public String describe() throws IOException {
- StringBuffer buffer = new StringBuffer();
- if (this.indexWriters.length == 0)
- buffer.append("No IndexWriters activated - check your configuration\n");
- else
- buffer.append("Active IndexWriters :\n");
- for (int i = 0; i < this.indexWriters.length; i++) {
- buffer.append(this.indexWriters[i].describe()).append("\n");
- }
- return buffer.toString();
- }
+ public IndexWriters(Configuration conf) {
+ ObjectCache objectCache = ObjectCache.get(conf);
+ synchronized (objectCache) {
+ this.indexWriters = (IndexWriter[]) objectCache
+ .getObject(IndexWriter.class.getName());
+ if (this.indexWriters == null) {
+ try {
+ ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
+ IndexWriter.X_POINT_ID);
+ if (point == null)
+ throw new RuntimeException(IndexWriter.X_POINT_ID + " not found.");
+ Extension[] extensions = point.getExtensions();
+ HashMap<String, IndexWriter> indexerMap = new HashMap<String, IndexWriter>();
+ for (int i = 0; i < extensions.length; i++) {
+ Extension extension = extensions[i];
+ IndexWriter writer = (IndexWriter) extension.getExtensionInstance();
+ LOG.info("Adding " + writer.getClass().getName());
+ if (!indexerMap.containsKey(writer.getClass().getName())) {
+ indexerMap.put(writer.getClass().getName(), writer);
+ }
+ }
+ objectCache.setObject(IndexWriter.class.getName(), indexerMap
+ .values().toArray(new IndexWriter[0]));
+ } catch (PluginRuntimeException e) {
+ throw new RuntimeException(e);
+ }
+ this.indexWriters = (IndexWriter[]) objectCache
+ .getObject(IndexWriter.class.getName());
+ }
+ }
+ }
+
+ public void open(JobConf job, String name) throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].open(job, name);
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void write(NutchDocument doc) throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].write(doc);
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void update(NutchDocument doc) throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].update(doc);
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void delete(String key) throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].delete(key);
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].close();
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ public void commit() throws IOException {
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ try {
+ this.indexWriters[i].commit();
+ } catch (IOException ioe) {
+ throw ioe;
+ }
+ }
+ }
+
+ // lists the active IndexWriters and their configuration
+ public String describe() throws IOException {
+ StringBuffer buffer = new StringBuffer();
+ if (this.indexWriters.length == 0)
+ buffer.append("No IndexWriters activated - check your configuration\n");
+ else
+ buffer.append("Active IndexWriters :\n");
+ for (int i = 0; i < this.indexWriters.length; i++) {
+ buffer.append(this.indexWriters[i].describe()).append("\n");
+ }
+ return buffer.toString();
+ }
}
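
The class simply fans each call out to every cached writer. A condensed caller-side sketch, following how CleaningJob's DeleterReducer above uses it; the URL is a placeholder:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.indexer.IndexWriters;
    import org.apache.nutch.util.NutchConfiguration;

    public class IndexWritersSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new JobConf(NutchConfiguration.create());
        IndexWriters writers = new IndexWriters(job);
        System.out.println(writers.describe()); // lists active writers, if any

        writers.open(job, "Deletion"); // same name CleaningJob passes above
        writers.delete("http://example.com/gone-page"); // fans out to all writers
        writers.commit();
        writers.close();
      }
    }
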
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java Thu Jan 29 05:38:59 2015
@@ -50,11 +50,12 @@ import org.apache.nutch.parse.ParseText;
import org.apache.nutch.scoring.ScoringFilterException;
import org.apache.nutch.scoring.ScoringFilters;
-public class IndexerMapReduce extends Configured
-implements Mapper<Text, Writable, Text, NutchWritable>,
- Reducer<Text, NutchWritable, Text, NutchIndexAction> {
+public class IndexerMapReduce extends Configured implements
+ Mapper<Text, Writable, Text, NutchWritable>,
+ Reducer<Text, NutchWritable, Text, NutchIndexAction> {
- public static final Logger LOG = LoggerFactory.getLogger(IndexerMapReduce.class);
+ public static final Logger LOG = LoggerFactory
+ .getLogger(IndexerMapReduce.class);
public static final String INDEXER_PARAMS = "indexer.additional.params";
public static final String INDEXER_DELETE = "indexer.delete";
@@ -82,14 +83,16 @@ implements Mapper<Text, Writable, Text,
this.filters = new IndexingFilters(getConf());
this.scfilters = new ScoringFilters(getConf());
this.delete = job.getBoolean(INDEXER_DELETE, false);
- this.deleteRobotsNoIndex = job.getBoolean(INDEXER_DELETE_ROBOTS_NOINDEX, false);
+ this.deleteRobotsNoIndex = job.getBoolean(INDEXER_DELETE_ROBOTS_NOINDEX,
+ false);
this.skip = job.getBoolean(INDEXER_SKIP_NOTMODIFIED, false);
normalize = job.getBoolean(URL_NORMALIZING, false);
filter = job.getBoolean(URL_FILTERING, false);
if (normalize) {
- urlNormalizers = new URLNormalizers(getConf(), URLNormalizers.SCOPE_INDEXER);
+ urlNormalizers = new URLNormalizers(getConf(),
+ URLNormalizers.SCOPE_INDEXER);
}
if (filter) {
@@ -99,9 +102,10 @@ implements Mapper<Text, Writable, Text,
/**
* Normalizes and trims extra whitespace from the given url.
- *
- * @param url The url to normalize.
- *
+ *
+ * @param url
+ * The url to normalize.
+ *
* @return The normalized url.
*/
private String normalizeUrl(String url) {
@@ -114,11 +118,10 @@ implements Mapper<Text, Writable, Text,
try {
// normalize and trim the url
- normalized = urlNormalizers.normalize(url,
- URLNormalizers.SCOPE_INDEXER);
+ normalized = urlNormalizers
+ .normalize(url, URLNormalizers.SCOPE_INDEXER);
normalized = normalized.trim();
- }
- catch (Exception e) {
+ } catch (Exception e) {
LOG.warn("Skipping " + url + ":" + e);
normalized = null;
}
@@ -129,9 +132,10 @@ implements Mapper<Text, Writable, Text,
/**
* Filters the given url.
- *
- * @param url The url to filter.
- *
+ *
+ * @param url
+ * The url to filter.
+ *
* @return The filtered url or null.
*/
private String filterUrl(String url) {
@@ -149,7 +153,8 @@ implements Mapper<Text, Writable, Text,
}
public void map(Text key, Writable value,
- OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException {
+ OutputCollector<Text, NutchWritable> output, Reporter reporter)
+ throws IOException {
String urlString = filterUrl(normalizeUrl(key.toString()));
if (urlString == null) {
@@ -162,8 +167,8 @@ implements Mapper<Text, Writable, Text,
}
public void reduce(Text key, Iterator<NutchWritable> values,
- OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
- throws IOException {
+ OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
+ throws IOException {
Inlinks inlinks = null;
CrawlDatum dbDatum = null;
CrawlDatum fetchDatum = null;
@@ -173,26 +178,25 @@ implements Mapper<Text, Writable, Text,
while (values.hasNext()) {
final Writable value = values.next().get(); // unwrap
if (value instanceof Inlinks) {
- inlinks = (Inlinks)value;
+ inlinks = (Inlinks) value;
} else if (value instanceof CrawlDatum) {
- final CrawlDatum datum = (CrawlDatum)value;
+ final CrawlDatum datum = (CrawlDatum) value;
if (CrawlDatum.hasDbStatus(datum)) {
dbDatum = datum;
- }
- else if (CrawlDatum.hasFetchStatus(datum)) {
+ } else if (CrawlDatum.hasFetchStatus(datum)) {
// don't index unmodified (empty) pages
if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
fetchDatum = datum;
}
- } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||
- CrawlDatum.STATUS_SIGNATURE == datum.getStatus() ||
- CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
+ } else if (CrawlDatum.STATUS_LINKED == datum.getStatus()
+ || CrawlDatum.STATUS_SIGNATURE == datum.getStatus()
+ || CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
continue;
} else {
- throw new RuntimeException("Unexpected status: "+datum.getStatus());
+ throw new RuntimeException("Unexpected status: " + datum.getStatus());
}
} else if (value instanceof ParseData) {
- parseData = (ParseData)value;
+ parseData = (ParseData) value;
// Handle robots meta? https://issues.apache.org/jira/browse/NUTCH-1434
if (deleteRobotsNoIndex) {
@@ -200,64 +204,70 @@ implements Mapper<Text, Writable, Text,
String robotsMeta = parseData.getMeta("robots");
// Has it a noindex for this url?
- if (robotsMeta != null && robotsMeta.toLowerCase().indexOf("noindex") != -1) {
+ if (robotsMeta != null
+ && robotsMeta.toLowerCase().indexOf("noindex") != -1) {
// Delete it!
- NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+ NutchIndexAction action = new NutchIndexAction(null,
+ NutchIndexAction.DELETE);
output.collect(key, action);
return;
}
}
} else if (value instanceof ParseText) {
- parseText = (ParseText)value;
+ parseText = (ParseText) value;
} else if (LOG.isWarnEnabled()) {
- LOG.warn("Unrecognized type: "+value.getClass());
+ LOG.warn("Unrecognized type: " + value.getClass());
}
}
-
+
// Whether to delete GONE or REDIRECTS
- if (delete && fetchDatum != null && dbDatum != null) {
- if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE || dbDatum.getStatus() == CrawlDatum.STATUS_DB_GONE) {
+ if (delete && fetchDatum != null && dbDatum != null) {
+ if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE
+ || dbDatum.getStatus() == CrawlDatum.STATUS_DB_GONE) {
reporter.incrCounter("IndexerStatus", "Documents deleted", 1);
- NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+ NutchIndexAction action = new NutchIndexAction(null,
+ NutchIndexAction.DELETE);
output.collect(key, action);
return;
}
-
- if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM ||
- fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP ||
- dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM ||
- dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+
+ if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
+ || fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
+ || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM
+ || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
reporter.incrCounter("IndexerStatus", "Deleted redirects", 1);
reporter.incrCounter("IndexerStatus", "Perm redirects deleted", 1);
- NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+ NutchIndexAction action = new NutchIndexAction(null,
+ NutchIndexAction.DELETE);
output.collect(key, action);
return;
}
}
- if (fetchDatum == null || dbDatum == null
- || parseText == null || parseData == null) {
- return; // only have inlinks
+ if (fetchDatum == null || dbDatum == null || parseText == null
+ || parseData == null) {
+ return; // only have inlinks
}
// Whether to delete pages marked as duplicates
if (delete && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
reporter.incrCounter("IndexerStatus", "Duplicates deleted", 1);
- NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+ NutchIndexAction action = new NutchIndexAction(null,
+ NutchIndexAction.DELETE);
output.collect(key, action);
return;
}
-
+
// Whether to skip DB_NOTMODIFIED pages
if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
reporter.incrCounter("IndexerStatus", "Skipped", 1);
return;
}
- if (!parseData.getStatus().isSuccess() ||
- fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
+ if (!parseData.getStatus().isSuccess()
+ || fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
return;
}
@@ -276,11 +286,13 @@ implements Mapper<Text, Writable, Text,
try {
// extract information from dbDatum and pass it to
// fetchDatum so that indexing filters can use it
- final Text url = (Text) dbDatum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
+ final Text url = (Text) dbDatum.getMetaData().get(
+ Nutch.WRITABLE_REPR_URL_KEY);
if (url != null) {
// Representation URL also needs normalization and filtering.
// If repr URL is excluded by filters we still accept this document
- // but represented by its primary URL ("key") which has passed URL filters.
+ // but represented by its primary URL ("key") which has passed URL
+ // filters.
String urlString = filterUrl(normalizeUrl(url.toString()));
if (urlString != null) {
url.set(urlString);
@@ -290,7 +302,9 @@ implements Mapper<Text, Writable, Text,
// run indexing filters
doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
} catch (final IndexingException e) {
- if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Error indexing " + key + ": " + e);
+ }
reporter.incrCounter("IndexerStatus", "Errors", 1);
return;
}
@@ -304,8 +318,8 @@ implements Mapper<Text, Writable, Text,
float boost = 1.0f;
// run scoring filters
try {
- boost = this.scfilters.indexerScore(key, doc, dbDatum,
- fetchDatum, parse, inlinks, boost);
+ boost = this.scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse,
+ inlinks, boost);
} catch (final ScoringFilterException e) {
if (LOG.isWarnEnabled()) {
LOG.warn("Error calculating score " + key + ": " + e);
@@ -323,30 +337,32 @@ implements Mapper<Text, Writable, Text,
output.collect(key, action);
}
- public void close() throws IOException { }
+ public void close() throws IOException {
+ }
public static void initMRJob(Path crawlDb, Path linkDb,
- Collection<Path> segments,
- JobConf job) {
+ Collection<Path> segments, JobConf job) {
LOG.info("IndexerMapReduce: crawldb: " + crawlDb);
-
- if (linkDb!=null)
+
+ if (linkDb != null)
LOG.info("IndexerMapReduce: linkdb: " + linkDb);
for (final Path segment : segments) {
LOG.info("IndexerMapReduces: adding segment: " + segment);
- FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME));
- FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME));
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.FETCH_DIR_NAME));
+ FileInputFormat.addInputPath(job, new Path(segment,
+ CrawlDatum.PARSE_DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
}
FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-
- if (linkDb!=null)
- FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));
-
+
+ if (linkDb != null)
+ FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));
+
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(IndexerMapReduce.class);
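
A hedged sketch of driving this map/reduce from a caller, loosely following the (truncated) IndexingJob later in this commit; the paths are placeholders and some of these settings may already be applied inside initMRJob():

    import java.util.Arrays;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.nutch.indexer.IndexerMapReduce;
    import org.apache.nutch.indexer.IndexerOutputFormat;
    import org.apache.nutch.util.NutchConfiguration;
    import org.apache.nutch.util.NutchJob;

    public class IndexerJobSketch {
      public static void main(String[] args) throws Exception {
        JobConf job = new NutchJob(NutchConfiguration.create());
        job.setJobName("Indexer");

        // initMRJob() adds the fetch/parse segment dirs, the crawldb and
        // (optionally) the linkdb as inputs, as shown above.
        IndexerMapReduce.initMRJob(new Path("crawl/crawldb"),
            new Path("crawl/linkdb"),
            Arrays.asList(new Path("crawl/segments/20150129")), job);
        job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);

        // The IndexWriters do the real output; FileOutputFormat still wants
        // an output path, so give it a throwaway one.
        job.setOutputFormat(IndexerOutputFormat.class);
        FileOutputFormat.setOutputPath(job,
            new Path("tmp_indexer_" + System.currentTimeMillis()));

        JobClient.runJob(job);
      }
    }
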
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java Thu Jan 29 05:38:59 2015
@@ -27,31 +27,31 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.Progressable;
public class IndexerOutputFormat extends
- FileOutputFormat<Text, NutchIndexAction> {
+ FileOutputFormat<Text, NutchIndexAction> {
- @Override
- public RecordWriter<Text, NutchIndexAction> getRecordWriter(
- FileSystem ignored, JobConf job, String name, Progressable progress)
- throws IOException {
-
- final IndexWriters writers = new IndexWriters(job);
-
- writers.open(job, name);
-
- return new RecordWriter<Text, NutchIndexAction>() {
-
- public void close(Reporter reporter) throws IOException {
- writers.close();
- }
-
- public void write(Text key, NutchIndexAction indexAction)
- throws IOException {
- if (indexAction.action == NutchIndexAction.ADD) {
- writers.write(indexAction.doc);
- } else if (indexAction.action == NutchIndexAction.DELETE) {
- writers.delete(key.toString());
- }
- }
- };
- }
+ @Override
+ public RecordWriter<Text, NutchIndexAction> getRecordWriter(
+ FileSystem ignored, JobConf job, String name, Progressable progress)
+ throws IOException {
+
+ final IndexWriters writers = new IndexWriters(job);
+
+ writers.open(job, name);
+
+ return new RecordWriter<Text, NutchIndexAction>() {
+
+ public void close(Reporter reporter) throws IOException {
+ writers.close();
+ }
+
+ public void write(Text key, NutchIndexAction indexAction)
+ throws IOException {
+ if (indexAction.action == NutchIndexAction.ADD) {
+ writers.write(indexAction.doc);
+ } else if (indexAction.action == NutchIndexAction.DELETE) {
+ writers.delete(key.toString());
+ }
+ }
+ };
+ }
}
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilter.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilter.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilter.java Thu Jan 29 05:38:59 2015
@@ -28,9 +28,9 @@ import org.apache.nutch.crawl.CrawlDatum
import org.apache.nutch.crawl.Inlinks;
import org.apache.nutch.plugin.Pluggable;
-
-/** Extension point for indexing. Permits one to add metadata to the indexed
- * fields. All plugins found which implement this extension point are run
+/**
+ * Extension point for indexing. Permits one to add metadata to the indexed
+ * fields. All plugins found which implement this extension point are run
* sequentially on the parse.
*/
public interface IndexingFilter extends Pluggable, Configurable {
@@ -57,6 +57,6 @@ public interface IndexingFilter extends
* document should be discarded)
* @throws IndexingException
*/
- NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
- throws IndexingException;
+ NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+ CrawlDatum datum, Inlinks inlinks) throws IndexingException;
}
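
A minimal hypothetical filter against this extension point; the class and the "host" field name are illustrative, not from this commit. Returning null discards the document, as the javadoc above notes:

    import java.net.MalformedURLException;
    import java.net.URL;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.crawl.Inlinks;
    import org.apache.nutch.indexer.IndexingFilter;
    import org.apache.nutch.indexer.NutchDocument;
    import org.apache.nutch.parse.Parse;

    /** Hypothetical filter that adds the page's host as an indexed field. */
    public class HostIndexingFilter implements IndexingFilter {
      private Configuration conf;

      public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
          CrawlDatum datum, Inlinks inlinks) {
        try {
          doc.add("host", new URL(url.toString()).getHost());
        } catch (MalformedURLException e) {
          return null; // discard the document, per the contract above
        }
        return doc;
      }

      public Configuration getConf() {
        return conf;
      }

      public void setConf(Configuration conf) {
        this.conf = conf;
      }
    }
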
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilters.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilters.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilters.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilters.java Thu Jan 29 05:38:59 2015
@@ -28,12 +28,13 @@ import org.apache.nutch.crawl.CrawlDatum
import org.apache.nutch.crawl.Inlinks;
import org.apache.hadoop.io.Text;
-/** Creates and caches {@link IndexingFilter} implementing plugins.*/
+/** Creates and caches {@link IndexingFilter} implementing plugins. */
public class IndexingFilters {
public static final String INDEXINGFILTER_ORDER = "indexingfilter.order";
- public final static Logger LOG = LoggerFactory.getLogger(IndexingFilters.class);
+ public final static Logger LOG = LoggerFactory
+ .getLogger(IndexingFilters.class);
private IndexingFilter[] indexingFilters;
@@ -44,12 +45,13 @@ public class IndexingFilters {
}
/** Run all defined filters. */
- public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum,
- Inlinks inlinks) throws IndexingException {
+ public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+ CrawlDatum datum, Inlinks inlinks) throws IndexingException {
for (int i = 0; i < this.indexingFilters.length; i++) {
doc = this.indexingFilters[i].filter(doc, parse, url, datum, inlinks);
// break the loop if an indexing filter discards the doc
- if (doc == null) return null;
+ if (doc == null)
+ return null;
}
return doc;
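
On the caller side the chain short-circuits at the first null (the early return above), so one null check after filter() suffices. A hedged sketch; the seeded field name is illustrative:

    import org.apache.hadoop.io.Text;
    import org.apache.nutch.crawl.CrawlDatum;
    import org.apache.nutch.crawl.Inlinks;
    import org.apache.nutch.indexer.IndexingException;
    import org.apache.nutch.indexer.IndexingFilters;
    import org.apache.nutch.indexer.NutchDocument;
    import org.apache.nutch.parse.Parse;
    import org.apache.nutch.util.NutchConfiguration;

    public class FilterChainSketch {
      /** Runs the configured chain; a null result means "discard this page". */
      static NutchDocument runChain(Parse parse, Text url, CrawlDatum datum,
          Inlinks inlinks) throws IndexingException {
        IndexingFilters filters =
            new IndexingFilters(NutchConfiguration.create());
        NutchDocument doc = new NutchDocument();
        doc.add("id", url.toString()); // seed field before running the chain
        return filters.filter(doc, parse, url, datum, inlinks);
      }
    }
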
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java Thu Jan 29 05:38:59 2015
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
+
package org.apache.nutch.indexer;
import java.util.List;
@@ -46,16 +46,19 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Reads and parses a URL and runs the indexers on it. Displays the fields obtained and the first
- * 100 characters of their value
- *
- * Tested with e.g. ./nutch org.apache.nutch.indexer.IndexingFiltersChecker http://www.lemonde.fr
+ * Reads and parses a URL and runs the indexers on it. Displays the fields
+ * obtained and the first 100 characters of their value
+ *
+ * Tested with e.g. ./nutch org.apache.nutch.indexer.IndexingFiltersChecker
+ * http://www.lemonde.fr
+ *
* @author Julien Nioche
**/
public class IndexingFiltersChecker extends Configured implements Tool {
- public static final Logger LOG = LoggerFactory.getLogger(IndexingFiltersChecker.class);
+ public static final Logger LOG = LoggerFactory
+ .getLogger(IndexingFiltersChecker.class);
public IndexingFiltersChecker() {
@@ -95,12 +98,13 @@ public class IndexingFiltersChecker exte
CrawlDatum datum = new CrawlDatum();
ProtocolOutput output = protocol.getProtocolOutput(new Text(url), datum);
-
+
if (!output.getStatus().isSuccess()) {
- System.out.println("Fetch failed with protocol status: " + output.getStatus());
+ System.out.println("Fetch failed with protocol status: "
+ + output.getStatus());
return 0;
}
-
+
Content content = output.getContent();
if (content == null) {
@@ -115,7 +119,8 @@ public class IndexingFiltersChecker exte
}
// store the guessed content type in the crawldatum
- datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE), new Text(contentType));
+ datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
+ new Text(contentType));
if (ParseSegment.isTruncated(content)) {
LOG.warn("Content is truncated, parse may fail!");
@@ -162,7 +167,7 @@ public class IndexingFiltersChecker exte
System.out.println("Document discarded by indexing filter");
return 0;
}
-
+
for (String fname : doc.getFieldNames()) {
List<Object> values = doc.getField(fname).getValues();
if (values != null) {
@@ -173,14 +178,14 @@ public class IndexingFiltersChecker exte
}
}
}
-
- if (conf.getBoolean("doIndex", false) && doc!=null){
+
+ if (conf.getBoolean("doIndex", false) && doc != null) {
IndexWriters writers = new IndexWriters(getConf());
writers.open(new JobConf(getConf()), "IndexingFilterChecker");
writers.write(doc);
writers.close();
}
-
+
return 0;
}
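
The doIndex flag checked above is what turns the checker from a dry run into a
real write through IndexWriters. A sketch of a programmatic invocation with
that flag set (hypothetical driver class, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.nutch.indexer.IndexingFiltersChecker;
    import org.apache.nutch.util.NutchConfiguration;

    public class CheckerDriver {
      public static void main(String[] args) throws Exception {
        Configuration conf = NutchConfiguration.create();
        conf.setBoolean("doIndex", true); // also write the doc, not just print
        int res = ToolRunner.run(conf, new IndexingFiltersChecker(),
            new String[] { "http://www.lemonde.fr" });
        System.exit(res);
      }
    }
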
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingJob.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingJob.java Thu Jan 29 05:38:59 2015
@@ -46,145 +46,143 @@ import org.slf4j.LoggerFactory;
public class IndexingJob extends Configured implements Tool {
- public static Logger LOG = LoggerFactory.getLogger(IndexingJob.class);
+ public static Logger LOG = LoggerFactory.getLogger(IndexingJob.class);
- public IndexingJob() {
- super(null);
- }
-
- public IndexingJob(Configuration conf) {
- super(conf);
- }
-
- public void index(Path crawlDb, Path linkDb, List<Path> segments,
- boolean noCommit) throws IOException {
- index(crawlDb, linkDb, segments, noCommit, false, null);
- }
-
- public void index(Path crawlDb, Path linkDb, List<Path> segments,
- boolean noCommit, boolean deleteGone) throws IOException {
- index(crawlDb, linkDb, segments, noCommit, deleteGone, null);
- }
-
- public void index(Path crawlDb, Path linkDb, List<Path> segments,
- boolean noCommit, boolean deleteGone, String params)
- throws IOException {
- index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false,
- false);
- }
-
- public void index(Path crawlDb, Path linkDb, List<Path> segments,
- boolean noCommit, boolean deleteGone, String params,
- boolean filter, boolean normalize) throws IOException {
-
- SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- long start = System.currentTimeMillis();
- LOG.info("Indexer: starting at " + sdf.format(start));
-
- final JobConf job = new NutchJob(getConf());
- job.setJobName("Indexer");
-
- LOG.info("Indexer: deleting gone documents: " + deleteGone);
- LOG.info("Indexer: URL filtering: " + filter);
- LOG.info("Indexer: URL normalizing: " + normalize);
-
- IndexWriters writers = new IndexWriters(getConf());
- LOG.info(writers.describe());
-
- IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
-
- // NOW PASSED ON THE COMMAND LINE AS A HADOOP PARAM
- // job.set(SolrConstants.SERVER_URL, solrUrl);
-
- job.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
- job.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
- job.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
-
- if (params != null) {
- job.set(IndexerMapReduce.INDEXER_PARAMS, params);
+ public IndexingJob() {
+ super(null);
+ }
+
+ public IndexingJob(Configuration conf) {
+ super(conf);
+ }
+
+ public void index(Path crawlDb, Path linkDb, List<Path> segments,
+ boolean noCommit) throws IOException {
+ index(crawlDb, linkDb, segments, noCommit, false, null);
+ }
+
+ public void index(Path crawlDb, Path linkDb, List<Path> segments,
+ boolean noCommit, boolean deleteGone) throws IOException {
+ index(crawlDb, linkDb, segments, noCommit, deleteGone, null);
+ }
+
+ public void index(Path crawlDb, Path linkDb, List<Path> segments,
+ boolean noCommit, boolean deleteGone, String params) throws IOException {
+ index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false, false);
+ }
+
+ public void index(Path crawlDb, Path linkDb, List<Path> segments,
+ boolean noCommit, boolean deleteGone, String params, boolean filter,
+ boolean normalize) throws IOException {
+
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("Indexer: starting at " + sdf.format(start));
+
+ final JobConf job = new NutchJob(getConf());
+ job.setJobName("Indexer");
+
+ LOG.info("Indexer: deleting gone documents: " + deleteGone);
+ LOG.info("Indexer: URL filtering: " + filter);
+ LOG.info("Indexer: URL normalizing: " + normalize);
+
+ IndexWriters writers = new IndexWriters(getConf());
+ LOG.info(writers.describe());
+
+ IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
+
+ // NOW PASSED ON THE COMMAND LINE AS A HADOOP PARAM
+ // job.set(SolrConstants.SERVER_URL, solrUrl);
+
+ job.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
+ job.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
+ job.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
+
+ if (params != null) {
+ job.set(IndexerMapReduce.INDEXER_PARAMS, params);
+ }
+
+ job.setReduceSpeculativeExecution(false);
+
+ final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
+ + new Random().nextInt());
+
+ FileOutputFormat.setOutputPath(job, tmp);
+ try {
+ JobClient.runJob(job);
+ // commit once for all the reducers in one go
+ if (!noCommit) {
+ writers.open(job, "commit");
+ writers.commit();
+ }
+ long end = System.currentTimeMillis();
+ LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: "
+ + TimingUtil.elapsedTime(start, end));
+ } finally {
+ FileSystem.get(job).delete(tmp, true);
+ }
+ }
+
+ public int run(String[] args) throws Exception {
+ if (args.length < 2) {
+ System.err
+ .println("Usage: Indexer <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone] [-filter] [-normalize]");
+ IndexWriters writers = new IndexWriters(getConf());
+ System.err.println(writers.describe());
+ return -1;
+ }
+
+ final Path crawlDb = new Path(args[0]);
+ Path linkDb = null;
+
+ final List<Path> segments = new ArrayList<Path>();
+ String params = null;
+
+ boolean noCommit = false;
+ boolean deleteGone = false;
+ boolean filter = false;
+ boolean normalize = false;
+
+ for (int i = 1; i < args.length; i++) {
+ if (args[i].equals("-linkdb")) {
+ linkDb = new Path(args[++i]);
+ } else if (args[i].equals("-dir")) {
+ Path dir = new Path(args[++i]);
+ FileSystem fs = dir.getFileSystem(getConf());
+ FileStatus[] fstats = fs.listStatus(dir,
+ HadoopFSUtil.getPassDirectoriesFilter(fs));
+ Path[] files = HadoopFSUtil.getPaths(fstats);
+ for (Path p : files) {
+ segments.add(p);
}
-
- job.setReduceSpeculativeExecution(false);
-
- final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
- + new Random().nextInt());
-
- FileOutputFormat.setOutputPath(job, tmp);
- try {
- JobClient.runJob(job);
- // do the commits once and for all the reducers in one go
- if (!noCommit) {
- writers.open(job,"commit");
- writers.commit();
- }
- long end = System.currentTimeMillis();
- LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: "
- + TimingUtil.elapsedTime(start, end));
- } finally {
- FileSystem.get(job).delete(tmp, true);
- }
- }
-
- public int run(String[] args) throws Exception {
- if (args.length < 2) {
- System.err
- .println("Usage: Indexer <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone] [-filter] [-normalize]");
- IndexWriters writers = new IndexWriters(getConf());
- System.err.println(writers.describe());
- return -1;
- }
-
- final Path crawlDb = new Path(args[0]);
- Path linkDb = null;
-
- final List<Path> segments = new ArrayList<Path>();
- String params = null;
-
- boolean noCommit = false;
- boolean deleteGone = false;
- boolean filter = false;
- boolean normalize = false;
-
- for (int i = 1; i < args.length; i++) {
- if (args[i].equals("-linkdb")) {
- linkDb = new Path(args[++i]);
- } else if (args[i].equals("-dir")) {
- Path dir = new Path(args[++i]);
- FileSystem fs = dir.getFileSystem(getConf());
- FileStatus[] fstats = fs.listStatus(dir,
- HadoopFSUtil.getPassDirectoriesFilter(fs));
- Path[] files = HadoopFSUtil.getPaths(fstats);
- for (Path p : files) {
- segments.add(p);
- }
- } else if (args[i].equals("-noCommit")) {
- noCommit = true;
- } else if (args[i].equals("-deleteGone")) {
- deleteGone = true;
- } else if (args[i].equals("-filter")) {
- filter = true;
- } else if (args[i].equals("-normalize")) {
- normalize = true;
- } else if (args[i].equals("-params")) {
- params = args[++i];
- } else {
- segments.add(new Path(args[i]));
- }
- }
-
- try {
- index(crawlDb, linkDb, segments, noCommit, deleteGone, params,
- filter, normalize);
- return 0;
- } catch (final Exception e) {
- LOG.error("Indexer: " + StringUtils.stringifyException(e));
- return -1;
- }
- }
-
- public static void main(String[] args) throws Exception {
- final int res = ToolRunner.run(NutchConfiguration.create(),
- new IndexingJob(), args);
- System.exit(res);
- }
+ } else if (args[i].equals("-noCommit")) {
+ noCommit = true;
+ } else if (args[i].equals("-deleteGone")) {
+ deleteGone = true;
+ } else if (args[i].equals("-filter")) {
+ filter = true;
+ } else if (args[i].equals("-normalize")) {
+ normalize = true;
+ } else if (args[i].equals("-params")) {
+ params = args[++i];
+ } else {
+ segments.add(new Path(args[i]));
+ }
+ }
+
+ try {
+ index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter,
+ normalize);
+ return 0;
+ } catch (final Exception e) {
+ LOG.error("Indexer: " + StringUtils.stringifyException(e));
+ return -1;
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+ final int res = ToolRunner.run(NutchConfiguration.create(),
+ new IndexingJob(), args);
+ System.exit(res);
+ }
}
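
The moved-but-unchanged index(...) overloads above remain the programmatic
entry point; the four-argument form defaults deleteGone, params, filter and
normalize exactly as the delegation chain shows. A minimal sketch (the paths
are placeholders, not values from this commit):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.fs.Path;
    import org.apache.nutch.indexer.IndexingJob;
    import org.apache.nutch.util.NutchConfiguration;

    public class IndexDriver {
      public static void main(String[] args) throws Exception {
        IndexingJob indexer = new IndexingJob(NutchConfiguration.create());
        List<Path> segments = new ArrayList<Path>();
        segments.add(new Path("crawl/segments/20150129053859")); // placeholder
        // noCommit=false: commit once after the MR job, as in the hunk above
        indexer.index(new Path("crawl/crawldb"), new Path("crawl/linkdb"),
            segments, false);
      }
    }
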
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/NutchDocument.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchDocument.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchDocument.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchDocument.java Thu Jan 29 05:38:59 2015
@@ -31,12 +31,12 @@ import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.nutch.metadata.Metadata;
-/** A {@link NutchDocument} is the unit of indexing.*/
-public class NutchDocument
-implements Writable, Iterable<Entry<String, NutchField>> {
+/** A {@link NutchDocument} is the unit of indexing. */
+public class NutchDocument implements Writable,
+ Iterable<Entry<String, NutchField>> {
public static final byte VERSION = 2;
-
+
private Map<String, NutchField> fields;
private Metadata documentMeta;
@@ -127,8 +127,8 @@ implements Writable, Iterable<Entry<Stri
out.writeFloat(weight);
documentMeta.write(out);
}
-
- public String toString() {
+
+ public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("doc {\n");
for (Map.Entry<String, NutchField> entry : fields.entrySet()) {
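
Since the reflowed declaration keeps NutchDocument Iterable over its field
entries, callers can walk the fields directly; a small sketch (illustration
only, not part of this commit):

    import java.util.Map.Entry;
    import org.apache.nutch.indexer.NutchDocument;
    import org.apache.nutch.indexer.NutchField;

    public class DocWalk {
      static void dump(NutchDocument doc) {
        // Iterable<Entry<String, NutchField>> per the class declaration above
        for (Entry<String, NutchField> entry : doc) {
          System.out.println(entry.getKey() + " = "
              + entry.getValue().getValues());
        }
      }
    }
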
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/NutchField.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchField.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchField.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchField.java Thu Jan 29 05:38:59 2015
@@ -28,32 +28,33 @@ import java.util.List;
import org.apache.hadoop.io.*;
/**
- * This class represents a multi-valued field with a weight.
- * Values are arbitrary objects.
+ * This class represents a multi-valued field with a weight. Values are
+ * arbitrary objects.
*/
public class NutchField implements Writable {
private float weight;
private List<Object> values = new ArrayList<Object>();
-
- public NutchField() { }
-
+
+ public NutchField() {
+ }
+
public NutchField(Object value) {
this(value, 1.0f);
}
-
+
public NutchField(Object value, float weight) {
this.weight = weight;
if (value instanceof Collection) {
- values.addAll((Collection<?>)value);
+ values.addAll((Collection<?>) value);
} else {
values.add(value);
}
}
-
+
public void add(Object value) {
values.add(value);
}
-
+
public float getWeight() {
return weight;
}
@@ -65,7 +66,7 @@ public class NutchField implements Writa
public List<Object> getValues() {
return values;
}
-
+
public void reset() {
weight = 1.0f;
values.clear();
@@ -73,13 +74,13 @@ public class NutchField implements Writa
@Override
public Object clone() throws CloneNotSupportedException {
- NutchField result = (NutchField)super.clone();
+ NutchField result = (NutchField) super.clone();
result.weight = weight;
result.values = values;
return result;
}
-
+
@Override
public void readFields(DataInput in) throws IOException {
weight = in.readFloat();
@@ -87,7 +88,7 @@ public class NutchField implements Writa
values = new ArrayList<Object>();
for (int i = 0; i < count; i++) {
String type = Text.readString(in);
-
+
if (type.equals("java.lang.String")) {
values.add(Text.readString(in));
} else if (type.equals("java.lang.Boolean")) {
@@ -109,26 +110,26 @@ public class NutchField implements Writa
out.writeFloat(weight);
out.writeInt(values.size());
for (Object value : values) {
-
+
Text.writeString(out, value.getClass().getName());
-
+
if (value instanceof Boolean) {
- out.writeBoolean((Boolean)value);
+ out.writeBoolean((Boolean) value);
} else if (value instanceof Integer) {
- out.writeInt((Integer)value);
+ out.writeInt((Integer) value);
} else if (value instanceof Long) {
- out.writeLong((Long)value);
+ out.writeLong((Long) value);
} else if (value instanceof Float) {
- out.writeFloat((Float)value);
+ out.writeFloat((Float) value);
} else if (value instanceof String) {
- Text.writeString(out, (String)value);
+ Text.writeString(out, (String) value);
} else if (value instanceof Date) {
- Date date = (Date)value;
+ Date date = (Date) value;
out.writeLong(date.getTime());
}
}
}
-
+
public String toString() {
return values.toString();
}
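
The write/readFields pair above serializes each value tagged with its class
name and handles only the listed types (String, Boolean, Integer, Long, Float,
Date). A round-trip sketch, assuming the readFields branches elided from this
hunk mirror the write side as in the full source:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.Date;
    import org.apache.nutch.indexer.NutchField;

    public class FieldRoundTrip {
      public static void main(String[] args) throws Exception {
        NutchField field = new NutchField("hello", 2.0f);
        field.add(new Date());
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        field.write(new DataOutputStream(buf));
        NutchField copy = new NutchField();
        copy.readFields(new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray())));
        System.out.println(copy + " @ weight " + copy.getWeight());
      }
    }
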
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java Thu Jan 29 05:38:59 2015
@@ -25,8 +25,8 @@ import org.apache.hadoop.io.Writable;
import org.apache.nutch.indexer.NutchDocument;
/**
- * A {@link NutchIndexAction} is the new unit of indexing holding the
- * document and action information.
+ * A {@link NutchIndexAction} is the new unit of indexing holding the document
+ * and action information.
*/
public class NutchIndexAction implements Writable {
Modified: nutch/trunk/src/java/org/apache/nutch/metadata/CreativeCommons.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/metadata/CreativeCommons.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/metadata/CreativeCommons.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/metadata/CreativeCommons.java Thu Jan 29 05:38:59 2015
@@ -16,21 +16,20 @@
*/
package org.apache.nutch.metadata;
-
/**
* A collection of Creative Commons properties names.
- *
+ *
* @see <a href="http://www.creativecommons.org/">creativecommons.org</a>
- *
+ *
* @author Chris Mattmann
* @author Jérôme Charron
*/
public interface CreativeCommons {
-
+
public final static String LICENSE_URL = "License-Url";
-
+
public final static String LICENSE_LOCATION = "License-Location";
-
+
public final static String WORK_TYPE = "Work-Type";
-
+
}
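
These constants are plain property names for use with the Nutch Metadata
container; a sketch of tagging metadata with them (the values shown are
hypothetical, for illustration only):

    import org.apache.nutch.metadata.CreativeCommons;
    import org.apache.nutch.metadata.Metadata;

    public class CcTag {
      public static void main(String[] args) {
        Metadata meta = new Metadata();
        // hypothetical values
        meta.set(CreativeCommons.LICENSE_URL,
            "http://creativecommons.org/licenses/by/4.0/");
        meta.set(CreativeCommons.WORK_TYPE, "text");
        System.out.println(meta.get(CreativeCommons.LICENSE_URL));
      }
    }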