You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by mh...@apache.org on 2015/08/24 21:18:10 UTC

[1/2] incubator-datafu git commit: DATAFU-91: pig version of HyperLogLog estimator should be Algebraic and use combiners

Repository: incubator-datafu
Updated Branches:
  refs/heads/master b921ca57a -> 643543706


DATAFU-91: pig version of HyperLogLog estimator should be Algebraic and use combiners

Signed-off-by: Matthew Hayes <ma...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/7ed3902e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/7ed3902e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/7ed3902e

Branch: refs/heads/master
Commit: 7ed3902e7c0c081d91dd2f31e8c5a891ca375130
Parents: b921ca5
Author: Ido Hadanny <id...@gmail.com>
Authored: Mon Aug 24 11:56:54 2015 -0700
Committer: Matthew Hayes <ma...@gmail.com>
Committed: Mon Aug 24 11:56:54 2015 -0700

----------------------------------------------------------------------
 .../datafu/pig/stats/HyperLogLogPlusPlus.java   | 174 +++++++++++++++----
 1 file changed, 144 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/7ed3902e/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
index 95c5b0e..28927a5 100644
--- a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
+++ b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
@@ -20,14 +20,28 @@
 package datafu.pig.stats;
 
 import java.io.IOException;
+import java.util.Iterator;
 
-import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+
+import datafu.pig.stats.entropy.EmpiricalCountEntropy.Final;
+import datafu.pig.stats.entropy.EmpiricalCountEntropy.Initial;
+import datafu.pig.stats.entropy.EmpiricalCountEntropy.Intermediate;
+
 /**
  * A UDF that applies the HyperLogLog++ cardinality estimation algorithm.
  * 
@@ -42,11 +56,11 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
  * </p>
  * 
  */
-public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
+public class HyperLogLogPlusPlus extends AlgebraicEvalFunc<Long>
 {
-  private com.clearspring.analytics.stream.cardinality.HyperLogLogPlus estimator;
-  
-  private final int p;
+  private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+  private String p;
   
   /**
    * Constructs a HyperLogLog++ estimator.
@@ -59,36 +73,15 @@ public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
   /**
    * Constructs a HyperLogLog++ estimator.
    * 
-   * @param p precision value
+   * @param p precision value
    */
   public HyperLogLogPlusPlus(String p)
   {
-    this.p = Integer.parseInt(p);
+    super(p);
+    this.p = p;
     cleanup();
   }
-  
-  @Override
-  public void accumulate(Tuple arg0) throws IOException
-  {
-    DataBag inputBag = (DataBag)arg0.get(0);
-    for (Tuple t : inputBag) 
-    {
-      estimator.offer(t);
-    }
-  }
-
-  @Override
-  public void cleanup()
-  {
-    this.estimator = new com.clearspring.analytics.stream.cardinality.HyperLogLogPlus(p);
-  }
-
-  @Override
-  public Long getValue()
-  {
-    return this.estimator.cardinality();
-  }
-  
+    
   @Override
   public Schema outputSchema(Schema input)
   {
@@ -111,4 +104,125 @@ public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
       throw new RuntimeException(e);
     }
   }
