You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by ma...@apache.org on 2012/01/10 14:57:30 UTC
svn commit: r1229544 - in /nutch/trunk: ./
src/java/org/apache/nutch/indexer/ src/java/org/apache/nutch/indexer/solr/
Author: markus
Date: Tue Jan 10 13:57:29 2012
New Revision: 1229544
URL: http://svn.apache.org/viewvc?rev=1229544&view=rev
Log:
NUTCH-1139 Indexer to delete gone documents
Added:
nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java
Modified:
nutch/trunk/CHANGES.txt
nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java
nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java
nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java
Modified: nutch/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/nutch/trunk/CHANGES.txt?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/CHANGES.txt (original)
+++ nutch/trunk/CHANGES.txt Tue Jan 10 13:57:29 2012
@@ -1,5 +1,7 @@
Nutch Change Log
+* NUTCH-1139 Indexer to delete gone documents (markus)
+
* NUTCH-1244 CrawlDBDumper to filter by regex (markus)
* NUTCH-1237 Improve javac arguements for more verbose ouput (lewismc)
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerMapReduce.java Tue Jan 10 13:57:29 2012
@@ -50,10 +50,13 @@ import org.apache.nutch.scoring.ScoringF
public class IndexerMapReduce extends Configured
implements Mapper<Text, Writable, Text, NutchWritable>,
- Reducer<Text, NutchWritable, Text, NutchDocument> {
+ Reducer<Text, NutchWritable, Text, NutchIndexAction> {
public static final Logger LOG = LoggerFactory.getLogger(IndexerMapReduce.class);
+ public static final String INDEXER_DELETE = "indexer.delete";
+
+ private boolean delete = false;
private IndexingFilters filters;
private ScoringFilters scfilters;
@@ -61,6 +64,7 @@ implements Mapper<Text, Writable, Text,
setConf(job);
this.filters = new IndexingFilters(getConf());
this.scfilters = new ScoringFilters(getConf());
+ this.delete = job.getBoolean(INDEXER_DELETE, false);
}
public void map(Text key, Writable value,
@@ -69,13 +73,14 @@ implements Mapper<Text, Writable, Text,
}
public void reduce(Text key, Iterator<NutchWritable> values,
- OutputCollector<Text, NutchDocument> output, Reporter reporter)
+ OutputCollector<Text, NutchIndexAction> output, Reporter reporter)
throws IOException {
Inlinks inlinks = null;
CrawlDatum dbDatum = null;
CrawlDatum fetchDatum = null;
ParseData parseData = null;
ParseText parseText = null;
+
while (values.hasNext()) {
final Writable value = values.next().get(); // unwrap
if (value instanceof Inlinks) {
@@ -85,9 +90,32 @@ implements Mapper<Text, Writable, Text,
if (CrawlDatum.hasDbStatus(datum))
dbDatum = datum;
else if (CrawlDatum.hasFetchStatus(datum)) {
+
// don't index unmodified (empty) pages
- if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED)
+ if (datum.getStatus() != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
fetchDatum = datum;
+
+ /**
+ * Check if we need to delete 404 NOT FOUND and 301 PERMANENT REDIRECT.
+ */
+ if (delete) {
+ if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_GONE) {
+ reporter.incrCounter("IndexerStatus", "Documents deleted", 1);
+
+ NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+ output.collect(key, action);
+ continue;
+ }
+ if (fetchDatum.getStatus() == CrawlDatum.STATUS_FETCH_REDIR_PERM) {
+ reporter.incrCounter("IndexerStatus", "Perm redirects deleted", 1);
+
+ NutchIndexAction action = new NutchIndexAction(null, NutchIndexAction.DELETE);
+ output.collect(key, action);
+ continue;
+ }
+ }
+ }
+
} else if (CrawlDatum.STATUS_LINKED == datum.getStatus() ||
CrawlDatum.STATUS_SIGNATURE == datum.getStatus() ||
CrawlDatum.STATUS_PARSE_META == datum.getStatus()) {
@@ -163,7 +191,8 @@ implements Mapper<Text, Writable, Text,
reporter.incrCounter("IndexerStatus", "Documents added", 1);
- output.collect(key, doc);
+ NutchIndexAction action = new NutchIndexAction(doc, NutchIndexAction.ADD);
+ output.collect(key, action);
}
public void close() throws IOException { }
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/IndexerOutputFormat.java Tue Jan 10 13:57:29 2012
@@ -26,10 +26,10 @@ import org.apache.hadoop.mapred.RecordWr
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
-public class IndexerOutputFormat extends FileOutputFormat<Text, NutchDocument> {
+public class IndexerOutputFormat extends FileOutputFormat<Text, NutchIndexAction> {
@Override
- public RecordWriter<Text, NutchDocument> getRecordWriter(FileSystem ignored,
+ public RecordWriter<Text, NutchIndexAction> getRecordWriter(FileSystem ignored,
JobConf job, String name, Progressable progress) throws IOException {
// populate JobConf with field indexing options
@@ -41,7 +41,7 @@ public class IndexerOutputFormat extends
for (final NutchIndexWriter writer : writers) {
writer.open(job, name);
}
- return new RecordWriter<Text, NutchDocument>() {
+ return new RecordWriter<Text, NutchIndexAction>() {
public void close(Reporter reporter) throws IOException {
for (final NutchIndexWriter writer : writers) {
@@ -49,9 +49,14 @@ public class IndexerOutputFormat extends
}
}
- public void write(Text key, NutchDocument doc) throws IOException {
+ public void write(Text key, NutchIndexAction indexAction) throws IOException {
for (final NutchIndexWriter writer : writers) {
- writer.write(doc);
+ if (indexAction.action == NutchIndexAction.ADD) {
+ writer.write(indexAction.doc);
+ }
+ if (indexAction.action == NutchIndexAction.DELETE) {
+ writer.delete(key.toString());
+ }
}
}
};
Added: nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java?rev=1229544&view=auto
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java (added)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexAction.java Tue Jan 10 13:57:29 2012
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nutch.indexer;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+import org.apache.nutch.indexer.NutchDocument;
+
+/**
+ * A {@link NutchIndexAction} is the new unit of indexing holding the
+ * document and action information.
+ */
+class NutchIndexAction implements Writable {
+  // Action codes, serialized as a single byte ahead of the document.
+  public static final byte ADD = 0;
+  public static final byte DELETE = 1;
+  public NutchDocument doc = null;
+  public byte action = ADD;
+
+  /** Creates an action; doc may be null for DELETE, which is applied by record key only. */
+  public NutchIndexAction(NutchDocument doc, byte action) {
+    this.doc = doc;
+    this.action = action;
+  }
+  /** Restores the action byte and then the document into this instance's fields. */
+  public void readFields(DataInput in) throws IOException {
+    action = in.readByte();
+    doc = new NutchDocument(); // assign the field: a same-named local would discard the data
+    doc.readFields(in);
+  }
+  /** Writes the action byte, then the document (an empty one stands in for a null doc). */
+  public void write(DataOutput out) throws IOException {
+    out.writeByte(action); // mirrors readByte() in readFields
+    (doc != null ? doc : new NutchDocument()).write(out); // DELETE actions carry no document
+  }
+}
\ No newline at end of file
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/NutchIndexWriter.java Tue Jan 10 13:57:29 2012
@@ -25,6 +25,8 @@ public interface NutchIndexWriter {
public void write(NutchDocument doc) throws IOException;
+ public void delete(String key) throws IOException;
+
public void close() throws IOException;
}
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrIndexer.java Tue Jan 10 13:57:29 2012
@@ -57,16 +57,21 @@ public class SolrIndexer extends Configu
public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
List<Path> segments) throws IOException {
- indexSolr(solrUrl, crawlDb, linkDb, segments, false, null);
+ indexSolr(solrUrl, crawlDb, linkDb, segments, false, false, null);
}
public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
List<Path> segments, boolean noCommit) throws IOException {
- indexSolr(solrUrl, crawlDb, linkDb, segments, noCommit, null);
+ indexSolr(solrUrl, crawlDb, linkDb, segments, noCommit, false, null);
+ }
+
+ public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
+ List<Path> segments, boolean noCommit, boolean deleteGone) throws IOException {
+ indexSolr(solrUrl, crawlDb, linkDb, segments, noCommit, deleteGone, null);
}
public void indexSolr(String solrUrl, Path crawlDb, Path linkDb,
- List<Path> segments, boolean noCommit, String solrParams) throws IOException {
+ List<Path> segments, boolean noCommit, boolean deleteGone, String solrParams) throws IOException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
long start = System.currentTimeMillis();
LOG.info("SolrIndexer: starting at " + sdf.format(start));
@@ -74,9 +79,14 @@ public class SolrIndexer extends Configu
final JobConf job = new NutchJob(getConf());
job.setJobName("index-solr " + solrUrl);
+ if (deleteGone) {
+ LOG.info("SolrIndexer: deleting gone documents");
+ }
+
IndexerMapReduce.initMRJob(crawlDb, linkDb, segments, job);
job.set(SolrConstants.SERVER_URL, solrUrl);
+ job.setBoolean(IndexerMapReduce.INDEXER_DELETE, deleteGone);
if (solrParams != null) {
job.set(SolrConstants.PARAMS, solrParams);
}
@@ -108,7 +118,7 @@ public class SolrIndexer extends Configu
public int run(String[] args) throws Exception {
if (args.length < 3) {
- System.err.println("Usage: SolrIndexer <solr url> <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit]");
+ System.err.println("Usage: SolrIndexer <solr url> <crawldb> [-linkdb <linkdb>] [-params k1=v1&k2=v2...] (<segment> ... | -dir <segments>) [-noCommit] [-deleteGone]");
return -1;
}
@@ -119,6 +129,7 @@ public class SolrIndexer extends Configu
String params = null;
boolean noCommit = false;
+ boolean deleteGone = false;
for (int i = 2; i < args.length; i++) {
if (args[i].equals("-linkdb")) {
@@ -135,6 +146,8 @@ public class SolrIndexer extends Configu
}
} else if (args[i].equals("-noCommit")) {
noCommit = true;
+ } else if (args[i].equals("-deleteGone")) {
+ deleteGone = true;
} else if (args[i].equals("-params")) {
params = args[++i];
} else {
@@ -143,7 +156,7 @@ public class SolrIndexer extends Configu
}
try {
- indexSolr(args[0], crawlDb, linkDb, segments, noCommit, params);
+ indexSolr(args[0], crawlDb, linkDb, segments, noCommit, deleteGone, params);
return 0;
} catch (final Exception e) {
LOG.error("SolrIndexer: " + StringUtils.stringifyException(e));
Modified: nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java
URL: http://svn.apache.org/viewvc/nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java?rev=1229544&r1=1229543&r2=1229544&view=diff
==============================================================================
--- nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java (original)
+++ nutch/trunk/src/java/org/apache/nutch/indexer/solr/SolrWriter.java Tue Jan 10 13:57:29 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.nutch.indexer.NutchDocument;
import org.apache.nutch.indexer.NutchField;
import org.apache.nutch.indexer.NutchIndexWriter;
+import org.apache.nutch.indexer.IndexerMapReduce;
import org.apache.solr.client.solrj.SolrServer;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.request.UpdateRequest;
@@ -47,6 +48,8 @@ public class SolrWriter implements Nutch
new ArrayList<SolrInputDocument>();
private int commitSize;
+ private int numDeletes = 0;
+ private boolean delete = false;
public void open(JobConf job, String name) throws IOException {
SolrServer server = SolrUtils.getCommonsHttpSolrServer(job);
@@ -58,6 +61,7 @@ public class SolrWriter implements Nutch
solr = server;
commitSize = job.getInt(SolrConstants.COMMIT_SIZE, 1000);
solrMapping = SolrMappingReader.getInstance(job);
+ delete = job.getBoolean(IndexerMapReduce.INDEXER_DELETE, false);
// parse optional params
params = new ModifiableSolrParams();
String paramString = job.get(SolrConstants.PARAMS);
@@ -73,6 +77,17 @@ public class SolrWriter implements Nutch
}
}
+ public void delete(String key) throws IOException { // deletes the Solr document whose id is key
+ if (delete) { // no-op unless the indexer.delete job flag was read as true in open()
+ try {
+ solr.deleteById(key); // issued on the server handle directly, not queued in inputDocs
+ numDeletes++; // counted toward the commitSize threshold that write() checks
+ } catch (final SolrServerException e) {
+ throw makeIOException(e); // rethrown as IOException, the type NutchIndexWriter.delete declares
+ }
+ }
+ }
+
public void write(NutchDocument doc) throws IOException {
final SolrInputDocument inputDoc = new SolrInputDocument();
for(final Entry<String, NutchField> e : doc) {
@@ -95,11 +110,14 @@ public class SolrWriter implements Nutch
}
}
}
+
inputDoc.setDocumentBoost(doc.getWeight());
inputDocs.add(inputDoc);
- if (inputDocs.size() >= commitSize) {
+ if (inputDocs.size() + numDeletes >= commitSize) {
try {
- LOG.info("Adding " + Integer.toString(inputDocs.size()) + " documents");
+ LOG.info("Indexing " + Integer.toString(inputDocs.size()) + " documents");
+ LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
+ numDeletes = 0;
UpdateRequest req = new UpdateRequest();
req.add(inputDocs);
req.setParams(params);
@@ -114,7 +132,10 @@ public class SolrWriter implements Nutch
public void close() throws IOException {
try {
if (!inputDocs.isEmpty()) {
- LOG.info("Adding " + Integer.toString(inputDocs.size()) + " documents");
+ LOG.info("Indexing " + Integer.toString(inputDocs.size()) + " documents");
+ if (numDeletes > 0) {
+ LOG.info("Deleting " + Integer.toString(numDeletes) + " documents");
+ }
UpdateRequest req = new UpdateRequest();
req.add(inputDocs);
req.setParams(params);