You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2012/01/10 14:57:30 UTC

svn commit: r1229544 - in /nutch/trunk: ./ src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/indexer/solr/

Author: markus
Date: Tue Jan 10 13:57:29 2012
New Revision: 1229544

URL: http://svn.apache.org/viewvc?rev=1229544&view=rev
Log:
NUTCH-1139 Indexer to delete gone documents

Added:
    nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java
Modified:
    nutch/trunk/CHANGES.txt
    nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
    nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
    nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java
    nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java
    nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java

Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Tue Jan 10 13:57:29 2012
@@ -1,5 +1,7 @@
 Nutch Change Log
 
+* NUTCH-1139 Indexer to delete gone documents (markus)
+
 * NUTCH-1244 CrawlDBDumper to filter by regex (markus)
 
 * NUTCH-1237 Improve javac arguements for more verbose ouput (lewismc)

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java Tue Jan 10 13:57:29 2012
@@ -50,10 +50,13 @@ import org.apache.nutch.scoring.ScoringF
 
 public class IndexerMapReduce extends Configured
 implements Mapper<Text, Writable, Text, NutchWritable>,
-          Reducer<Text, NutchWritable, Text, NutchDocument> {
+          Reducer<Text, NutchWritable, Text, NutchIndexAction> {
 
   public static final Logger LOG = LoggerFactory.getLogger(IndexerMapReduce.class);
 
+  public static final String INDEXER_DELETE = "indexer.delete";
+
+  private boolean delete = false;
   private IndexingFilters filters;
   private ScoringFilters scfilters;
 
@@ -61,6 +64,7 @@ implements Mapper<Text, Writable, Text, 
     setConf(job);
     this.filters = new IndexingFilters(getConf());
     this.scfilters = new ScoringFilters(getConf());
+    this.delete = job.getBoolean(INDEXER_DELETE, false);
   }
 
   public void map(Text key, Writable value,
@@ -69,13 +73,14 @@ implements Mapper<Text, Writable, Text, 
   }
 
   public void reduce(Text key, Iterator<NutchWritable> values,
-                     OutputCollector<Text, NutchDocument> output, Reporter reporter)
+                     OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
     throws IOException {
     Inlinks inlinks = null;
     CrawlDatum dbDatum = null;
     CrawlDatum fetchDatum = null;
     ParseData parseData = null;
     ParseText parseText = null;
+
     while (values.hasNext()) {
       final Writable value = values.next().get(); // unwrap
       if (value instanceof Inlinks) {
@@ -85,9 +90,32 @@ implements Mapper<Text, Writable, Text, 
         if (CrawlDatum.hasDbStatus(datum))
           dbDatum = datum;
         else if (CrawlDatum.hasFetchStatus(datum)) {
+
           // don't index unmodified (empty) pages
-          if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED)
+          if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
             fetchDatum = datum;
+
+            /**
+             * Check if we need to delete 404 NOT FOUND and 301 PERMANENT REDIRECT.
+             */
+            if (delete) {
+              if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
+                reporter.incrCounter("IndexerStatus", "Documents deleted", 1);
+
+                NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+                output.collect(key, action);
+                continue;
+              }
+              if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM) {
+                reporter.incrCounter("IndexerStatus", "Perm redirects deleted", 1);
+
+                NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+                output.collect(key, action);
+                continue;
+              }
+            }
+          }
+
         } else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||
                    CrawlDatum.STATUS_SIGNATURE == datum.getStatus() ||
                    CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
@@ -163,7 +191,8 @@ implements Mapper<Text, Writable, Text, 
 
     reporter.incrCounter("IndexerStatus", "Documents added", 1);
 
