You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2015/01/29 06:39:03 UTC

svn commit: r1655526 [5/26] - in /nutch/trunk: ./ src/java/org/apache/nutch/crawl/ src/java/org/apache/nutch/fetcher/ src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/metadata/ src/java/org/apache/nutch/net/ src/java/org/apache/nutch/net/pr...

Modified: nutch/trunk/src/java/org/apache/nutch/fetcher/OldFetcher.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/fetcher/OldFetcher.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/fetcher/OldFetcher.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/fetcher/OldFetcher.java Thu Jan 29 05:38:59 2015
@@ -43,28 +43,28 @@ import org.apache.nutch.parse.*;
 import org.apache.nutch.scoring.ScoringFilters;
 import org.apache.nutch.util.*;
 
-
 /** The fetcher. Most of the work is done by plugins. */
-public class OldFetcher extends Configured implements Tool, MapRunnable<WritableComparable<?>, Writable, Text, NutchWritable> { 
+public class OldFetcher extends Configured implements Tool,
+    MapRunnable<WritableComparable<?>, Writable, Text, NutchWritable> {
 
   public static final Logger LOG = LoggerFactory.getLogger(OldFetcher.class);
-  
+
   public static final int PERM_REFRESH_TIME = 5;
 
   public static final String CONTENT_REDIR = "content";
 
   public static final String PROTOCOL_REDIR = "protocol";
 
-  public static class InputFormat extends SequenceFileInputFormat<WritableComparable<?>, Writable> {
+  public static class InputFormat extends
+      SequenceFileInputFormat<WritableComparable<?>, Writable> {
     /** Don't split inputs, to keep things polite. */
-    public InputSplit[] getSplits(JobConf job, int nSplits)
-      throws IOException {
+    public InputSplit[] getSplits(JobConf job, int nSplits) throws IOException {
       FileStatus[] files = listStatus(job);
       InputSplit[] splits = new InputSplit[files.length];
       for (int i = 0; i < files.length; i++) {
         FileStatus cur = files[i];
-        splits[i] = new FileSplit(cur.getPath(), 0,
-            cur.getLen(), (String[])null);
+        splits[i] = new FileSplit(cur.getPath(), 0, cur.getLen(),
+            (String[]) null);
       }
       return splits;
     }
@@ -81,9 +81,9 @@ public class OldFetcher extends Configur
   private long start = System.currentTimeMillis(); // start time of fetcher run
   private long lastRequestStart = start;
 
-  private long bytes;                             // total bytes fetched
-  private int pages;                              // total pages fetched
-  private int errors;                             // total pages errored
+  private long bytes; // total bytes fetched
+  private int pages; // total pages fetched
+  private int errors; // total pages errored
 
   private boolean storingContent;
   private boolean parsing;
@@ -100,8 +100,8 @@ public class OldFetcher extends Configur
     private String reprUrl;
 
     public FetcherThread(Configuration conf) {
-      this.setDaemon(true);                       // don't hang JVM on exit
-      this.setName("FetcherThread");              // use an informative name
+      this.setDaemon(true); // don't hang JVM on exit
+      this.setName("FetcherThread"); // use an informative name
       this.conf = conf;
       this.urlFilters = new URLFilters(conf);
       this.scfilters = new ScoringFilters(conf);
@@ -112,26 +112,28 @@ public class OldFetcher extends Configur
 
     @SuppressWarnings("fallthrough")
     public void run() {
-      synchronized (OldFetcher.this) {activeThreads++;} // count threads
-      
+      synchronized (OldFetcher.this) {
+        activeThreads++;
+      } // count threads
+
       try {
         Text key = new Text();
         CrawlDatum datum = new CrawlDatum();
-        
+
         while (true) {
           // TODO : NUTCH-258 ...
           // If something bad happened, then exit
           // if (conf.getBoolean("fetcher.exit", false)) {
-          //   break;
+          // break;
           // ]
-          
-          try {                                   // get next entry from input
+
+          try { // get next entry from input
             if (!input.next(key, datum)) {
-              break;                              // at eof, exit
+              break; // at eof, exit
             }
           } catch (IOException e) {
             if (LOG.isErrorEnabled()) {
-              LOG.error("fetcher caught:"+e.toString());
+              LOG.error("fetcher caught:" + e.toString());
             }
             break;
           }
@@ -143,8 +145,8 @@ public class OldFetcher extends Configur
           // url may be changed through redirects.
           Text url = new Text(key);
 
-          Text reprUrlWritable =
-            (Text) datum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
+          Text reprUrlWritable = (Text) datum.getMetaData().get(
+              Nutch.WRITABLE_REPR_URL_KEY);
           if (reprUrlWritable == null) {
             reprUrl = key.toString();
           } else {
@@ -152,7 +154,9 @@ public class OldFetcher extends Configur
           }
 
           try {
-            if (LOG.isInfoEnabled()) { LOG.info("fetching " + url); }
+            if (LOG.isInfoEnabled()) {
+              LOG.info("fetching " + url);
+            }
 
             // fetch the page
             redirectCount = 0;
@@ -161,7 +165,8 @@ public class OldFetcher extends Configur
                 LOG.debug("redirectCount=" + redirectCount);
               }
               redirecting = false;
-              Protocol protocol = this.protocolFactory.getProtocol(url.toString());
+              Protocol protocol = this.protocolFactory.getProtocol(url
+                  .toString());
               ProtocolOutput output = protocol.getProtocolOutput(url, datum);
               ProtocolStatus status = output.getStatus();
               Content content = output.getContent();
@@ -173,22 +178,22 @@ public class OldFetcher extends Configur
                     new Text(reprUrl));
               }
 
-              switch(status.getCode()) {
+              switch (status.getCode()) {
 
-              case ProtocolStatus.SUCCESS:        // got a page
-                pstatus = output(url, datum, content, status, CrawlDatum.STATUS_FETCH_SUCCESS);
+              case ProtocolStatus.SUCCESS: // got a page
+                pstatus = output(url, datum, content, status,
+                    CrawlDatum.STATUS_FETCH_SUCCESS);
                 updateStatus(content.getContent().length);
-                if (pstatus != null && pstatus.isSuccess() &&
-                        pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
+                if (pstatus != null && pstatus.isSuccess()
+                    && pstatus.getMinorCode() == ParseStatus.SUCCESS_REDIRECT) {
                   String newUrl = pstatus.getMessage();
                   int refreshTime = Integer.valueOf(pstatus.getArgs()[1]);
                   url = handleRedirect(url, datum, urlString, newUrl,
-                                       refreshTime < PERM_REFRESH_TIME,
-                                       CONTENT_REDIR);
+                      refreshTime < PERM_REFRESH_TIME, CONTENT_REDIR);
                 }
                 break;
 
-              case ProtocolStatus.MOVED:         // redirect
+              case ProtocolStatus.MOVED: // redirect
               case ProtocolStatus.TEMP_MOVED:
                 int code;
                 boolean temp;
@@ -201,22 +206,22 @@ public class OldFetcher extends Configur
                 }
                 output(url, datum, content, status, code);
                 String newUrl = status.getMessage();
-                url = handleRedirect(url, datum, urlString, newUrl,
-                                     temp, PROTOCOL_REDIR);
+                url = handleRedirect(url, datum, urlString, newUrl, temp,
+                    PROTOCOL_REDIR);
                 break;
 
               // failures - increase the retry counter
               case ProtocolStatus.EXCEPTION:
                 logError(url, status.getMessage());
-              /* FALLTHROUGH */
-              case ProtocolStatus.RETRY:          // retry
+                /* FALLTHROUGH */
+              case ProtocolStatus.RETRY: // retry
               case ProtocolStatus.WOULDBLOCK:
               case ProtocolStatus.BLOCKED:
                 output(url, datum, null, status, CrawlDatum.STATUS_FETCH_RETRY);
                 break;
-                
+
               // permanent failures
-              case ProtocolStatus.GONE:           // gone
+              case ProtocolStatus.GONE: // gone
               case ProtocolStatus.NOTFOUND:
               case ProtocolStatus.ACCESS_DENIED:
               case ProtocolStatus.ROBOTS_DENIED:
@@ -224,9 +229,10 @@ public class OldFetcher extends Configur
                 break;
 
               case ProtocolStatus.NOTMODIFIED:
-                output(url, datum, null, status, CrawlDatum.STATUS_FETCH_NOTMODIFIED);
+                output(url, datum, null, status,
+                    CrawlDatum.STATUS_FETCH_NOTMODIFIED);
                 break;
-                
+
               default:
                 if (LOG.isWarnEnabled()) {
                   LOG.warn("Unknown ProtocolStatus: " + status.getCode());
@@ -243,27 +249,27 @@ public class OldFetcher extends Configur
 
             } while (redirecting && (redirectCount < maxRedirect));
 
-            
-          } catch (Throwable t) {                 // unexpected exception
+          } catch (Throwable t) { // unexpected exception
             logError(url, t.toString());
             output(url, datum, null, null, CrawlDatum.STATUS_FETCH_RETRY);
-            
+
           }
         }
 
       } catch (Throwable e) {
         if (LOG.isErrorEnabled()) {
-          LOG.error("fetcher caught:"+e.toString());
+          LOG.error("fetcher caught:" + e.toString());
         }
       } finally {
-        synchronized (OldFetcher.this) {activeThreads--;} // count threads
+        synchronized (OldFetcher.this) {
+          activeThreads--;
+        } // count threads
       }
     }
 
-    private Text handleRedirect(Text url, CrawlDatum datum,
-                                String urlString, String newUrl,
-                                boolean temp, String redirType)
-    throws MalformedURLException, URLFilterException {
+    private Text handleRedirect(Text url, CrawlDatum datum, String urlString,
+        String newUrl, boolean temp, String redirType)
+        throws MalformedURLException, URLFilterException {
       newUrl = normalizers.normalize(newUrl, URLNormalizers.SCOPE_FETCHER);
       newUrl = urlFilters.filter(newUrl);
       if (newUrl != null && !newUrl.equals(urlString)) {
@@ -273,8 +279,8 @@ public class OldFetcher extends Configur
           redirecting = true;
           redirectCount++;
           if (LOG.isDebugEnabled()) {
-            LOG.debug(" - " + redirType + " redirect to " +
-                      url + " (fetching now)");
+            LOG.debug(" - " + redirType + " redirect to " + url
+                + " (fetching now)");
           }
           return url;
         } else {
@@ -285,15 +291,15 @@ public class OldFetcher extends Configur
           }
           output(url, newDatum, null, null, CrawlDatum.STATUS_LINKED);
           if (LOG.isDebugEnabled()) {
-            LOG.debug(" - " + redirType + " redirect to " +
-                      url + " (fetching later)");
+            LOG.debug(" - " + redirType + " redirect to " + url
+                + " (fetching later)");
           }
           return null;
         }
       } else {
         if (LOG.isDebugEnabled()) {
-          LOG.debug(" - " + redirType + " redirect skipped: " +
-              (newUrl != null ? "to same url" : "filtered"));
+          LOG.debug(" - " + redirType + " redirect skipped: "
+              + (newUrl != null ? "to same url" : "filtered"));
         }
         return null;
       }
@@ -303,17 +309,18 @@ public class OldFetcher extends Configur
       if (LOG.isInfoEnabled()) {
         LOG.info("fetch of " + url + " failed with: " + message);
       }
-      synchronized (OldFetcher.this) {               // record failure
+      synchronized (OldFetcher.this) { // record failure
         errors++;
       }
     }
 
-    private ParseStatus output(Text key, CrawlDatum datum,
-                        Content content, ProtocolStatus pstatus, int status) {
+    private ParseStatus output(Text key, CrawlDatum datum, Content content,
+        ProtocolStatus pstatus, int status) {
 
       datum.setStatus(status);
       datum.setFetchTime(System.currentTimeMillis());
-      if (pstatus != null) datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
+      if (pstatus != null)
+        datum.getMetaData().put(Nutch.WRITABLE_PROTO_STATUS_KEY, pstatus);
 
       ParseResult parseResult = null;
       if (content != null) {
@@ -328,27 +335,31 @@ public class OldFetcher extends Configur
             LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
           }
         }
-        /* Note: Fetcher will only follow meta-redirects coming from the
-         * original URL. */ 
+        /*
+         * Note: Fetcher will only follow meta-redirects coming from the
+         * original URL.
+         */
         if (parsing && status == CrawlDatum.STATUS_FETCH_SUCCESS) {
           try {
             parseResult = this.parseUtil.parse(content);
           } catch (Exception e) {
-            LOG.warn("Error parsing: " + key + ": " + StringUtils.stringifyException(e));
+            LOG.warn("Error parsing: " + key + ": "
+                + StringUtils.stringifyException(e));
           }
 
           if (parseResult == null) {
-            byte[] signature = 
-              SignatureFactory.getSignature(getConf()).calculate(content, 
-                  new ParseStatus().getEmptyParse(conf));
+            byte[] signature = SignatureFactory.getSignature(getConf())
+                .calculate(content, new ParseStatus().getEmptyParse(conf));
             datum.setSignature(signature);
           }
         }
-        
-        /* Store status code in content So we can read this value during 
-         * parsing (as a separate job) and decide to parse or not.
+
+        /*
+         * Store status code in content So we can read this value during parsing
+         * (as a separate job) and decide to parse or not.
          */
-        content.getMetadata().add(Nutch.FETCH_STATUS_KEY, Integer.toString(status));
+        content.getMetadata().add(Nutch.FETCH_STATUS_KEY,
+            Integer.toString(status));
       }
 
       try {
@@ -360,7 +371,7 @@ public class OldFetcher extends Configur
             Text url = entry.getKey();
             Parse parse = entry.getValue();
             ParseStatus parseStatus = parse.getData().getStatus();
-            
+
             if (!parseStatus.isSuccess()) {
               LOG.warn("Error parsing: " + key + ": " + parseStatus);
               parse = parseStatus.getEmptyParse(getConf());
@@ -368,16 +379,16 @@ public class OldFetcher extends Configur
 
             // Calculate page signature. For non-parsing fetchers this will
             // be done in ParseSegment
-            byte[] signature = 
-              SignatureFactory.getSignature(getConf()).calculate(content, parse);
+            byte[] signature = SignatureFactory.getSignature(getConf())
+                .calculate(content, parse);
             // Ensure segment name and score are in parseData metadata
-            parse.getData().getContentMeta().set(Nutch.SEGMENT_NAME_KEY, 
-                segmentName);
-            parse.getData().getContentMeta().set(Nutch.SIGNATURE_KEY, 
-                StringUtil.toHexString(signature));
+            parse.getData().getContentMeta()
+                .set(Nutch.SEGMENT_NAME_KEY, segmentName);
+            parse.getData().getContentMeta()
+                .set(Nutch.SIGNATURE_KEY, StringUtil.toHexString(signature));
             // Pass fetch time to content meta
-            parse.getData().getContentMeta().set(Nutch.FETCH_TIME_KEY,
-                Long.toString(datum.getFetchTime()));
+            parse.getData().getContentMeta()
+                .set(Nutch.FETCH_TIME_KEY, Long.toString(datum.getFetchTime()));
             if (url.equals(key))
               datum.setSignature(signature);
             try {
@@ -387,14 +398,13 @@ public class OldFetcher extends Configur
                 LOG.warn("Couldn't pass score, url " + key + " (" + e + ")");
               }
             }
-            output.collect(url, new NutchWritable(
-                    new ParseImpl(new ParseText(parse.getText()), 
-                                  parse.getData(), parse.isCanonical())));
+            output.collect(url, new NutchWritable(new ParseImpl(new ParseText(
+                parse.getText()), parse.getData(), parse.isCanonical())));
           }
         }
       } catch (IOException e) {
         if (LOG.isErrorEnabled()) {
-          LOG.error("fetcher caught:"+e.toString());
+          LOG.error("fetcher caught:" + e.toString());
         }
       }
 
@@ -404,10 +414,10 @@ public class OldFetcher extends Configur
         if (p != null) {
           return p.getData().getStatus();
         }
-      } 
+      }
       return null;
     }
