You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by sn...@apache.org on 2017/12/18 15:49:46 UTC
[nutch] 02/23: NUTCH-2474 CrawlDbReader -stats fails with
ClassCastException - replace CrawlDbStatCombiner with CrawlDbStatReducer and
ensure that data is processed correctly regardless of whether and how often
the combiner is called - simplify calculation of minimum and maximum
This is an automated email from the ASF dual-hosted git repository.
snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git
commit d758a31bbee0807bcbc92a591668076cfa95aeb1
Author: Sebastian Nagel <sn...@apache.org>
AuthorDate: Fri Dec 8 22:41:05 2017 +0100
NUTCH-2474 CrawlDbReader -stats fails with ClassCastException
- replace CrawlDbStatCombiner with CrawlDbStatReducer and ensure
that data is processed correctly regardless of whether and
how often the combiner is called
- simplify calculation of minimum and maximum
---
src/java/org/apache/nutch/crawl/CrawlDbReader.java | 273 +++++++++------------
1 file changed, 114 insertions(+), 159 deletions(-)
diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index 42b5a3b..117aa7f 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -202,14 +202,24 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
output.collect(new Text("status " + value.getStatus()), COUNT_1);
output.collect(new Text("retry " + value.getRetriesSinceFetch()),
COUNT_1);
- output.collect(new Text("sc"), new NutchWritable(
- new FloatWritable(value.getScore())));
+
+ NutchWritable score = new NutchWritable(
+ new FloatWritable(value.getScore()));
+ output.collect(new Text("sc"), score);
+ output.collect(new Text("sct"), score);
+ output.collect(new Text("scd"), score);
+
// fetch time (in minutes to prevent from overflows when summing up)
- output.collect(new Text("ft"), new NutchWritable(
- new LongWritable(value.getFetchTime() / (1000 * 60))));
+ NutchWritable fetchTime = new NutchWritable(
+ new LongWritable(value.getFetchTime() / (1000 * 60)));
+ output.collect(new Text("ft"), fetchTime);
+ output.collect(new Text("ftt"), fetchTime);
+
// fetch interval (in seconds)
- output.collect(new Text("fi"),
- new NutchWritable(new LongWritable(value.getFetchInterval())));
+ NutchWritable fetchInterval = new NutchWritable(new LongWritable(value.getFetchInterval()));
+ output.collect(new Text("fi"), fetchInterval);
+ output.collect(new Text("fit"), fetchInterval);
+
if (sort) {
URL u = new URL(key.toString());
String host = u.getHost();
@@ -219,88 +229,6 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
}
}
- public static class CrawlDbStatCombiner implements
- Reducer<Text, NutchWritable, Text, NutchWritable> {
- LongWritable val = new LongWritable();
-
- public CrawlDbStatCombiner() {
- }
-
- public void configure(JobConf job) {
- }
-
- public void close() {
- }
-
- private void reduceMinMaxTotal(String keyPrefix, Iterator<NutchWritable> values,
- OutputCollector<Text, NutchWritable> output, Reporter reporter)
- throws IOException {
- long total = 0;
- long min = Long.MAX_VALUE;
- long max = Long.MIN_VALUE;
- while (values.hasNext()) {
- long cnt = ((LongWritable) values.next().get()).get();
- if (cnt < min)
- min = cnt;
- if (cnt > max)
- max = cnt;
- total += cnt;
- }
- output.collect(new Text(keyPrefix + "n"),
- new NutchWritable(new LongWritable(min)));
- output.collect(new Text(keyPrefix + "x"),
- new NutchWritable(new LongWritable(max)));
- output.collect(new Text(keyPrefix + "t"),
- new NutchWritable(new LongWritable(total)));
- }
-
- private void reduceMinMaxTotalFloat(String keyPrefix, Iterator<NutchWritable> values,
- OutputCollector<Text, NutchWritable> output, Reporter reporter)
- throws IOException {
- double total = 0;
- float min = Float.MAX_VALUE;
- float max = Float.MIN_VALUE;
- TDigest tdigest = TDigest.createMergingDigest(100.0);
- while (values.hasNext()) {
- float val = ((FloatWritable) values.next().get()).get();
- tdigest.add(val);
- if (val < min)
- min = val;
- if (val > max)
- max = val;
- total += val;
- }
- output.collect(new Text(keyPrefix + "n"),
- new NutchWritable(new FloatWritable(min)));
- output.collect(new Text(keyPrefix + "x"),
- new NutchWritable(new FloatWritable(max)));
- output.collect(new Text(keyPrefix + "t"),
- new NutchWritable(new FloatWritable((float) total)));
- ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize());
- tdigest.asSmallBytes(tdigestBytes);
- output.collect(new Text(keyPrefix + "d"),
- new NutchWritable(new BytesWritable(tdigestBytes.array())));
- }
-
- public void reduce(Text key, Iterator<NutchWritable> values,
- OutputCollector<Text, NutchWritable> output, Reporter reporter)
- throws IOException {
- val.set(0L);
- String k = key.toString();
- if (k.equals("sc")) {
- reduceMinMaxTotalFloat(k, values, output, reporter);
- } else if (k.equals("ft") || k.equals("fi")) {
- reduceMinMaxTotal(k, values, output, reporter);
- } else {
- while (values.hasNext()) {
- LongWritable cnt = (LongWritable) values.next().get();
- val.set(val.get() + cnt.get());
- }
- output.collect(key, new NutchWritable(val));
- }
- }
- }
-
public static class CrawlDbStatReducer implements
Reducer<Text, NutchWritable, Text, NutchWritable> {
public void configure(JobConf job) {
@@ -314,7 +242,8 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
throws IOException {
String k = key.toString();
- if (k.equals("T")) {
+ if (k.equals("T") || k.startsWith("status") || k.startsWith("retry")
+ || k.equals("ftt") || k.equals("fit")) {
// sum all values for this key
long sum = 0;
while (values.hasNext()) {
@@ -323,68 +252,59 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
}
// output sum
output.collect(key, new NutchWritable(new LongWritable(sum)));
- } else if (k.startsWith("status") || k.startsWith("retry")) {
- LongWritable cnt = new LongWritable();
- while (values.hasNext()) {
- LongWritable val = (LongWritable) values.next().get();
- cnt.set(cnt.get() + val.get());
- }
- output.collect(key, new NutchWritable(cnt));
- } else if (k.equals("scx")) {
- FloatWritable max = new FloatWritable(Float.MIN_VALUE);
- while (values.hasNext()) {
- FloatWritable val = (FloatWritable) values.next().get();
- if (max.get() < val.get())
- max.set(val.get());
- }
- output.collect(key, new NutchWritable(max));
- } else if (k.equals("ftx") || k.equals("fix")) {
- LongWritable cnt = new LongWritable(Long.MIN_VALUE);
- while (values.hasNext()) {
- LongWritable val = (LongWritable) values.next().get();
- if (cnt.get() < val.get())
- cnt.set(val.get());
- }
- output.collect(key, new NutchWritable(cnt));
- } else if (k.equals("scn")) {
- FloatWritable min = new FloatWritable(Float.MAX_VALUE);
+ } else if (k.equals("sc")) {
+ float min = Float.MAX_VALUE;
+ float max = Float.MIN_VALUE;
while (values.hasNext()) {
- FloatWritable val = (FloatWritable) values.next().get();
- if (min.get() > val.get())
- min.set(val.get());
+ float value = ((FloatWritable) values.next().get()).get();
+ if (max < value) {
+ max = value;
+ }
+ if (min > value) {
+ min = value;
+ }
}
- output.collect(key, new NutchWritable(min));
- } else if (k.equals("ftn") || k.equals("fin")) {
- LongWritable cnt = new LongWritable(Long.MAX_VALUE);
+ output.collect(key, new NutchWritable(new FloatWritable(min)));
+ output.collect(key, new NutchWritable(new FloatWritable(max)));
+ } else if (k.equals("ft") || k.equals("fi")) {
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
while (values.hasNext()) {
- LongWritable val = (LongWritable) values.next().get();
- if (cnt.get() > val.get())
- cnt.set(val.get());
+ long value = ((LongWritable) values.next().get()).get();
+ if (max < value) {
+ max = value;
+ }
+ if (min > value) {
+ min = value;
+ }
}
- output.collect(key, new NutchWritable(cnt));
+ output.collect(key, new NutchWritable(new LongWritable(min)));
+ output.collect(key, new NutchWritable(new LongWritable(max)));
} else if (k.equals("sct")) {
- FloatWritable cnt = new FloatWritable();
- while (values.hasNext()) {
- FloatWritable val = (FloatWritable) values.next().get();
- cnt.set(cnt.get() + val.get());
- }
- output.collect(key, new NutchWritable(cnt));
- } else if (k.equals("ftt") || k.equals("fit")) {
- LongWritable cnt = new LongWritable();
+ float cnt = 0.0f;
while (values.hasNext()) {
- LongWritable val = (LongWritable) values.next().get();
- cnt.set(cnt.get() + val.get());
+ float value = ((FloatWritable) values.next().get()).get();
+ cnt += value;
}
- output.collect(key, new NutchWritable(cnt));
+ output.collect(key, new NutchWritable(new FloatWritable(cnt)));
} else if (k.equals("scd") || k.equals("ftd") || k.equals("fid")) {
MergingDigest tdigest = null;
while (values.hasNext()) {
- byte[] bytes = ((BytesWritable) values.next().get()).getBytes();
- MergingDigest tdig = MergingDigest.fromBytes(ByteBuffer.wrap(bytes));
- if (tdigest == null) {
- tdigest = tdig;
- } else {
- tdigest.add(tdig);
+ Writable value = values.next().get();
+ if (value instanceof BytesWritable) {
+ byte[] bytes = ((BytesWritable) value).getBytes();
+ MergingDigest tdig = MergingDigest
+ .fromBytes(ByteBuffer.wrap(bytes));
+ if (tdigest == null) {
+ tdigest = tdig;
+ } else {
+ tdigest.add(tdig);
+ }
+ } else if (value instanceof FloatWritable) {
+ if (tdigest == null) {
+ tdigest = (MergingDigest) TDigest.createMergingDigest(100.0);
+ }
+ tdigest.add(((FloatWritable) value).get());
}
}
ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize());
@@ -455,7 +375,7 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
job.setInputFormat(SequenceFileInputFormat.class);
job.setMapperClass(CrawlDbStatMapper.class);
- job.setCombinerClass(CrawlDbStatCombiner.class);
+ job.setCombinerClass(CrawlDbStatReducer.class);
job.setReducerClass(CrawlDbStatReducer.class);
FileOutputFormat.setOutputPath(job, tmpFolder);
@@ -486,27 +406,57 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
stats.put(k, value.get());
continue;
}
- if (k.equals("scx")) {
- FloatWritable fvalue = (FloatWritable) value.get();
- if (((FloatWritable) val).get() < fvalue.get())
- ((FloatWritable) val).set(fvalue.get());
- } else if (k.equals("ftx") || k.equals("fix")) {
- LongWritable lvalue = (LongWritable) value.get();
- if (((LongWritable) val).get() < lvalue.get())
- ((LongWritable) val).set(lvalue.get());
- } else if (k.equals("scn")) {
- FloatWritable fvalue = (FloatWritable) value.get();
- if (((FloatWritable) val).get() > fvalue.get())
- ((FloatWritable) val).set(fvalue.get());
- } else if (k.equals("ftn") || k.equals("fin")) {
- LongWritable lvalue = (LongWritable) value.get();
- if (((LongWritable) val).get() > lvalue.get())
- ((LongWritable) val).set(lvalue.get());
+ if (k.equals("sc")) {
+ float min = Float.MAX_VALUE;
+ float max = Float.MIN_VALUE;
+ if (stats.containsKey("scn")) {
+ min = ((FloatWritable) stats.get("scn")).get();
+ } else {
+ min = ((FloatWritable) stats.get("sc")).get();
+ }
+ if (stats.containsKey("scx")) {
+ max = ((FloatWritable) stats.get("scx")).get();
+ } else {
+ max = ((FloatWritable) stats.get("sc")).get();
+ }
+ float fvalue = ((FloatWritable) value.get()).get();
+ if (min > fvalue) {
+ min = fvalue;
+ }
+ if (max < fvalue) {
+ max = fvalue;
+ }
+ stats.put("scn", new FloatWritable(min));
+ stats.put("scx", new FloatWritable(max));
+ } else if (k.equals("ft") || k.equals("fi")) {
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ String minKey = k + "n";
+ String maxKey = k + "x";
+ if (stats.containsKey(minKey)) {
+ min = ((LongWritable) stats.get(minKey)).get();
+ } else if (stats.containsKey(k)) {
+ min = ((LongWritable) stats.get(k)).get();
+ }
+ if (stats.containsKey(maxKey)) {
+ max = ((LongWritable) stats.get(maxKey)).get();
+ } else if (stats.containsKey(k)) {
+ max = ((LongWritable) stats.get(k)).get();
+ }
+ long lvalue = ((LongWritable) value.get()).get();
+ if (min > lvalue) {
+ min = lvalue;
+ }
+ if (max < lvalue) {
+ max = lvalue;
+ }
+ stats.put(k + "n", new LongWritable(min));
+ stats.put(k + "x", new LongWritable(max));
} else if (k.equals("sct")) {
FloatWritable fvalue = (FloatWritable) value.get();
((FloatWritable) val)
.set(((FloatWritable) val).get() + fvalue.get());
- } else if (k.equals("scd") || k.equals("ftd") || k.equals("fid")) {
+ } else if (k.equals("scd")) {
MergingDigest tdigest = null;
MergingDigest tdig = MergingDigest.fromBytes(
ByteBuffer.wrap(((BytesWritable) value.get()).getBytes()));
@@ -529,6 +479,11 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
}
reader.close();
}
+ // remove score, fetch interval, and fetch time
+ // (used for min/max calculation)
+ stats.remove("sc");
+ stats.remove("fi");
+ stats.remove("ft");
// removing the tmp folder
fileSystem.delete(tmpFolder, true);
return stats;
--
To stop receiving notification emails like this one, please contact
"commits@nutch.apache.org" <co...@nutch.apache.org>.