You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by le...@apache.org on 2012/11/22 15:45:08 UTC
svn commit: r1412566 - in /nutch/branches/2.x:
conf/gora-cassandra-mapping.xml
src/java/org/apache/nutch/crawl/InjectorJob.java
src/java/org/apache/nutch/storage/StorageUtils.java
Author: lewismc
Date: Thu Nov 22 14:45:07 2012
New Revision: 1412566
URL: http://svn.apache.org/viewvc?rev=1412566&view=rev
Log:
NUTCH-1370 Expose exact number of urls injected @runtime
Modified:
nutch/branches/2.x/conf/gora-cassandra-mapping.xml
nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java
nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java
Modified: nutch/branches/2.x/conf/gora-cassandra-mapping.xml
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/conf/gora-cassandra-mapping.xml?rev=1412566&r1=1412565&r2=1412566&view=diff
==============================================================================
--- nutch/branches/2.x/conf/gora-cassandra-mapping.xml (original)
+++ nutch/branches/2.x/conf/gora-cassandra-mapping.xml Thu Nov 22 14:45:07 2012
@@ -55,4 +55,15 @@
<field name="protocolStatus" family="sc" qualifier="prs"/>
</class>
+ <keyspace name="host" cluster="Test Cluster" host="localhost">
+ <family name="mtdt" type="super"/>
+ <family name="il" type="super"/>
+ <family name="ol" type="super"/>
+ </keyspace>
+ <class keyClass="java.lang.String" name="org.apache.nutch.storage.Host" keyspace="host">
+ <field name="metadata" family="mtdt" qualifier="mtdt"/>
+ <field name="inlinks" family="il" qualifier="il"/>
+ <field name="outlinks" family="ol" qualifier="ol"/>
+ </class>
+
</gora-orm>
Modified: nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java?rev=1412566&r1=1412565&r2=1412566&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/crawl/InjectorJob.java Thu Nov 22 14:45:07 2012
@@ -18,6 +18,7 @@ package org.apache.nutch.crawl;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.text.SimpleDateFormat;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -26,6 +27,7 @@ import java.util.TreeMap;
import org.apache.avro.util.Utf8;
import org.apache.gora.mapreduce.GoraOutputFormat;
+import org.apache.gora.persistency.Persistent;
import org.apache.gora.store.DataStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -49,6 +51,7 @@ import org.apache.nutch.util.NutchConfig
import org.apache.nutch.util.NutchJob;
import org.apache.nutch.util.NutchTool;
import org.apache.nutch.util.TableUtil;
+import org.apache.nutch.util.TimingUtil;
import org.apache.nutch.util.ToolUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +72,7 @@ public class InjectorJob extends NutchTo
private static final Set<WebPage.Field> FIELDS = new HashSet<WebPage.Field>();
private static final Utf8 YES_STRING = new Utf8("y");
-
+
static {
FIELDS.add(WebPage.Field.MARKERS);
FIELDS.add(WebPage.Field.STATUS);
@@ -81,7 +84,7 @@ public class InjectorJob extends NutchTo
* metadata key reserved for setting a custom fetchInterval for a specific URL
*/
public static String nutchFetchIntervalMDName = "nutch.fetchInterval";
-
+
public static class UrlMapper extends
Mapper<LongWritable, Text, String, WebPage> {
private URLNormalizers urlNormalizers;
@@ -92,24 +95,27 @@ public class InjectorJob extends NutchTo
private long curTime;
@Override
- protected void setup(Context context) throws IOException,
- InterruptedException {
+ protected void setup(Context context) throws IOException, InterruptedException {
urlNormalizers = new URLNormalizers(context.getConfiguration(),
- URLNormalizers.SCOPE_INJECT);
+ URLNormalizers.SCOPE_INJECT);
interval = context.getConfiguration().getInt("db.fetch.interval.default",
- 2592000);
+ 2592000);
filters = new URLFilters(context.getConfiguration());
scfilters = new ScoringFilters(context.getConfiguration());
scoreInjected = context.getConfiguration().getFloat("db.score.injected",
- 1.0f);
+ 1.0f);
curTime = context.getConfiguration().getLong("injector.current.time",
- System.currentTimeMillis());
+ System.currentTimeMillis());
}
- @Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
- String url = value.toString();
+ String url = value.toString(); // value is line of text
+
+ if (url != null && url.trim().startsWith("#")) {
+ /* Ignore lines that start with # */
+ return;
+ }
// if tabs : metadata that could be stored
// must be name=value and separated by \t
@@ -149,10 +155,11 @@ public class InjectorJob extends NutchTo
LOG.warn("Skipping " + url + ":" + e);
url = null;
}
- if (url == null)
+ if (url == null) {
+ context.getCounter("injector", "urls_filtered").increment(1);
return;
-
- String reversedUrl = TableUtil.reverseUrl(url);
+ } else { // if it passes
+ String reversedUrl = TableUtil.reverseUrl(url); // collect it
WebPage row = new WebPage();
row.setFetchTime(curTime);
row.setFetchInterval(customInterval);
@@ -160,34 +167,33 @@ public class InjectorJob extends NutchTo
// now add the metadata
Iterator<String> keysIter = metadata.keySet().iterator();
while (keysIter.hasNext()) {
- String keymd = keysIter.next();
- String valuemd = metadata.get(keymd);
- row.putToMetadata(new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes()));
+ String keymd = keysIter.next();
+ String valuemd = metadata.get(keymd);
+ row.putToMetadata(new Utf8(keymd), ByteBuffer.wrap(valuemd.getBytes()));
}
if (customScore != -1)
- row.setScore(customScore);
+ row.setScore(customScore);
else
- row.setScore(scoreInjected);
+ row.setScore(scoreInjected);
try {
- scfilters.injectedScore(url, row);
+ scfilters.injectedScore(url, row);
} catch (ScoringFilterException e) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("Cannot filter injected score for url " + url
- + ", using default (" + e.getMessage() + ")");
- }
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Cannot filter injected score for url " + url
+ + ", using default (" + e.getMessage() + ")");
+ }
}
-
+ context.getCounter("injector", "urls_injected").increment(1);
row.putToMarkers(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0)));
-
Mark.INJECT_MARK.putMark(row, YES_STRING);
context.write(reversedUrl, row);
}
+ }
}
-
+
public InjectorJob() {
-
}
public InjectorJob(Configuration conf) {
@@ -211,21 +217,40 @@ public class InjectorJob extends NutchTo
currentJob.setMapOutputKeyClass(String.class);
currentJob.setMapOutputValueClass(WebPage.class);
currentJob.setOutputFormatClass(GoraOutputFormat.class);
+
DataStore<String, WebPage> store = StorageUtils.createWebStore(currentJob.getConfiguration(),
- String.class, WebPage.class);
+ String.class, WebPage.class);
GoraOutputFormat.setOutput(currentJob, store, true);
+
+ // NUTCH-1471 Make explicit which datastore class we use
+ Class<? extends DataStore<Object, Persistent>> dataStoreClass =
+ StorageUtils.getDataStoreClass(currentJob.getConfiguration());
+ LOG.info("InjectorJob: Using " + dataStoreClass + " as the Gora storage class.");
+
currentJob.setReducerClass(Reducer.class);
currentJob.setNumReduceTasks(0);
+
currentJob.waitForCompletion(true);
ToolUtil.recordJobStatus(null, currentJob, results);
+
+ // NUTCH-1370 Make explicit #URLs injected @runtime
+ long urlsInjected = currentJob.getCounters().findCounter("injector", "urls_injected").getValue();
+ long urlsFiltered = currentJob.getCounters().findCounter("injector", "urls_filtered").getValue();
+ LOG.info("InjectorJob: total number of urls rejected by filters: " + urlsFiltered);
+ LOG.info("InjectorJob: total number of urls injected after normalization and filtering: "
+ + urlsInjected);
+
return results;
}
public void inject(Path urlDir) throws Exception {
- LOG.info("InjectorJob: starting");
- LOG.info("InjectorJob: urlDir: " + urlDir);
-
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ long start = System.currentTimeMillis();
+ LOG.info("InjectorJob: starting at " + sdf.format(start));
+ LOG.info("InjectorJob: Injecting urlDir: " + urlDir);
run(ToolUtil.toArgMap(Nutch.ARG_SEEDDIR, urlDir));
+ long end = System.currentTimeMillis();
+ LOG.info("Injector: finished at " + sdf.format(end) + ", elapsed: " + TimingUtil.elapsedTime(start, end));
}
@Override
@@ -246,7 +271,6 @@ public class InjectorJob extends NutchTo
try {
inject(new Path(args[0]));
- LOG.info("InjectorJob: finished");
return -0;
} catch (Exception e) {
LOG.error("InjectorJob: " + StringUtils.stringifyException(e));
Modified: nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java
URL: http://svn.apache.org/viewvc/nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java?rev=1412566&r1=1412565&r2=1412566&view=diff
==============================================================================
--- nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java (original)
+++ nutch/branches/2.x/src/java/org/apache/nutch/storage/StorageUtils.java Thu Nov 22 14:45:07 2012
@@ -75,9 +75,16 @@ public class StorageUtils {
return DataStoreFactory.createDataStore(dataStoreClass,
keyClass, persistentClass, conf, schema);
}
-
+
+ /**
+ * Return the Persistent Gora class used to persist Nutch Web data.
+ *
+ * @param conf the Nutch configuration
+ * @return the Gora DataStore persistent class
+ * @throws ClassNotFoundException if the configured data store class cannot be loaded
+ */
@SuppressWarnings("unchecked")
- private static <K, V extends Persistent> Class<? extends DataStore<K, V>>
+ public static <K, V extends Persistent> Class<? extends DataStore<K, V>>
getDataStoreClass(Configuration conf) throws ClassNotFoundException {
return (Class<? extends DataStore<K, V>>)
Class.forName(conf.get("storage.data.store.class",