You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by sn...@apache.org on 2017/12/18 15:49:46 UTC

[nutch] 02/23: NUTCH-2474 CrawlDbReader -stats fails with ClassCastException - replace CrawlDbStatCombiner with CrawlDbStatReducer and ensure that data is processed correctly regardless of whether and how often the combiner is called - simplify calculation of minimum and maximum

This is an automated email from the ASF dual-hosted git repository.

snagel pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nutch.git

commit d758a31bbee0807bcbc92a591668076cfa95aeb1
Author: Sebastian Nagel <sn...@apache.org>
AuthorDate: Fri Dec 8 22:41:05 2017 +0100

    NUTCH-2474 CrawlDbReader -stats fails with ClassCastException
    - replace CrawlDbStatCombiner with CrawlDbStatReducer and ensure
      that data is processed correctly regardless of whether and
      how often the combiner is called
    - simplify calculation of minimum and maximum
---
 src/java/org/apache/nutch/crawl/CrawlDbReader.java | 273 +++++++++------------
 1 file changed, 114 insertions(+), 159 deletions(-)

diff --git a/src/java/org/apache/nutch/crawl/CrawlDbReader.java b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
index 42b5a3b..117aa7f 100644
--- a/src/java/org/apache/nutch/crawl/CrawlDbReader.java
+++ b/src/java/org/apache/nutch/crawl/CrawlDbReader.java
@@ -202,14 +202,24 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
       output.collect(new Text("status " + value.getStatus()), COUNT_1);
       output.collect(new Text("retry " + value.getRetriesSinceFetch()),
           COUNT_1);
-      output.collect(new Text("sc"), new NutchWritable(
-          new FloatWritable(value.getScore())));
+
+      NutchWritable score = new NutchWritable(
+          new FloatWritable(value.getScore()));
+      output.collect(new Text("sc"), score);
+      output.collect(new Text("sct"), score);
+      output.collect(new Text("scd"), score);
+
       // fetch time (in minutes to prevent from overflows when summing up)
-      output.collect(new Text("ft"), new NutchWritable(
-          new LongWritable(value.getFetchTime() / (1000 * 60))));
+      NutchWritable fetchTime = new NutchWritable(
+          new LongWritable(value.getFetchTime() / (1000 * 60)));
+      output.collect(new Text("ft"), fetchTime);
+      output.collect(new Text("ftt"), fetchTime);
+
       // fetch interval (in seconds)
-      output.collect(new Text("fi"),
-          new NutchWritable(new LongWritable(value.getFetchInterval())));
+      NutchWritable fetchInterval = new NutchWritable(new LongWritable(value.getFetchInterval()));
+      output.collect(new Text("fi"), fetchInterval);
+      output.collect(new Text("fit"), fetchInterval);
+
       if (sort) {
         URL u = new URL(key.toString());
         String host = u.getHost();
@@ -219,88 +229,6 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
     }
   }
 