+  
+  private String param = null;
+  private String getParam()
+  {
+    if (param == null) {
+      if (this.p != null) {
+        param = String.format("('%s')", this.p);
+      } else {
+        param = "";
+      }
+    }
+    return param;
+  }
+  
+  @Override
+  public String getFinal() {
+      return Final.class.getName() + getParam();
+  }
+
+  @Override
+  public String getInitial() {
+     return Initial.class.getName() + getParam();
+  }
+
+  @Override
+  public String getIntermed() {
+      return Intermediate.class.getName() + getParam();
+  }
+
+  static public class Initial extends EvalFunc<Tuple> {
+	public Initial() {};
+	public Initial(String p) {};
+
+	  
+    @Override
+    public Tuple exec(Tuple input) throws IOException {
+      // Since Initial is guaranteed to be called
+      // only in the map, it will be called with an
+      // input of a bag with a single tuple - the
+      // count should always be 1 if bag is non empty
+      DataBag bag = (DataBag) input.get(0);
+      Iterator<Tuple> it = bag.iterator();
+      if (it.hasNext()) {
+        Tuple t = (Tuple) it.next();
+        if (t != null && t.size() > 0 && t.get(0) != null) {
+          long x = MurmurHash.hash64(t);
+          return mTupleFactory.newTuple((Object) x);
+        }
+      }
+      return mTupleFactory.newTuple((Object) MurmurHash.hash64(null));
+    }
+  }
+
+  static public class Intermediate extends EvalFunc<Tuple> {
+	public Intermediate() {
+		this("20");
+	};
+	private String p;
+	public Intermediate(String p) {this.p = p;};
+	  
+    @Override
+    public Tuple exec(Tuple input) throws IOException {
+      try {
+        DataByteArray data = new DataByteArray(countDisctinct(input, Integer.parseInt(p)).getBytes());
+        return mTupleFactory.newTuple(data);
+      } catch (ExecException ee) {
+        throw ee;
+      } catch (Exception e) {
+        int errCode = 2106;
+        String msg = "Error while computing count in "
+            + this.getClass().getSimpleName();
+        throw new ExecException(msg, errCode, PigException.BUG, e);
+      }
+    }
+  }
+
+  static public class Final extends EvalFunc<Long> {
+	public Final() {
+		this("20");
+	};
+	private String p;
+	public Final(String p) {this.p = p;};
+	  
+    @Override
+    public Long exec(Tuple input) throws IOException {
+      try {
+        return countDisctinct(input, Integer.parseInt(p)).cardinality();
+      } catch (Exception ee) {
+        int errCode = 2106;
+        String msg = "Error while computing count in "
+            + this.getClass().getSimpleName();
+        throw new ExecException(msg, errCode, PigException.BUG, ee);
+      }
+    }
+  }
+
+  static protected HyperLogLogPlus countDisctinct(Tuple input, int p)
+      throws NumberFormatException, IOException {
+    HyperLogLogPlus estimator = new HyperLogLogPlus(p);
+    DataBag values = (DataBag) input.get(0);
+    for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+      Tuple t = it.next();
+      Object data = t.get(0);
+      if (data instanceof Long) {
+        estimator.offer(data);
+      } else if (data instanceof DataByteArray) {
+        DataByteArray bytes = (DataByteArray) data;
+        HyperLogLogPlus newEstimator;
+        try {
+          newEstimator = HyperLogLogPlus.Builder.build(bytes.get());
+          estimator = (HyperLogLogPlus) estimator.merge(newEstimator);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        } catch (CardinalityMergeException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    return estimator;
+  }
+  
 }


[2/2] incubator-datafu git commit: Address minor review feedback for HyperLogLogPlusPlus

Posted by mh...@apache.org.
Address minor review feedback for HyperLogLogPlusPlus


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/64354370
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/64354370
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/64354370

Branch: refs/heads/master
Commit: 643543706e6ad63c9c997f94bfdc979c63506339
Parents: 7ed3902
Author: Matthew Hayes <ma...@gmail.com>
Authored: Mon Aug 24 12:07:02 2015 -0700
Committer: Matthew Hayes <ma...@gmail.com>
Committed: Mon Aug 24 12:07:02 2015 -0700

----------------------------------------------------------------------
 .../java/datafu/pig/stats/HyperLogLogPlusPlus.java   | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/64354370/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
index 28927a5..2068801 100644
--- a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
+++ b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
@@ -38,10 +38,6 @@ import com.clearspring.analytics.hash.MurmurHash;
 import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
 
-import datafu.pig.stats.entropy.EmpiricalCountEntropy.Final;
-import datafu.pig.stats.entropy.EmpiricalCountEntropy.Initial;
-import datafu.pig.stats.entropy.EmpiricalCountEntropy.Intermediate;
-
 /**
  * A UDF that applies the HyperLogLog++ cardinality estimation algorithm.
  * 
@@ -146,14 +142,11 @@ public class HyperLogLogPlusPlus extends AlgebraicEvalFunc<Long>
       // count should always be 1 if bag is non empty
       DataBag bag = (DataBag) input.get(0);
       Iterator<Tuple> it = bag.iterator();
+      Tuple t = null;
       if (it.hasNext()) {
-        Tuple t = (Tuple) it.next();
-        if (t != null && t.size() > 0 && t.get(0) != null) {
-          long x = MurmurHash.hash64(t);
-          return mTupleFactory.newTuple((Object) x);
-        }
+        t = (Tuple) it.next();
       }
-      return mTupleFactory.newTuple((Object) MurmurHash.hash64(null));
+      return mTupleFactory.newTuple((Object) MurmurHash.hash64(t));
     }
   }
 
@@ -208,7 +201,7 @@ public class HyperLogLogPlusPlus extends AlgebraicEvalFunc<Long>
       Tuple t = it.next();
       Object data = t.get(0);
       if (data instanceof Long) {
-        estimator.offer(data);
+        estimator.offerHashed((Long)data);
       } else if (data instanceof DataByteArray) {
         DataByteArray bytes = (DataByteArray) data;
         HyperLogLogPlus newEstimator;