-    
+
   }
 
   private synchronized void updateStatus(int bytesInPage) throws IOException {
@@ -418,23 +428,22 @@ public class OldFetcher extends Configur
   private void reportStatus() throws IOException {
     String status;
     synchronized (this) {
-      long elapsed = (System.currentTimeMillis() - start)/1000;
-      status = 
-        pages+" pages, "+errors+" errors, "
-        + Math.round(((float)pages*10)/elapsed)/10.0+" pages/s, "
-        + Math.round(((((float)bytes)*8)/1024)/elapsed)+" kb/s, ";
+      long elapsed = (System.currentTimeMillis() - start) / 1000;
+      status = pages + " pages, " + errors + " errors, "
+          + Math.round(((float) pages * 10) / elapsed) / 10.0 + " pages/s, "
+          + Math.round(((((float) bytes) * 8) / 1024) / elapsed) + " kb/s, ";
     }
     reporter.setStatus(status);
   }
 
   public OldFetcher() {
-    
+
   }
-  
+
   public OldFetcher(Configuration conf) {
     setConf(conf);
   }
-  
+
   public void configure(JobConf job) {
     setConf(job);
 
@@ -442,12 +451,13 @@ public class OldFetcher extends Configur
     this.storingContent = isStoringContent(job);
     this.parsing = isParsing(job);
 
-//    if (job.getBoolean("fetcher.verbose", false)) {
-//      LOG.setLevel(Level.FINE);
-//    }
+    // if (job.getBoolean("fetcher.verbose", false)) {
+    // LOG.setLevel(Level.FINE);
+    // }
   }
 
-  public void close() {}
+  public void close() {
+  }
 
   public static boolean isParsing(Configuration conf) {
     return conf.getBoolean("fetcher.parse", true);
@@ -457,29 +467,33 @@ public class OldFetcher extends Configur
     return conf.getBoolean("fetcher.store.content", true);
   }
 
-  public void run(RecordReader<WritableComparable<?>, Writable> input, OutputCollector<Text, NutchWritable> output,
-                  Reporter reporter) throws IOException {
+  public void run(RecordReader<WritableComparable<?>, Writable> input,
+      OutputCollector<Text, NutchWritable> output, Reporter reporter)
+      throws IOException {
 
     this.input = input;
     this.output = output;
     this.reporter = reporter;
 
     this.maxRedirect = getConf().getInt("http.redirect.max", 3);
-    
+
     int threadCount = getConf().getInt("fetcher.threads.fetch", 10);
-    if (LOG.isInfoEnabled()) { LOG.info("OldFetcher: threads: " + threadCount); }
+    if (LOG.isInfoEnabled()) {
+      LOG.info("OldFetcher: threads: " + threadCount);
+    }
 
-    for (int i = 0; i < threadCount; i++) {       // spawn threads
+    for (int i = 0; i < threadCount; i++) { // spawn threads
       new FetcherThread(getConf()).start();
     }
 
     // select a timeout that avoids a task timeout
-    long timeout = getConf().getInt("mapred.task.timeout", 10*60*1000)/2;
+    long timeout = getConf().getInt("mapred.task.timeout", 10 * 60 * 1000) / 2;
 
-    do {                                          // wait for threads to exit
+    do { // wait for threads to exit
       try {
         Thread.sleep(1000);
-      } catch (InterruptedException e) {}
+      } catch (InterruptedException e) {
+      }
 
       reportStatus();
 
@@ -487,18 +501,17 @@ public class OldFetcher extends Configur
       synchronized (this) {
         if ((System.currentTimeMillis() - lastRequestStart) > timeout) {
           if (LOG.isWarnEnabled()) {
-            LOG.warn("Aborting with "+activeThreads+" hung threads.");
+            LOG.warn("Aborting with " + activeThreads + " hung threads.");
           }
           return;
         }
       }
 
     } while (activeThreads > 0);
-    
+
   }
 
-  public void fetch(Path segment, int threads)
-    throws IOException {
+  public void fetch(Path segment, int threads) throws IOException {
 
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
@@ -516,7 +529,8 @@ public class OldFetcher extends Configur
     // for politeness, don't permit parallel execution of a single task
     job.setSpeculativeExecution(false);
 
-    FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.GENERATE_DIR_NAME));
+    FileInputFormat.addInputPath(job, new Path(segment,
+        CrawlDatum.GENERATE_DIR_NAME));
     job.setInputFormat(InputFormat.class);
 
     job.setMapRunnerClass(OldFetcher.class);
@@ -528,16 +542,17 @@ public class OldFetcher extends Configur
 
     JobClient.runJob(job);
     long end = System.currentTimeMillis();
-    LOG.info("OldFetcher: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
+    LOG.info("OldFetcher: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
   }
 
-
   /** Run the fetcher. */
   public static void main(String[] args) throws Exception {
-    int res = ToolRunner.run(NutchConfiguration.create(), new OldFetcher(), args);
+    int res = ToolRunner.run(NutchConfiguration.create(), new OldFetcher(),
+        args);
     System.exit(res);
   }
-  
+
   public int run(String[] args) throws Exception {
 
     String usage = "Usage: OldFetcher <segment> [-threads n] [-noParsing]";
@@ -546,15 +561,16 @@ public class OldFetcher extends Configur
       System.err.println(usage);
       return -1;
     }
-      
+
     Path segment = new Path(args[0]);
     int threads = getConf().getInt("fetcher.threads.fetch", 10);
     boolean parsing = true;
 
-    for (int i = 1; i < args.length; i++) {       // parse command line
-      if (args[i].equals("-threads")) {           // found -threads option
-        threads =  Integer.parseInt(args[++i]);
-      } else if (args[i].equals("-noParsing")) parsing = false;
+    for (int i = 1; i < args.length; i++) { // parse command line
+      if (args[i].equals("-threads")) { // found -threads option
+        threads = Integer.parseInt(args[++i]);
+      } else if (args[i].equals("-noParsing"))
+        parsing = false;
     }
 
     getConf().setInt("fetcher.threads.fetch", threads);
@@ -562,7 +578,7 @@ public class OldFetcher extends Configur
       getConf().setBoolean("fetcher.parse", parsing);
     }
     try {
-      fetch(segment, threads);              // run the Fetcher
+      fetch(segment, threads); // run the Fetcher
       return 0;
     } catch (Exception e) {
       LOG.error("OldFetcher: " + StringUtils.stringifyException(e));

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/CleaningJob.java Thu Jan 29 05:38:59 2015
@@ -45,169 +45,166 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * The class scans CrawlDB looking for entries with status DB_GONE (404) or 
- * DB_DUPLICATE and
- * sends delete requests to indexers for those documents.
+ * The class scans CrawlDB looking for entries with status DB_GONE (404) or
+ * DB_DUPLICATE and sends delete requests to indexers for those documents.
  */
 
 public class CleaningJob implements Tool {
-    public static final Logger LOG = LoggerFactory.getLogger(CleaningJob.class);
-    private Configuration conf;
+  public static final Logger LOG = LoggerFactory.getLogger(CleaningJob.class);
+  private Configuration conf;
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  public static class DBFilter implements
+      Mapper<Text, CrawlDatum, ByteWritable, Text> {
+    private ByteWritable OUT = new ByteWritable(CrawlDatum.STATUS_DB_GONE);
+
+    @Override
+    public void configure(JobConf arg0) {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
 
     @Override
-    public Configuration getConf() {
-        return conf;
+    public void map(Text key, CrawlDatum value,
+        OutputCollector<ByteWritable, Text> output, Reporter reporter)
+        throws IOException {
+
+      if (value.getStatus() == CrawlDatum.STATUS_DB_GONE
+          || value.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
+        output.collect(OUT, key);
+      }
+    }
+  }
+
+  public static class DeleterReducer implements
+      Reducer<ByteWritable, Text, Text, ByteWritable> {
+    private static final int NUM_MAX_DELETE_REQUEST = 1000;
+    private int numDeletes = 0;
+    private int totalDeleted = 0;
+
+    private boolean noCommit = false;
+
+    IndexWriters writers = null;
+
+    @Override
+    public void configure(JobConf job) {
+      writers = new IndexWriters(job);
+      try {
+        writers.open(job, "Deletion");
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      noCommit = job.getBoolean("noCommit", false);
     }
 
     @Override
-    public void setConf(Configuration conf) {
-        this.conf = conf;
-    }
-
-    public static class DBFilter implements
-            Mapper<Text, CrawlDatum, ByteWritable, Text> {
-        private ByteWritable OUT = new ByteWritable(CrawlDatum.STATUS_DB_GONE);
-
-        @Override
-        public void configure(JobConf arg0) {
-        }
-
-        @Override
-        public void close() throws IOException {
-        }
-
-        @Override
-        public void map(Text key, CrawlDatum value,
-                OutputCollector<ByteWritable, Text> output, Reporter reporter)
-                throws IOException {
-
-            if (value.getStatus() == CrawlDatum.STATUS_DB_GONE || value.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
-                output.collect(OUT, key);
-            }
-        }
-    }
-
-    public static class DeleterReducer implements
-            Reducer<ByteWritable, Text, Text, ByteWritable> {
-        private static final int NUM_MAX_DELETE_REQUEST = 1000;
-        private int numDeletes = 0;
-        private int totalDeleted = 0;
-
-        private boolean noCommit = false;
-
-        IndexWriters writers = null;
-
-        @Override
-        public void configure(JobConf job) {
-            writers = new IndexWriters(job);
-            try {
-                writers.open(job, "Deletion");
-            } catch (IOException e) {
-                throw new RuntimeException(e);
-            }
-            noCommit = job.getBoolean("noCommit", false);
-        }
-
-        @Override
-        public void close() throws IOException {
-            // BUFFERING OF CALLS TO INDEXER SHOULD BE HANDLED AT INDEXER LEVEL
-            // if (numDeletes > 0) {
-            // LOG.info("CleaningJob: deleting " + numDeletes + " documents");
-            // // TODO updateRequest.process(solr);
-            // totalDeleted += numDeletes;
-            // }
-
-            writers.close();
-
-            if (totalDeleted > 0 && !noCommit) {
-                writers.commit();
-            }
-
-            LOG.info("CleaningJob: deleted a total of " + totalDeleted
-                    + " documents");
-        }
-
-        @Override
-        public void reduce(ByteWritable key, Iterator<Text> values,
-                OutputCollector<Text, ByteWritable> output, Reporter reporter)
-                throws IOException {
-            while (values.hasNext()) {
-                Text document = values.next();
-                writers.delete(document.toString());
-                totalDeleted++;
-                reporter.incrCounter("CleaningJobStatus", "Deleted documents",
-                        1);
-                // if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
-                // LOG.info("CleaningJob: deleting " + numDeletes
-                // + " documents");
-                // // TODO updateRequest.process(solr);
-                // // TODO updateRequest = new UpdateRequest();
-                // writers.delete(key.toString());
-                // totalDeleted += numDeletes;
-                // numDeletes = 0;
-                // }
-            }
-        }
-    }
-
-    public void delete(String crawldb, boolean noCommit) throws IOException {
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        long start = System.currentTimeMillis();
-        LOG.info("CleaningJob: starting at " + sdf.format(start));
-
-        JobConf job = new NutchJob(getConf());
-
-        FileInputFormat.addInputPath(job, new Path(crawldb,
-                CrawlDb.CURRENT_NAME));
-        job.setBoolean("noCommit", noCommit);
-        job.setInputFormat(SequenceFileInputFormat.class);
-        job.setOutputFormat(NullOutputFormat.class);
-        job.setMapOutputKeyClass(ByteWritable.class);
-        job.setMapOutputValueClass(Text.class);
-        job.setMapperClass(DBFilter.class);
-        job.setReducerClass(DeleterReducer.class);
-        
-        job.setJobName("CleaningJob");
-
-        // need to expicitely allow deletions
-        job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
-
-        JobClient.runJob(job);
-
-        long end = System.currentTimeMillis();
-        LOG.info("CleaningJob: finished at " + sdf.format(end) + ", elapsed: "
-                + TimingUtil.elapsedTime(start, end));
-    }
-
-    public int run(String[] args) throws IOException {
-        if (args.length < 1) {
-            String usage = "Usage: CleaningJob <crawldb> [-noCommit]";
-            LOG.error("Missing crawldb. "+usage);
-            System.err.println(usage);
-            IndexWriters writers = new IndexWriters(getConf());
-            System.err.println(writers.describe());
-            return 1;
-        }
-
-        boolean noCommit = false;
-        if (args.length == 2 && args[1].equals("-noCommit")) {
-            noCommit = true;
-        }
-
-        try {
-            delete(args[0], noCommit);
-        } catch (final Exception e) {
-            LOG.error("CleaningJob: " + StringUtils.stringifyException(e));
-            System.err.println("ERROR CleaningJob: "
-                    + StringUtils.stringifyException(e));
-            return -1;
-        }
-        return 0;
-    }
-
-    public static void main(String[] args) throws Exception {
-        int result = ToolRunner.run(NutchConfiguration.create(),
-                new CleaningJob(), args);
-        System.exit(result);
+    public void close() throws IOException {
+      // BUFFERING OF CALLS TO INDEXER SHOULD BE HANDLED AT INDEXER LEVEL
+      // if (numDeletes > 0) {
+      // LOG.info("CleaningJob: deleting " + numDeletes + " documents");
+      // // TODO updateRequest.process(solr);
+      // totalDeleted += numDeletes;
+      // }
+
+      writers.close();
+
+      if (totalDeleted > 0 && !noCommit) {
+        writers.commit();
+      }
+
+      LOG.info("CleaningJob: deleted a total of " + totalDeleted + " documents");
     }
+
+    @Override
+    public void reduce(ByteWritable key, Iterator<Text> values,
+        OutputCollector<Text, ByteWritable> output, Reporter reporter)
+        throws IOException {
+      while (values.hasNext()) {
+        Text document = values.next();
+        writers.delete(document.toString());
+        totalDeleted++;
+        reporter.incrCounter("CleaningJobStatus", "Deleted documents", 1);
+        // if (numDeletes >= NUM_MAX_DELETE_REQUEST) {
+        // LOG.info("CleaningJob: deleting " + numDeletes
+        // + " documents");
+        // // TODO updateRequest.process(solr);
+        // // TODO updateRequest = new UpdateRequest();
+        // writers.delete(key.toString());
+        // totalDeleted += numDeletes;
+        // numDeletes = 0;
+        // }
+      }
+    }
+  }
+
+  public void delete(String crawldb, boolean noCommit) throws IOException {
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("CleaningJob: starting at " + sdf.format(start));
+
+    JobConf job = new NutchJob(getConf());
+
+    FileInputFormat.addInputPath(job, new Path(crawldb, CrawlDb.CURRENT_NAME));
+    job.setBoolean("noCommit", noCommit);
+    job.setInputFormat(SequenceFileInputFormat.class);
+    job.setOutputFormat(NullOutputFormat.class);
+    job.setMapOutputKeyClass(ByteWritable.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setMapperClass(DBFilter.class);
+    job.setReducerClass(DeleterReducer.class);
+
+    job.setJobName("CleaningJob");
+
+    // need to expicitely allow deletions
+    job.setBoolean(IndexerMapReduce.INDEXER_DELETE, true);
+
+    JobClient.runJob(job);
+
+    long end = System.currentTimeMillis();
+    LOG.info("CleaningJob: finished at " + sdf.format(end) + ", elapsed: "
+        + TimingUtil.elapsedTime(start, end));
+  }
+
+  public int run(String[] args) throws IOException {
+    if (args.length < 1) {
+      String usage = "Usage: CleaningJob <crawldb> [-noCommit]";
+      LOG.error("Missing crawldb. " + usage);
+      System.err.println(usage);
+      IndexWriters writers = new IndexWriters(getConf());
+      System.err.println(writers.describe());
+      return 1;
+    }
+
+    boolean noCommit = false;
+    if (args.length == 2 && args[1].equals("-noCommit")) {
+      noCommit = true;
+    }
+
+    try {
+      delete(args[0], noCommit);
+    } catch (final Exception e) {
+      LOG.error("CleaningJob: " + StringUtils.stringifyException(e));
+      System.err.println("ERROR CleaningJob: "
+          + StringUtils.stringifyException(e));
+      return -1;
+    }
+    return 0;
+  }
+
+  public static void main(String[] args) throws Exception {
+    int result = ToolRunner.run(NutchConfiguration.create(), new CleaningJob(),
+        args);
+    System.exit(result);
+  }
 }

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriter.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriter.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriter.java Thu Jan 29 05:38:59 2015
@@ -24,22 +24,24 @@ import org.apache.nutch.indexer.NutchDoc
 import org.apache.nutch.plugin.Pluggable;
 
 public interface IndexWriter extends Pluggable, Configurable {
-    /** The name of the extension point. */
-    final static String X_POINT_ID = IndexWriter.class.getName();
-    
-    public void open(JobConf job, String name) throws IOException;
-
-    public void write(NutchDocument doc) throws IOException;
-
-    public void delete(String key) throws IOException;
-    
-    public void update(NutchDocument doc) throws IOException;
-
-    public void commit() throws IOException;
-    
-    public void close() throws IOException;
+  /** The name of the extension point. */
+  final static String X_POINT_ID = IndexWriter.class.getName();
 
-    /** Returns a String describing the IndexWriter instance and the specific parameters it can take */
-	public String describe();
-}
+  public void open(JobConf job, String name) throws IOException;
+
+  public void write(NutchDocument doc) throws IOException;
+
+  public void delete(String key) throws IOException;
+
+  public void update(NutchDocument doc) throws IOException;
 
+  public void commit() throws IOException;
+
+  public void close() throws IOException;
+
+  /**
+   * Returns a String describing the IndexWriter instance and the specific
+   * parameters it can take
+   */
+  public String describe();
+}

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriters.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriters.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriters.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexWriters.java Thu Jan 29 05:38:59 2015
@@ -33,116 +33,113 @@ import org.slf4j.LoggerFactory;
 /** Creates and caches {@link IndexWriter} implementing plugins. */
 public class IndexWriters {
 
-	public final static Logger LOG = LoggerFactory
-			.getLogger(IndexWriters.class);
+  public final static Logger LOG = LoggerFactory.getLogger(IndexWriters.class);
 
-	private IndexWriter[] indexWriters;
+  private IndexWriter[] indexWriters;
 
-	public IndexWriters(Configuration conf) {
-		ObjectCache objectCache = ObjectCache.get(conf);
-		synchronized (objectCache) {
-			this.indexWriters = (IndexWriter[]) objectCache
-					.getObject(IndexWriter.class.getName());
-			if (this.indexWriters == null) {
-				try {
-					ExtensionPoint point = PluginRepository.get(conf)
-							.getExtensionPoint(IndexWriter.X_POINT_ID);
-					if (point == null)
-						throw new RuntimeException(IndexWriter.X_POINT_ID
-								+ " not found.");
-					Extension[] extensions = point.getExtensions();
-					HashMap<String, IndexWriter> indexerMap = new HashMap<String, IndexWriter>();
-					for (int i = 0; i < extensions.length; i++) {
-						Extension extension = extensions[i];
-						IndexWriter writer = (IndexWriter) extension
-								.getExtensionInstance();
-						LOG.info("Adding " + writer.getClass().getName());
-						if (!indexerMap.containsKey(writer.getClass().getName())) {
-							indexerMap.put(writer.getClass().getName(), writer);
-						}
-					}
-					objectCache.setObject(IndexWriter.class.getName(), indexerMap
-							.values().toArray(new IndexWriter[0]));
-				} catch (PluginRuntimeException e) {
-					throw new RuntimeException(e);
-				}
-				this.indexWriters = (IndexWriter[]) objectCache
-						.getObject(IndexWriter.class.getName());
-			}
-		}
-	}
-
-	public void open(JobConf job, String name) throws IOException {
-		for (int i = 0; i < this.indexWriters.length; i++) {
-			try {
-				this.indexWriters[i].open(job, name);
-			} catch (IOException ioe) {
-				throw ioe;
-			}
-		}
-	}
-
-	public void write(NutchDocument doc) throws IOException {
-		for (int i = 0; i < this.indexWriters.length; i++) {
-			try {
-				this.indexWriters[i].write(doc);
-			} catch (IOException ioe) {
-				throw ioe;
-			}
-		}
-	}
-
-	public void update(NutchDocument doc) throws IOException {
-		for (int i = 0; i < this.indexWriters.length; i++) {
-			try {
-				this.indexWriters[i].update(doc);
-			} catch (IOException ioe) {
-				throw ioe;
-			}
-		}
-	}
-
-	public void delete(String key) throws IOException {
-		for (int i = 0; i < this.indexWriters.length; i++) {
-			try {
-				this.indexWriters[i].delete(key);
-			} catch (IOException ioe) {
-				throw ioe;
-			}
-		}
-	}
-
-	public void close() throws IOException {
-		for (int i = 0; i < this.indexWriters.length; i++) {
-			try {
-				this.indexWriters[i].close();
-			} catch (IOException ioe) {
-				throw ioe;
-			}
-		}
-	}
-
-	public void commit() throws IOException {
-		for (int i = 0; i < this.indexWriters.length; i++) {
-			try {
-				this.indexWriters[i].commit();
-			} catch (IOException ioe) {
-				throw ioe;
-			}
-		}
-	}
-
-	// lists the active IndexWriters and their configuration
-	public String describe() throws IOException {
-		StringBuffer buffer = new StringBuffer();
-		if (this.indexWriters.length == 0)
-			buffer.append("No IndexWriters activated - check your configuration\n");
-		else
-			buffer.append("Active IndexWriters :\n");
-		for (int i = 0; i < this.indexWriters.length; i++) {
-			buffer.append(this.indexWriters[i].describe()).append("\n");
-		}
-		return buffer.toString();
-	}
+  public IndexWriters(Configuration conf) {
+    ObjectCache objectCache = ObjectCache.get(conf);
+    synchronized (objectCache) {
+      this.indexWriters = (IndexWriter[]) objectCache
+          .getObject(IndexWriter.class.getName());
+      if (this.indexWriters == null) {
+        try {
+          ExtensionPoint point = PluginRepository.get(conf).getExtensionPoint(
+              IndexWriter.X_POINT_ID);
+          if (point == null)
+            throw new RuntimeException(IndexWriter.X_POINT_ID + " not found.");
+          Extension[] extensions = point.getExtensions();
+          HashMap<String, IndexWriter> indexerMap = new HashMap<String, IndexWriter>();
+          for (int i = 0; i < extensions.length; i++) {
+            Extension extension = extensions[i];
+            IndexWriter writer = (IndexWriter) extension.getExtensionInstance();
+            LOG.info("Adding " + writer.getClass().getName());
+            if (!indexerMap.containsKey(writer.getClass().getName())) {
+              indexerMap.put(writer.getClass().getName(), writer);
+            }
+          }
+          objectCache.setObject(IndexWriter.class.getName(), indexerMap
+              .values().toArray(new IndexWriter[0]));
+        } catch (PluginRuntimeException e) {
+          throw new RuntimeException(e);
+        }
+        this.indexWriters = (IndexWriter[]) objectCache
+            .getObject(IndexWriter.class.getName());
+      }
+    }
+  }
+
+  public void open(JobConf job, String name) throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].open(job, name);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void write(NutchDocument doc) throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].write(doc);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void update(NutchDocument doc) throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].update(doc);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void delete(String key) throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].delete(key);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void close() throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].close();
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  public void commit() throws IOException {
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      try {
+        this.indexWriters[i].commit();
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+    }
+  }
+
+  // lists the active IndexWriters and their configuration
+  public String describe() throws IOException {
+    StringBuffer buffer = new StringBuffer();
+    if (this.indexWriters.length == 0)
+      buffer.append("No IndexWriters activated - check your configuration\n");
+    else
+      buffer.append("Active IndexWriters :\n");
+    for (int i = 0; i < this.indexWriters.length; i++) {
+      buffer.append(this.indexWriters[i].describe()).append("\n");
+    }
+    return buffer.toString();
+  }
 
 }

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java Thu Jan 29 05:38:59 2015
@@ -50,11 +50,12 @@ import org.apache.nutch.parse.ParseText;
 import org.apache.nutch.scoring.ScoringFilterException;
 import org.apache.nutch.scoring.ScoringFilters;
 
-public class IndexerMapReduce extends Configured
-implements Mapper<Text, Writable, Text, NutchWritable>,
-          Reducer<Text, NutchWritable, Text, NutchIndexAction> {
+public class IndexerMapReduce extends Configured implements
+    Mapper<Text, Writable, Text, NutchWritable>,
+    Reducer<Text, NutchWritable, Text, NutchIndexAction> {
 
-  public static final Logger LOG = LoggerFactory.getLogger(IndexerMapReduce.class);
+  public static final Logger LOG = LoggerFactory
+      .getLogger(IndexerMapReduce.class);
 
   public static final String INDEXER_PARAMS = "indexer.additional.params";
   public static final String INDEXER_DELETE = "indexer.delete";
@@ -82,14 +83,16 @@ implements Mapper<Text, Writable, Text,
     this.filters = new IndexingFilters(getConf());
     this.scfilters = new ScoringFilters(getConf());
     this.delete = job.getBoolean(INDEXER_DELETE, false);
-    this.deleteRobotsNoIndex = job.getBoolean(INDEXER_DELETE_ROBOTS_NOINDEX, false);
+    this.deleteRobotsNoIndex = job.getBoolean(INDEXER_DELETE_ROBOTS_NOINDEX,
+        false);
     this.skip = job.getBoolean(INDEXER_SKIP_NOTMODIFIED, false);
 
     normalize = job.getBoolean(URL_NORMALIZING, false);
     filter = job.getBoolean(URL_FILTERING, false);
 
     if (normalize) {
-      urlNormalizers = new URLNormalizers(getConf(), URLNormalizers.SCOPE_INDEXER);
+      urlNormalizers = new URLNormalizers(getConf(),
+          URLNormalizers.SCOPE_INDEXER);
     }
 
     if (filter) {
@@ -99,9 +102,10 @@ implements Mapper<Text, Writable, Text,
 
   /**
    * Normalizes and trims extra whitespace from the given url.
-   *
-   * @param url The url to normalize.
-   *
+   * 
+   * @param url
+   *          The url to normalize.
+   * 
    * @return The normalized url.
    */
   private String normalizeUrl(String url) {
@@ -114,11 +118,10 @@ implements Mapper<Text, Writable, Text,
       try {
 
         // normalize and trim the url
-        normalized = urlNormalizers.normalize(url,
-          URLNormalizers.SCOPE_INDEXER);
+        normalized = urlNormalizers
+            .normalize(url, URLNormalizers.SCOPE_INDEXER);
         normalized = normalized.trim();
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         LOG.warn("Skipping " + url + ":" + e);
         normalized = null;
       }
@@ -129,9 +132,10 @@ implements Mapper<Text, Writable, Text,
 
   /**
    * Filters the given url.
-   *
-   * @param url The url to filter.
-   *
+   * 
+   * @param url
+   *          The url to filter.
+   * 
    * @return The filtered url or null.
    */
   private String filterUrl(String url) {
@@ -149,7 +153,8 @@ implements Mapper<Text, Writable, Text,
   }
 
   public void map(Text key, Writable value,
-      OutputCollector<Text, NutchWritable> output, Reporter reporter) throws IOException {
+      OutputCollector<Text, NutchWritable> output, Reporter reporter)
+      throws IOException {
 
     String urlString = filterUrl(normalizeUrl(key.toString()));
     if (urlString == null) {
@@ -162,8 +167,8 @@ implements Mapper<Text, Writable, Text,
   }
 
   public void reduce(Text key, Iterator<NutchWritable> values,
-                     OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
-    throws IOException {
+      OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
+      throws IOException {
     Inlinks inlinks = null;
     CrawlDatum dbDatum = null;
     CrawlDatum fetchDatum = null;
@@ -173,26 +178,25 @@ implements Mapper<Text, Writable, Text,
     while (values.hasNext()) {
       final Writable value = values.next().get(); // unwrap
       if (value instanceof Inlinks) {
-        inlinks = (Inlinks)value;
+        inlinks = (Inlinks) value;
       } else if (value instanceof CrawlDatum) {
-        final CrawlDatum datum = (CrawlDatum)value;
+        final CrawlDatum datum = (CrawlDatum) value;
         if (CrawlDatum.hasDbStatus(datum)) {
           dbDatum = datum;
-        }
-        else if (CrawlDatum.hasFetchStatus(datum)) {
+        } else if (CrawlDatum.hasFetchStatus(datum)) {
           // don't index unmodified (empty) pages
           if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
             fetchDatum = datum;
           }
-        } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||
-                   CrawlDatum.STATUS_SIGNATURE == datum.getStatus() ||
-                   CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
+        } else if (CrawlDatum.STATUS_LINKED == datum.getStatus()
+            || CrawlDatum.STATUS_SIGNATURE == datum.getStatus()
+            || CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
           continue;
         } else {
-          throw new RuntimeException("Unexpected status: "+datum.getStatus());
+          throw new RuntimeException("Unexpected status: " + datum.getStatus());
         }
       } else if (value instanceof ParseData) {
-        parseData = (ParseData)value;
+        parseData = (ParseData) value;
 
         // Handle robots meta? https://issues.apache.org/jira/browse/NUTCH-1434
         if (deleteRobotsNoIndex) {
@@ -200,64 +204,70 @@ implements Mapper<Text, Writable, Text,
           String robotsMeta = parseData.getMeta("robots");
 
           // Has it a noindex for this url?
-          if (robotsMeta != null && robotsMeta.toLowerCase().indexOf("noindex") != -1) {
+          if (robotsMeta != null
+              && robotsMeta.toLowerCase().indexOf("noindex") != -1) {
             // Delete it!
-            NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+            NutchIndexAction action = new NutchIndexAction(null,
+                NutchIndexAction.DELETE);
             output.collect(key, action);
             return;
           }
         }
       } else if (value instanceof ParseText) {
-        parseText = (ParseText)value;
+        parseText = (ParseText) value;
       } else if (LOG.isWarnEnabled()) {
-        LOG.warn("Unrecognized type: "+value.getClass());
+        LOG.warn("Unrecognized type: " + value.getClass());
       }
     }
-    
+
     // Whether to delete GONE or REDIRECTS
-    if (delete && fetchDatum != null && dbDatum != null) {    
-      if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE || dbDatum.getStatus() == CrawlDatum.STATUS_DB_GONE) {
+    if (delete && fetchDatum != null && dbDatum != null) {
+      if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE
+          || dbDatum.getStatus() == CrawlDatum.STATUS_DB_GONE) {
         reporter.incrCounter("IndexerStatus", "Documents deleted", 1);
 
-        NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+        NutchIndexAction action = new NutchIndexAction(null,
+            NutchIndexAction.DELETE);
         output.collect(key, action);
         return;
       }
-      
-      if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM ||
-          fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP ||
-          dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM ||
-          dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
+
+      if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM
+          || fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_TEMP
+          || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_PERM
+          || dbDatum.getStatus() == CrawlDatum.STATUS_DB_REDIR_TEMP) {
         reporter.incrCounter("IndexerStatus", "Deleted redirects", 1);
         reporter.incrCounter("IndexerStatus", "Perm redirects deleted", 1);
 
-        NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+        NutchIndexAction action = new NutchIndexAction(null,
+            NutchIndexAction.DELETE);
         output.collect(key, action);
         return;
       }
     }
 
-    if (fetchDatum == null || dbDatum == null
-        || parseText == null || parseData == null) {
-      return;                                     // only have inlinks
+    if (fetchDatum == null || dbDatum == null || parseText == null
+        || parseData == null) {
+      return; // only have inlinks
     }
 
     // Whether to delete pages marked as duplicates
     if (delete && dbDatum.getStatus() == CrawlDatum.STATUS_DB_DUPLICATE) {
       reporter.incrCounter("IndexerStatus", "Duplicates deleted", 1);
-      NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+      NutchIndexAction action = new NutchIndexAction(null,
+          NutchIndexAction.DELETE);
       output.collect(key, action);
       return;
     }
-    
+
     // Whether to skip DB_NOTMODIFIED pages
     if (skip && dbDatum.getStatus() == CrawlDatum.STATUS_DB_NOTMODIFIED) {
       reporter.incrCounter("IndexerStatus", "Skipped", 1);
       return;
     }
 
-    if (!parseData.getStatus().isSuccess() ||
-        fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
+    if (!parseData.getStatus().isSuccess()
+        || fetchDatum.getStatus() != CrawlDatum.STATUS_FETCH_SUCCESS) {
       return;
     }
 
@@ -276,11 +286,13 @@ implements Mapper<Text, Writable, Text,
     try {
       // extract information from dbDatum and pass it to
       // fetchDatum so that indexing filters can use it
-      final Text url = (Text) dbDatum.getMetaData().get(Nutch.WRITABLE_REPR_URL_KEY);
+      final Text url = (Text) dbDatum.getMetaData().get(
+          Nutch.WRITABLE_REPR_URL_KEY);
       if (url != null) {
         // Representation URL also needs normalization and filtering.
         // If repr URL is excluded by filters we still accept this document
-        // but represented by its primary URL ("key") which has passed URL filters.
+        // but represented by its primary URL ("key") which has passed URL
+        // filters.
         String urlString = filterUrl(normalizeUrl(url.toString()));
         if (urlString != null) {
           url.set(urlString);
@@ -290,7 +302,9 @@ implements Mapper<Text, Writable, Text,
       // run indexing filters
       doc = this.filters.filter(doc, parse, key, fetchDatum, inlinks);
     } catch (final IndexingException e) {
-      if (LOG.isWarnEnabled()) { LOG.warn("Error indexing "+key+": "+e); }
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Error indexing " + key + ": " + e);
+      }
       reporter.incrCounter("IndexerStatus", "Errors", 1);
       return;
     }
@@ -304,8 +318,8 @@ implements Mapper<Text, Writable, Text,
     float boost = 1.0f;
     // run scoring filters
     try {
-      boost = this.scfilters.indexerScore(key, doc, dbDatum,
-              fetchDatum, parse, inlinks, boost);
+      boost = this.scfilters.indexerScore(key, doc, dbDatum, fetchDatum, parse,
+          inlinks, boost);
     } catch (final ScoringFilterException e) {
       if (LOG.isWarnEnabled()) {
         LOG.warn("Error calculating score " + key + ": " + e);
@@ -323,30 +337,32 @@ implements Mapper<Text, Writable, Text,
     output.collect(key, action);
   }
 
-  public void close() throws IOException { }
+  public void close() throws IOException {
+  }
 
   public static void initMRJob(Path crawlDb, Path linkDb,
-                           Collection<Path> segments,
-                           JobConf job) {
+      Collection<Path> segments, JobConf job) {
 
     LOG.info("IndexerMapReduce: crawldb: " + crawlDb);
-    
-    if (linkDb!=null)
+
+    if (linkDb != null)
       LOG.info("IndexerMapReduce: linkdb: " + linkDb);
 
     for (final Path segment : segments) {
       LOG.info("IndexerMapReduces: adding segment: " + segment);
-      FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.FETCH_DIR_NAME));
-      FileInputFormat.addInputPath(job, new Path(segment, CrawlDatum.PARSE_DIR_NAME));
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.FETCH_DIR_NAME));
+      FileInputFormat.addInputPath(job, new Path(segment,
+          CrawlDatum.PARSE_DIR_NAME));
       FileInputFormat.addInputPath(job, new Path(segment, ParseData.DIR_NAME));
       FileInputFormat.addInputPath(job, new Path(segment, ParseText.DIR_NAME));
     }
 
     FileInputFormat.addInputPath(job, new Path(crawlDb, CrawlDb.CURRENT_NAME));
-    
-    if (linkDb!=null)
-	  FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));
-    
+
+    if (linkDb != null)
+      FileInputFormat.addInputPath(job, new Path(linkDb, LinkDb.CURRENT_NAME));
+
     job.setInputFormat(SequenceFileInputFormat.class);
 
     job.setMapperClass(IndexerMapReduce.class);

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java Thu Jan 29 05:38:59 2015
@@ -27,31 +27,31 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.Progressable;
 
 public class IndexerOutputFormat extends
-        FileOutputFormat<Text, NutchIndexAction> {
+    FileOutputFormat<Text, NutchIndexAction> {
 
-    @Override
-    public RecordWriter<Text, NutchIndexAction> getRecordWriter(
-            FileSystem ignored, JobConf job, String name, Progressable progress)
-            throws IOException {
-
-        final IndexWriters writers = new IndexWriters(job);
-
-        writers.open(job, name);
-
-        return new RecordWriter<Text, NutchIndexAction>() {
-
-            public void close(Reporter reporter) throws IOException {
-                writers.close();
-            }
-
-            public void write(Text key, NutchIndexAction indexAction)
-                    throws IOException {
-                if (indexAction.action == NutchIndexAction.ADD) {
-                    writers.write(indexAction.doc);
-                } else if (indexAction.action == NutchIndexAction.DELETE) {
-                    writers.delete(key.toString());
-                }
-            }
-        };
-    }
+  @Override
+  public RecordWriter<Text, NutchIndexAction> getRecordWriter(
+      FileSystem ignored, JobConf job, String name, Progressable progress)
+      throws IOException {
+
+    final IndexWriters writers = new IndexWriters(job);
+
+    writers.open(job, name);
+
+    return new RecordWriter<Text, NutchIndexAction>() {
+
+      public void close(Reporter reporter) throws IOException {
+        writers.close();
+      }
+
+      public void write(Text key, NutchIndexAction indexAction)
+          throws IOException {
+        if (indexAction.action == NutchIndexAction.ADD) {
+          writers.write(indexAction.doc);
+        } else if (indexAction.action == NutchIndexAction.DELETE) {
+          writers.delete(key.toString());
+        }
+      }
+    };
+  }
 }

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilter.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilter.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilter.java Thu Jan 29 05:38:59 2015
@@ -28,9 +28,9 @@ import org.apache.nutch.crawl.CrawlDatum
 import org.apache.nutch.crawl.Inlinks;
 import org.apache.nutch.plugin.Pluggable;
 
-
-/** Extension point for indexing.  Permits one to add metadata to the indexed
- * fields.  All plugins found which implement this extension point are run
+/**
+ * Extension point for indexing. Permits one to add metadata to the indexed
+ * fields. All plugins found which implement this extension point are run
  * sequentially on the parse.
  */
 public interface IndexingFilter extends Pluggable, Configurable {
@@ -57,6 +57,6 @@ public interface IndexingFilter extends
    *         document should be discarded)
    * @throws IndexingException
    */
-  NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum, Inlinks inlinks)
-    throws IndexingException;
+  NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+      CrawlDatum datum, Inlinks inlinks) throws IndexingException;
 }

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilters.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilters.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilters.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFilters.java Thu Jan 29 05:38:59 2015
@@ -28,12 +28,13 @@ import org.apache.nutch.crawl.CrawlDatum
 import org.apache.nutch.crawl.Inlinks;
 import org.apache.hadoop.io.Text;
 
-/** Creates and caches {@link IndexingFilter} implementing plugins.*/
+/** Creates and caches {@link IndexingFilter} implementing plugins. */
 public class IndexingFilters {
 
   public static final String INDEXINGFILTER_ORDER = "indexingfilter.order";
 
-  public final static Logger LOG = LoggerFactory.getLogger(IndexingFilters.class);
+  public final static Logger LOG = LoggerFactory
+      .getLogger(IndexingFilters.class);
 
   private IndexingFilter[] indexingFilters;
 
@@ -44,12 +45,13 @@ public class IndexingFilters {
   }
 
   /** Run all defined filters. */
-  public NutchDocument filter(NutchDocument doc, Parse parse, Text url, CrawlDatum datum,
-      Inlinks inlinks) throws IndexingException {
+  public NutchDocument filter(NutchDocument doc, Parse parse, Text url,
+      CrawlDatum datum, Inlinks inlinks) throws IndexingException {
     for (int i = 0; i < this.indexingFilters.length; i++) {
       doc = this.indexingFilters[i].filter(doc, parse, url, datum, inlinks);
       // break the loop if an indexing filter discards the doc
-      if (doc == null) return null;
+      if (doc == null)
+        return null;
     }
 
     return doc;

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingFiltersChecker.java Thu Jan 29 05:38:59 2015
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 package org.apache.nutch.indexer;
 
 import java.util.List;
@@ -46,16 +46,19 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Reads and parses a URL and run the indexers on it. Displays the fields obtained and the first
- * 100 characters of their value
- *
- * Tested with e.g. ./nutch org.apache.nutch.indexer.IndexingFiltersChecker http://www.lemonde.fr
+ * Reads and parses a URL and run the indexers on it. Displays the fields
+ * obtained and the first 100 characters of their value
+ * 
+ * Tested with e.g. ./nutch org.apache.nutch.indexer.IndexingFiltersChecker
+ * http://www.lemonde.fr
+ * 
  * @author Julien Nioche
  **/
 
 public class IndexingFiltersChecker extends Configured implements Tool {
 
-  public static final Logger LOG = LoggerFactory.getLogger(IndexingFiltersChecker.class);
+  public static final Logger LOG = LoggerFactory
+      .getLogger(IndexingFiltersChecker.class);
 
   public IndexingFiltersChecker() {
 
@@ -95,12 +98,13 @@ public class IndexingFiltersChecker exte
     CrawlDatum datum = new CrawlDatum();
 
     ProtocolOutput output = protocol.getProtocolOutput(new Text(url), datum);
-    
+
     if (!output.getStatus().isSuccess()) {
-      System.out.println("Fetch failed with protocol status: " + output.getStatus());
+      System.out.println("Fetch failed with protocol status: "
+          + output.getStatus());
       return 0;
     }
-         
+
     Content content = output.getContent();
 
     if (content == null) {
@@ -115,7 +119,8 @@ public class IndexingFiltersChecker exte
     }
 
     // store the guessed content type in the crawldatum
-    datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE), new Text(contentType));
+    datum.getMetaData().put(new Text(Metadata.CONTENT_TYPE),
+        new Text(contentType));
 
     if (ParseSegment.isTruncated(content)) {
       LOG.warn("Content is truncated, parse may fail!");
@@ -162,7 +167,7 @@ public class IndexingFiltersChecker exte
       System.out.println("Document discarded by indexing filter");
       return 0;
     }
-    
+
     for (String fname : doc.getFieldNames()) {
       List<Object> values = doc.getField(fname).getValues();
       if (values != null) {
@@ -173,14 +178,14 @@ public class IndexingFiltersChecker exte
         }
       }
     }
-    
-    if (conf.getBoolean("doIndex", false) && doc!=null){
+
+    if (conf.getBoolean("doIndex", false) && doc != null) {
       IndexWriters writers = new IndexWriters(getConf());
       writers.open(new JobConf(getConf()), "IndexingFilterChecker");
       writers.write(doc);
       writers.close();
     }
-    
+
     return 0;
   }
 

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexingJob.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexingJob.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexingJob.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexingJob.java Thu Jan 29 05:38:59 2015
@@ -46,145 +46,143 @@ import org.slf4j.LoggerFactory;
 
 public class IndexingJob extends Configured implements Tool {
 
-    public static Logger LOG = LoggerFactory.getLogger(IndexingJob.class);
+  public static Logger LOG = LoggerFactory.getLogger(IndexingJob.class);
 
-    public IndexingJob() {
-        super(null);
-    }
-
-    public IndexingJob(Configuration conf) {
-        super(conf);
-    }
-
-    public void index(Path crawlDb, Path linkDb, List<Path> segments,
-            boolean noCommit) throws IOException {
-        index(crawlDb, linkDb, segments, noCommit, false, null);
-    }
-
-    public void index(Path crawlDb, Path linkDb, List<Path> segments,
-            boolean noCommit, boolean deleteGone) throws IOException {
-        index(crawlDb, linkDb, segments, noCommit, deleteGone, null);
-    }
-
-    public void index(Path crawlDb, Path linkDb, List<Path> segments,
-            boolean noCommit, boolean deleteGone, String params)
-            throws IOException {
-        index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false,
-                false);
-    }
-
-    public void index(Path crawlDb, Path linkDb, List<Path> segments,
-            boolean noCommit, boolean deleteGone, String params,
-            boolean filter, boolean normalize) throws IOException {
-
-        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        long start = System.currentTimeMillis();
-        LOG.info("Indexer: starting at " + sdf.format(start));
-
-        final JobConf job = new NutchJob(getConf());
-        job.setJobName("Indexer");
-
-        LOG.info("Indexer: deleting gone documents: " + deleteGone);
-        LOG.info("Indexer: URL filtering: " + filter);
-        LOG.info("Indexer: URL normalizing: " + normalize);   
-        
-        IndexWriters writers = new IndexWriters(getConf());
-        LOG.info(writers.describe());
-
-        IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
-
-        // NOW PASSED ON THE COMMAND LINE AS A HADOOP PARAM
-        // job.set(SolrConstants.SERVER_URL, solrUrl);
-
-        job.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
-        job.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
-        job.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
-
-        if (params != null) {
-            job.set(IndexerMapReduce.INDEXER_PARAMS, params);
+  public IndexingJob() {
+    super(null);
+  }
+
+  public IndexingJob(Configuration conf) {
+    super(conf);
+  }
+
+  public void index(Path crawlDb, Path linkDb, List<Path> segments,
+      boolean noCommit) throws IOException {
+    index(crawlDb, linkDb, segments, noCommit, false, null);
+  }
+
+  public void index(Path crawlDb, Path linkDb, List<Path> segments,
+      boolean noCommit, boolean deleteGone) throws IOException {
+    index(crawlDb, linkDb, segments, noCommit, deleteGone, null);
+  }
+
+  public void index(Path crawlDb, Path linkDb, List<Path> segments,
+      boolean noCommit, boolean deleteGone, String params) throws IOException {
+    index(crawlDb, linkDb, segments, noCommit, deleteGone, params, false, false);
+  }
+
+  public void index(Path crawlDb, Path linkDb, List<Path> segments,
+      boolean noCommit, boolean deleteGone, String params, boolean filter,
+      boolean normalize) throws IOException {
+
+    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    long start = System.currentTimeMillis();
+    LOG.info("Indexer: starting at " + sdf.format(start));
+
+    final JobConf job = new NutchJob(getConf());
+    job.setJobName("Indexer");
+
+    LOG.info("Indexer: deleting gone documents: " + deleteGone);
+    LOG.info("Indexer: URL filtering: " + filter);
+    LOG.info("Indexer: URL normalizing: " + normalize);
+
+    IndexWriters writers = new IndexWriters(getConf());
+    LOG.info(writers.describe());
+
+    IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
+
+    // NOW PASSED ON THE COMMAND LINE AS A HADOOP PARAM
+    // job.set(SolrConstants.SERVER_URL, solrUrl);
+
+    job.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
+    job.setBoolean(IndexerMapReduce.URL_FILTERING, filter);
+    job.setBoolean(IndexerMapReduce.URL_NORMALIZING, normalize);
+
+    if (params != null) {
+      job.set(IndexerMapReduce.INDEXER_PARAMS, params);
+    }
+
+    job.setReduceSpeculativeExecution(false);
+
+    final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
+        + new Random().nextInt());
+
+    FileOutputFormat.setOutputPath(job, tmp);
+    try {
+      JobClient.runJob(job);
+      // do the commits once and for all the reducers in one go
+      if (!noCommit) {
+        writers.open(job, "commit");
+        writers.commit();
+      }
+      long end = System.currentTimeMillis();
+      LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: "
+          + TimingUtil.elapsedTime(start, end));
+    } finally {
+      FileSystem.get(job).delete(tmp, true);
+    }
+  }
+
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      System.err
+          .println("Usage: Indexer <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone] [-filter] [-normalize]");
+      IndexWriters writers = new IndexWriters(getConf());
+      System.err.println(writers.describe());
+      return -1;
+    }
+
+    final Path crawlDb = new Path(args[0]);
+    Path linkDb = null;
+
+    final List<Path> segments = new ArrayList<Path>();
+    String params = null;
+
+    boolean noCommit = false;
+    boolean deleteGone = false;
+    boolean filter = false;
+    boolean normalize = false;
+
+    for (int i = 1; i < args.length; i++) {
+      if (args[i].equals("-linkdb")) {
+        linkDb = new Path(args[++i]);
+      } else if (args[i].equals("-dir")) {
+        Path dir = new Path(args[++i]);
+        FileSystem fs = dir.getFileSystem(getConf());
+        FileStatus[] fstats = fs.listStatus(dir,
+            HadoopFSUtil.getPassDirectoriesFilter(fs));
+        Path[] files = HadoopFSUtil.getPaths(fstats);
+        for (Path p : files) {
+          segments.add(p);
         }
-
-        job.setReduceSpeculativeExecution(false);
-
-        final Path tmp = new Path("tmp_" + System.currentTimeMillis() + "-"
-                + new Random().nextInt());
-
-        FileOutputFormat.setOutputPath(job, tmp);
-        try {
-            JobClient.runJob(job);
-            // do the commits once and for all the reducers in one go
-            if (!noCommit) {
-                writers.open(job,"commit");
-                writers.commit();
-            }
-            long end = System.currentTimeMillis();
-            LOG.info("Indexer: finished at " + sdf.format(end) + ", elapsed: "
-                    + TimingUtil.elapsedTime(start, end));
-        } finally {
-            FileSystem.get(job).delete(tmp, true);
-        }
-    }
-
-    public int run(String[] args) throws Exception {
-        if (args.length < 2) {
-            System.err
-                    .println("Usage: Indexer <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone] [-filter] [-normalize]");
-            IndexWriters writers = new IndexWriters(getConf());
-            System.err.println(writers.describe());
-            return -1;
-        }
-
-        final Path crawlDb = new Path(args[0]);
-        Path linkDb = null;
-
-        final List<Path> segments = new ArrayList<Path>();
-        String params = null;
-
-        boolean noCommit = false;
-        boolean deleteGone = false;
-        boolean filter = false;
-        boolean normalize = false;
-
-        for (int i = 1; i < args.length; i++) {
-            if (args[i].equals("-linkdb")) {
-                linkDb = new Path(args[++i]);
-            } else if (args[i].equals("-dir")) {
-                Path dir = new Path(args[++i]);
-                FileSystem fs = dir.getFileSystem(getConf());
-                FileStatus[] fstats = fs.listStatus(dir,
-                        HadoopFSUtil.getPassDirectoriesFilter(fs));
-                Path[] files = HadoopFSUtil.getPaths(fstats);
-                for (Path p : files) {
-                    segments.add(p);
-                }
-            } else if (args[i].equals("-noCommit")) {
-                noCommit = true;
-            } else if (args[i].equals("-deleteGone")) {
-                deleteGone = true;
-            } else if (args[i].equals("-filter")) {
-                filter = true;
-            } else if (args[i].equals("-normalize")) {
-                normalize = true;
-            } else if (args[i].equals("-params")) {
-                params = args[++i];
-            } else {
-                segments.add(new Path(args[i]));
-            }
-        }
-
-        try {
-            index(crawlDb, linkDb, segments, noCommit, deleteGone, params,
-                    filter, normalize);
-            return 0;
-        } catch (final Exception e) {
-            LOG.error("Indexer: " + StringUtils.stringifyException(e));
-            return -1;
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        final int res = ToolRunner.run(NutchConfiguration.create(),
-                new IndexingJob(), args);
-        System.exit(res);
-    }
+      } else if (args[i].equals("-noCommit")) {
+        noCommit = true;
+      } else if (args[i].equals("-deleteGone")) {
+        deleteGone = true;
+      } else if (args[i].equals("-filter")) {
+        filter = true;
+      } else if (args[i].equals("-normalize")) {
+        normalize = true;
+      } else if (args[i].equals("-params")) {
+        params = args[++i];
+      } else {
+        segments.add(new Path(args[i]));
+      }
+    }
+
+    try {
+      index(crawlDb, linkDb, segments, noCommit, deleteGone, params, filter,
+          normalize);
+      return 0;
+    } catch (final Exception e) {
+      LOG.error("Indexer: " + StringUtils.stringifyException(e));
+      return -1;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    final int res = ToolRunner.run(NutchConfiguration.create(),
+        new IndexingJob(), args);
+    System.exit(res);
+  }
 }

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/NutchDocument.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchDocument.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchDocument.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchDocument.java Thu Jan 29 05:38:59 2015
@@ -31,12 +31,12 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.nutch.metadata.Metadata;
 
-/** A {@link NutchDocument} is the unit of indexing.*/
-public class NutchDocument
-implements Writable, Iterable<Entry<String, NutchField>> {
+/** A {@link NutchDocument} is the unit of indexing. */
+public class NutchDocument implements Writable,
+    Iterable<Entry<String, NutchField>> {
 
   public static final byte VERSION = 2;
-  
+
   private Map<String, NutchField> fields;
 
   private Metadata documentMeta;
@@ -127,8 +127,8 @@ implements Writable, Iterable<Entry<Stri
     out.writeFloat(weight);
     documentMeta.write(out);
   }
-  
-  public String toString() { 
+
+  public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append("doc {\n");
     for (Map.Entry<String, NutchField> entry : fields.entrySet()) {

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/NutchField.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchField.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchField.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchField.java Thu Jan 29 05:38:59 2015
@@ -28,32 +28,33 @@ import java.util.List;
 import org.apache.hadoop.io.*;
 
 /**
- * This class represents a multi-valued field with a weight. 
- * Values are arbitrary objects.
+ * This class represents a multi-valued field with a weight. Values are
+ * arbitrary objects.
  */
 public class NutchField implements Writable {
   private float weight;
   private List<Object> values = new ArrayList<Object>();
-  
-  public NutchField() { }
-  
+
+  public NutchField() {
+  }
+
   public NutchField(Object value) {
     this(value, 1.0f);
   }
-  
+
   public NutchField(Object value, float weight) {
     this.weight = weight;
     if (value instanceof Collection) {
-      values.addAll((Collection<?>)value);
+      values.addAll((Collection<?>) value);
     } else {
       values.add(value);
     }
   }
-  
+
   public void add(Object value) {
     values.add(value);
   }
-  
+
   public float getWeight() {
     return weight;
   }
@@ -65,7 +66,7 @@ public class NutchField implements Writa
   public List<Object> getValues() {
     return values;
   }
-  
+
   public void reset() {
     weight = 1.0f;
     values.clear();
@@ -73,13 +74,13 @@ public class NutchField implements Writa
 
   @Override
   public Object clone() throws CloneNotSupportedException {
-    NutchField result = (NutchField)super.clone();
+    NutchField result = (NutchField) super.clone();
     result.weight = weight;
     result.values = values;
 
     return result;
   }
-  
+
   @Override
   public void readFields(DataInput in) throws IOException {
     weight = in.readFloat();
@@ -87,7 +88,7 @@ public class NutchField implements Writa
     values = new ArrayList<Object>();
     for (int i = 0; i < count; i++) {
       String type = Text.readString(in);
-      
+
       if (type.equals("java.lang.String")) {
         values.add(Text.readString(in));
       } else if (type.equals("java.lang.Boolean")) {
@@ -109,26 +110,26 @@ public class NutchField implements Writa
     out.writeFloat(weight);
     out.writeInt(values.size());
     for (Object value : values) {
-    
+
       Text.writeString(out, value.getClass().getName());
-        
+
       if (value instanceof Boolean) {
-        out.writeBoolean((Boolean)value);
+        out.writeBoolean((Boolean) value);
       } else if (value instanceof Integer) {
-        out.writeInt((Integer)value);
+        out.writeInt((Integer) value);
       } else if (value instanceof Long) {
-        out.writeLong((Long)value);
+        out.writeLong((Long) value);
       } else if (value instanceof Float) {
-        out.writeFloat((Float)value);
+        out.writeFloat((Float) value);
       } else if (value instanceof String) {
-        Text.writeString(out, (String)value);
+        Text.writeString(out, (String) value);
       } else if (value instanceof Date) {
-        Date date = (Date)value;
+        Date date = (Date) value;
         out.writeLong(date.getTime());
       }
     }
   }
-  
+
   public String toString() {
     return values.toString();
   }

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java Thu Jan 29 05:38:59 2015
@@ -25,8 +25,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.nutch.indexer.NutchDocument;
 
 /**
- * A {@link NutchIndexAction} is the new unit of indexing holding the
- * document and action information.
+ * A {@link NutchIndexAction} is the new unit of indexing holding the document
+ * and action information.
  */
 public class NutchIndexAction implements Writable {
 

Modified: nutch/trunk/src/java/org/apache/nutch/metadata/CreativeCommons.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/metadata/CreativeCommons.java?rev=1655526&r1=1655525&r2=1655526&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/metadata/CreativeCommons.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/metadata/CreativeCommons.java Thu Jan 29 05:38:59 2015
@@ -16,21 +16,20 @@
  */
 package org.apache.nutch.metadata;
 
-
 /**
  * A collection of Creative Commons properties names.
- *
+ * 
  * @see <a href="http://www.creativecommons.org/">creativecommons.org</a>
- *
+ * 
  * @author Chris Mattmann
  * @author J&eacute;r&ocirc;me Charron
  */
 public interface CreativeCommons {
-  
+
   public final static String LICENSE_URL = "License-Url";
-  
+
   public final static String LICENSE_LOCATION = "License-Location";
-  
+
   public final static String WORK_TYPE = "Work-Type";
-  
+
 }