-    output.collect(key, doc);
+    NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
+    output.collect(key, action);
   }
 
   public void close() throws IOException { }

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java Tue Jan 10 13:57:29 2012
@@ -26,10 +26,10 @@ import org.apache.hadoop.mapred.RecordWr
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 
-public class IndexerOutputFormat extends FileOutputFormat<Text, NutchDocument> {
+public class IndexerOutputFormat extends FileOutputFormat<Text, NutchIndexAction> {
 
   @Override
-  public RecordWriter<Text, NutchDocument> getRecordWriter(FileSystem ignored,
+  public RecordWriter<Text, NutchIndexAction> getRecordWriter(FileSystem ignored,
       JobConf job, String name, Progressable progress) throws IOException {
     
     // populate JobConf with field indexing options
@@ -41,7 +41,7 @@ public class IndexerOutputFormat extends
     for (final NutchIndexWriter writer : writers) {
       writer.open(job, name);
     }
-    return new RecordWriter<Text, NutchDocument>() {
+    return new RecordWriter<Text, NutchIndexAction>() {
 
       public void close(Reporter reporter) throws IOException {
         for (final NutchIndexWriter writer : writers) {
@@ -49,9 +49,14 @@ public class IndexerOutputFormat extends
         }
       }
 
-      public void write(Text key, NutchDocument doc) throws IOException {
+      public void write(Text key, NutchIndexAction indexAction) throws IOException {
         for (final NutchIndexWriter writer : writers) {
-          writer.write(doc);
+          if (indexAction.action == NutchIndexAction.ADD) {
+            writer.write(indexAction.doc);
+          }
+          if (indexAction.action == NutchIndexAction.DELETE) {
+            writer.delete(key.toString());
+          }
         }
       }
     };

Added: nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java?rev=1229544&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java Tue Jan 10 13:57:29 2012
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+import org.apache.nutch.indexer.NutchDocument;
+
+/**
+ * A {@link NutchIndexAction} is the new unit of indexing holding the
+ * document and action information.
+ */
+class NutchIndexAction implements Writable {
+
+  public static final byte ADD = 0;
+  public static final byte DELETE = 1;
+
+  public NutchDocument doc = null;
+  public byte action = ADD;
+
+  public NutchIndexAction(NutchDocument doc, byte action) {
+    this.doc = doc;
+    this.action = action;
+  }
+
+  public void readFields(DataInput in) throws IOException {
+    action = in.readByte();
+    NutchDocument doc = new NutchDocument();
+    doc.readFields(in);
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.write(action);
+    doc.write(out);
+  }
+}
\ No newline at end of file

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java Tue Jan 10 13:57:29 2012
@@ -25,6 +25,8 @@ public interface NutchIndexWriter {
 
   public void write(NutchDocument doc) throws IOException;
 
+  public void delete(String key) throws IOException;
+
   public void close() throws IOException;
 
 }

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java Tue Jan 10 13:57:29 2012
@@ -57,16 +57,21 @@ public class SolrIndexer extends Configu
 
   public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
       List<Path> segments) throws IOException {
-      indexSolr(solrUrl, crawlDb, linkDb, segments, false, null);
+      indexSolr(solrUrl, crawlDb, linkDb, segments, false, false, null);
   }
 
   public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
           List<Path> segments, boolean noCommit) throws IOException {
-    indexSolr(solrUrl, crawlDb, linkDb, segments, noCommit, null);
+    indexSolr(solrUrl, crawlDb, linkDb, segments, noCommit, false, null);
+  }
+
+  public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
+          List<Path> segments, boolean noCommit, boolean deleteGone) throws IOException {
+    indexSolr(solrUrl, crawlDb, linkDb, segments, noCommit, deleteGone, null);
   }
   
   public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
-      List<Path> segments, boolean noCommit, String solrParams) throws IOException {
+      List<Path> segments, boolean noCommit, boolean deleteGone, String solrParams) throws IOException {
     SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
     long start = System.currentTimeMillis();
     LOG.info("SolrIndexer: starting at " + sdf.format(start));
@@ -74,9 +79,14 @@ public class SolrIndexer extends Configu
     final JobConf job = new NutchJob(getConf());
     job.setJobName("index-solr " + solrUrl);
 
+    if (deleteGone) {
+      LOG.info("SolrIndexer: deleting gone documents");
+    }
+
     IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
 
     job.set(SolrConstants.SERVER_URL, solrUrl);
+    job.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
     if (solrParams != null) {
       job.set(SolrConstants.PARAMS, solrParams);
     }
@@ -108,7 +118,7 @@ public class SolrIndexer extends Configu
 
   public int run(String[] args) throws Exception {
     if (args.length < 3) {
-      System.err.println("Usage: SolrIndexer <solr url> <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit]");
+      System.err.println("Usage: SolrIndexer <solr url> <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone]");
       return -1;
     }
 
@@ -119,6 +129,7 @@ public class SolrIndexer extends Configu
     String params = null;
 
     boolean noCommit = false;
+    boolean deleteGone = false;
 
     for (int i = 2; i < args.length; i++) {
     	if (args[i].equals("-linkdb")) {
@@ -135,6 +146,8 @@ public class SolrIndexer extends Configu
         }
       } else if (args[i].equals("-noCommit")) {
         noCommit = true;
+      } else if (args[i].equals("-deleteGone")) {
+        deleteGone = true;
       } else if (args[i].equals("-params")) {
         params = args[++i];
       } else {
@@ -143,7 +156,7 @@ public class SolrIndexer extends Configu
     }
 
     try {
-      indexSolr(args[0], crawlDb, linkDb, segments, noCommit, params);
+      indexSolr(args[0], crawlDb, linkDb, segments, noCommit, deleteGone, params);
       return 0;
     } catch (final Exception e) {
       LOG.error("SolrIndexer: " + StringUtils.stringifyException(e));

Modified: nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java Tue Jan 10 13:57:29 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.nutch.indexer.NutchDocument;
 import org.apache.nutch.indexer.NutchField;
 import org.apache.nutch.indexer.NutchIndexWriter;
+import org.apache.nutch.indexer.IndexerMapReduce;
 import org.apache.solr.client.solrj.SolrServer;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -47,6 +48,8 @@ public class SolrWriter implements Nutch
     new ArrayList<SolrInputDocument>();
 
   private int commitSize;
+  private int numDeletes = 0;
+  private boolean delete = false;
 
   public void open(JobConf job, String name) throws IOException {
     SolrServer server = SolrUtils.getCommonsHttpSolrServer(job);
@@ -58,6 +61,7 @@ public class SolrWriter implements Nutch
     solr = server;
     commitSize = job.getInt(SolrConstants.COMMIT_SIZE, 1000);
     solrMapping = SolrMappingReader.getInstance(job);
+    delete = job.getBoolean(IndexerMapReduce.INDEXER_DELETE, false);
     // parse optional params
     params = new ModifiableSolrParams();
     String paramString = job.get(SolrConstants.PARAMS);
@@ -73,6 +77,17 @@ public class SolrWriter implements Nutch
     }
   }
 
+  public void delete(String key) throws IOException {
+    if (delete) {
+      try {
+        solr.deleteById(key);
+        numDeletes++;
+      } catch (final SolrServerException e) {
+        throw makeIOException(e);
+      }
+    }
+  }
+
   public void write(NutchDocument doc) throws IOException {
     final SolrInputDocument inputDoc = new SolrInputDocument();
     for(final Entry<String, NutchField> e : doc) {
@@ -95,11 +110,14 @@ public class SolrWriter implements Nutch
         }
       }
     }
+
     inputDoc.setDocumentBoost(doc.getWeight());
     inputDocs.add(inputDoc);
-    if (inputDocs.size() >= commitSize) {
+    if (inputDocs.size() + numDeletes >= commitSize) {
       try {
-        LOG.info("Adding " + Integer.toString(inputDocs.size()) + " documents");
+        LOG.info("Indexing " + Integer.toString(inputDocs.size()) + " documents");
+        LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
+        numDeletes = 0;
         UpdateRequest req = new UpdateRequest();
         req.add(inputDocs);
         req.setParams(params);
@@ -114,7 +132,10 @@ public class SolrWriter implements Nutch
   public void close() throws IOException {
     try {
       if (!inputDocs.isEmpty()) {
-        LOG.info("Adding " + Integer.toString(inputDocs.size()) + " documents");
+        LOG.info("Indexing " + Integer.toString(inputDocs.size()) + " documents");
+        if (numDeletes > 0) {
+          LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
+        }
         UpdateRequest req = new UpdateRequest();
         req.add(inputDocs);
         req.setParams(params);