-  public static class CrawlDbStatCombiner implements
-      Reducer<Text, NutchWritable, Text, NutchWritable> {
-    LongWritable val = new LongWritable();
-
-    public CrawlDbStatCombiner() {
-    }
-
-    public void configure(JobConf job) {
-    }
-
-    public void close() {
-    }
-
-    private void reduceMinMaxTotal(String keyPrefix, Iterator<NutchWritable> values,
-        OutputCollector<Text, NutchWritable> output, Reporter reporter)
-        throws IOException {
-      long total = 0;
-      long min = Long.MAX_VALUE;
-      long max = Long.MIN_VALUE;
-      while (values.hasNext()) {
-        long cnt = ((LongWritable) values.next().get()).get();
-        if (cnt < min)
-          min = cnt;
-        if (cnt > max)
-          max = cnt;
-        total += cnt;
-      }
-      output.collect(new Text(keyPrefix + "n"),
-          new NutchWritable(new LongWritable(min)));
-      output.collect(new Text(keyPrefix + "x"),
-          new NutchWritable(new LongWritable(max)));
-      output.collect(new Text(keyPrefix + "t"),
-          new NutchWritable(new LongWritable(total)));
-    }
-    
-    private void reduceMinMaxTotalFloat(String keyPrefix, Iterator<NutchWritable> values,
-        OutputCollector<Text, NutchWritable> output, Reporter reporter)
-        throws IOException {
-      double total = 0;
-      float min = Float.MAX_VALUE;
-      float max = Float.MIN_VALUE;
-      TDigest tdigest = TDigest.createMergingDigest(100.0);
-      while (values.hasNext()) {
-        float val = ((FloatWritable) values.next().get()).get();
-        tdigest.add(val);
-        if (val < min)
-          min = val;
-        if (val > max)
-          max = val;
-        total += val;
-      }
-      output.collect(new Text(keyPrefix + "n"),
-          new NutchWritable(new FloatWritable(min)));
-      output.collect(new Text(keyPrefix + "x"),
-          new NutchWritable(new FloatWritable(max)));
-      output.collect(new Text(keyPrefix + "t"),
-          new NutchWritable(new FloatWritable((float) total)));
-      ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize());
-      tdigest.asSmallBytes(tdigestBytes);
-      output.collect(new Text(keyPrefix + "d"),
-          new NutchWritable(new BytesWritable(tdigestBytes.array())));
-    }
-
-    public void reduce(Text key, Iterator<NutchWritable> values,
-        OutputCollector<Text, NutchWritable> output, Reporter reporter)
-        throws IOException {
-      val.set(0L);
-      String k = key.toString();
-      if (k.equals("sc")) {
-        reduceMinMaxTotalFloat(k, values, output, reporter);
-      } else if (k.equals("ft") || k.equals("fi")) {
-        reduceMinMaxTotal(k, values, output, reporter);
-      } else {
-        while (values.hasNext()) {
-          LongWritable cnt = (LongWritable) values.next().get();
-          val.set(val.get() + cnt.get());
-        }
-        output.collect(key, new NutchWritable(val));
-      }
-    }
-  }
-
   public static class CrawlDbStatReducer implements
       Reducer<Text, NutchWritable, Text, NutchWritable> {
     public void configure(JobConf job) {
@@ -314,7 +242,8 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
         throws IOException {
 
       String k = key.toString();
-      if (k.equals("T")) {
+      if (k.equals("T") || k.startsWith("status") || k.startsWith("retry")
+          || k.equals("ftt") || k.equals("fit")) {
         // sum all values for this key
         long sum = 0;
         while (values.hasNext()) {
@@ -323,68 +252,59 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
         }
         // output sum
         output.collect(key, new NutchWritable(new LongWritable(sum)));
-      } else if (k.startsWith("status") || k.startsWith("retry")) {
-        LongWritable cnt = new LongWritable();
-        while (values.hasNext()) {
-          LongWritable val = (LongWritable) values.next().get();
-          cnt.set(cnt.get() + val.get());
-        }
-        output.collect(key, new NutchWritable(cnt));
-      } else if (k.equals("scx")) {
-        FloatWritable max = new FloatWritable(Float.MIN_VALUE);
-        while (values.hasNext()) {
-          FloatWritable val = (FloatWritable) values.next().get();
-          if (max.get() < val.get())
-            max.set(val.get());
-        }
-        output.collect(key, new NutchWritable(max));
-      } else if (k.equals("ftx") || k.equals("fix")) {
-        LongWritable cnt = new LongWritable(Long.MIN_VALUE);
-        while (values.hasNext()) {
-          LongWritable val = (LongWritable) values.next().get();
-          if (cnt.get() < val.get())
-            cnt.set(val.get());
-        }
-        output.collect(key, new NutchWritable(cnt));
-      } else if (k.equals("scn")) {
-        FloatWritable min = new FloatWritable(Float.MAX_VALUE);
+      } else if (k.equals("sc")) {
+        float min = Float.MAX_VALUE;
+        float max = Float.MIN_VALUE;
         while (values.hasNext()) {
-          FloatWritable val = (FloatWritable) values.next().get();
-          if (min.get() > val.get())
-            min.set(val.get());
+          float value = ((FloatWritable) values.next().get()).get();
+          if (max < value) {
+            max = value;
+          }
+          if (min > value) {
+            min = value;
+          }
         }
-        output.collect(key, new NutchWritable(min));
-      } else if (k.equals("ftn") || k.equals("fin")) {
-        LongWritable cnt = new LongWritable(Long.MAX_VALUE);
+        output.collect(key, new NutchWritable(new FloatWritable(min)));
+        output.collect(key, new NutchWritable(new FloatWritable(max)));
+      } else if (k.equals("ft") || k.equals("fi")) {
+        long min = Long.MAX_VALUE;
+        long max = Long.MIN_VALUE;
         while (values.hasNext()) {
-          LongWritable val = (LongWritable) values.next().get();
-          if (cnt.get() > val.get())
-            cnt.set(val.get());
+          long value = ((LongWritable) values.next().get()).get();
+          if (max < value) {
+            max = value;
+          }
+          if (min > value) {
+            min = value;
+          }
         }
-        output.collect(key, new NutchWritable(cnt));
+        output.collect(key, new NutchWritable(new LongWritable(min)));
+        output.collect(key, new NutchWritable(new LongWritable(max)));
       } else if (k.equals("sct")) {
-        FloatWritable cnt = new FloatWritable();
-        while (values.hasNext()) {
-          FloatWritable val = (FloatWritable) values.next().get();
-          cnt.set(cnt.get() + val.get());
-        }
-        output.collect(key, new NutchWritable(cnt));
-      } else if (k.equals("ftt") || k.equals("fit")) {
-        LongWritable cnt = new LongWritable();
+        float cnt = 0.0f;
         while (values.hasNext()) {
-          LongWritable val = (LongWritable) values.next().get();
-          cnt.set(cnt.get() + val.get());
+          float value = ((FloatWritable) values.next().get()).get();
+          cnt += value;
         }
-        output.collect(key, new NutchWritable(cnt));
+        output.collect(key, new NutchWritable(new FloatWritable(cnt)));
       } else if (k.equals("scd") || k.equals("ftd") || k.equals("fid")) {
         MergingDigest tdigest = null;
         while (values.hasNext()) {
-          byte[] bytes = ((BytesWritable) values.next().get()).getBytes();
-          MergingDigest tdig = MergingDigest.fromBytes(ByteBuffer.wrap(bytes));
-          if (tdigest == null) {
-            tdigest = tdig;
-          } else {
-            tdigest.add(tdig);
+          Writable value = values.next().get();
+          if (value instanceof BytesWritable) {
+            byte[] bytes = ((BytesWritable) value).getBytes();
+            MergingDigest tdig = MergingDigest
+                .fromBytes(ByteBuffer.wrap(bytes));
+            if (tdigest == null) {
+              tdigest = tdig;
+            } else {
+              tdigest.add(tdig);
+            }
+          } else if (value instanceof FloatWritable) {
+            if (tdigest == null) {
+              tdigest = (MergingDigest) TDigest.createMergingDigest(100.0);
+            }
+            tdigest.add(((FloatWritable) value).get());
           }
         }
         ByteBuffer tdigestBytes = ByteBuffer.allocate(tdigest.smallByteSize());
@@ -455,7 +375,7 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
 	  job.setInputFormat(SequenceFileInputFormat.class);
 
 	  job.setMapperClass(CrawlDbStatMapper.class);
-	  job.setCombinerClass(CrawlDbStatCombiner.class);
+	  job.setCombinerClass(CrawlDbStatReducer.class);
 	  job.setReducerClass(CrawlDbStatReducer.class);
 
 	  FileOutputFormat.setOutputPath(job, tmpFolder);
@@ -486,27 +406,57 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
 			    stats.put(k, value.get());
 			    continue;
 			  }
-			  if (k.equals("scx")) {
-			    FloatWritable fvalue = (FloatWritable) value.get();
-			    if (((FloatWritable) val).get() < fvalue.get())
-			      ((FloatWritable) val).set(fvalue.get());
-        } else if (k.equals("ftx") || k.equals("fix")) {
-          LongWritable lvalue = (LongWritable) value.get();
-          if (((LongWritable) val).get() < lvalue.get())
-            ((LongWritable) val).set(lvalue.get());
-        } else if (k.equals("scn")) {
-          FloatWritable fvalue = (FloatWritable) value.get();
-          if (((FloatWritable) val).get() > fvalue.get())
-            ((FloatWritable) val).set(fvalue.get());
-			  } else if (k.equals("ftn") || k.equals("fin")) {
-          LongWritable lvalue = (LongWritable) value.get();
-				  if (((LongWritable) val).get() > lvalue.get())
-				    ((LongWritable) val).set(lvalue.get());
+			  if (k.equals("sc")) {
+			    float min = Float.MAX_VALUE;
+          float max = Float.MIN_VALUE;
+			    if (stats.containsKey("scn")) {
+			      min = ((FloatWritable) stats.get("scn")).get();
+			    } else {
+			      min = ((FloatWritable) stats.get("sc")).get();
+			    }
+          if (stats.containsKey("scx")) {
+            max = ((FloatWritable) stats.get("scx")).get();
+          } else {
+            max = ((FloatWritable) stats.get("sc")).get();
+          }
+			    float fvalue = ((FloatWritable) value.get()).get();
+			    if (min > fvalue) {
+			      min = fvalue;
+			    }
+          if (max < fvalue) {
+            max = fvalue;
+          }
+          stats.put("scn", new FloatWritable(min));
+          stats.put("scx", new FloatWritable(max));
+        } else if (k.equals("ft") || k.equals("fi")) {
+          long min = Long.MAX_VALUE;
+          long max = Long.MIN_VALUE;
+          String minKey = k + "n";
+          String maxKey = k + "x";
+          if (stats.containsKey(minKey)) {
+            min = ((LongWritable) stats.get(minKey)).get();
+          } else if (stats.containsKey(k)) {
+            min = ((LongWritable) stats.get(k)).get();
+          }
+          if (stats.containsKey(maxKey)) {
+            max = ((LongWritable) stats.get(maxKey)).get();
+          } else if (stats.containsKey(k)) {
+            max = ((LongWritable) stats.get(k)).get();
+          }
+          long lvalue = ((LongWritable) value.get()).get();
+          if (min > lvalue) {
+            min = lvalue;
+          }
+          if (max < lvalue) {
+            max = lvalue;
+          }
+          stats.put(k + "n", new LongWritable(min));
+          stats.put(k + "x", new LongWritable(max));
 			  } else if (k.equals("sct")) {
           FloatWritable fvalue = (FloatWritable) value.get();
           ((FloatWritable) val)
               .set(((FloatWritable) val).get() + fvalue.get());
-        } else if (k.equals("scd") || k.equals("ftd") || k.equals("fid")) {
+        } else if (k.equals("scd")) {
           MergingDigest tdigest = null;
           MergingDigest tdig = MergingDigest.fromBytes(
               ByteBuffer.wrap(((BytesWritable) value.get()).getBytes()));
@@ -529,6 +479,11 @@ public class CrawlDbReader extends Configured implements Closeable, Tool {
 		  }
 		  reader.close();
 	  }
+    // remove score, fetch interval, and fetch time
+    // (used for min/max calculation)
+    stats.remove("sc");
+    stats.remove("fi");
+    stats.remove("ft");
 	  // removing the tmp folder
 	  fileSystem.delete(tmpFolder, true);
 	  return stats;

-- 
To stop receiving notification emails like this one, please contact
"commits@nutch.apache.org" <co...@nutch.apache.org>.