You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by mh...@apache.org on 2015/08/24 21:18:10 UTC
[1/2] incubator-datafu git commit: DATAFU-91: pig version of HyperLogLog estimator should be Algebraic and use combiners
Repository: incubator-datafu
Updated Branches:
refs/heads/master b921ca57a -> 643543706
DATAFU-91: pig version of HyperLogLog estimator should be Algebraic and use combiners
Signed-off-by: Matthew Hayes <ma...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/7ed3902e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/7ed3902e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/7ed3902e
Branch: refs/heads/master
Commit: 7ed3902e7c0c081d91dd2f31e8c5a891ca375130
Parents: b921ca5
Author: Ido Hadanny <id...@gmail.com>
Authored: Mon Aug 24 11:56:54 2015 -0700
Committer: Matthew Hayes <ma...@gmail.com>
Committed: Mon Aug 24 11:56:54 2015 -0700
----------------------------------------------------------------------
.../datafu/pig/stats/HyperLogLogPlusPlus.java | 174 +++++++++++++++----
1 file changed, 144 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/7ed3902e/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
index 95c5b0e..28927a5 100644
--- a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
+++ b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
@@ -20,14 +20,28 @@
package datafu.pig.stats;
import java.io.IOException;
+import java.util.Iterator;
-import org.apache.pig.AccumulatorEvalFunc;
+import org.apache.pig.AlgebraicEvalFunc;
+import org.apache.pig.EvalFunc;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
+import com.clearspring.analytics.hash.MurmurHash;
+import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
+import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
+
+import datafu.pig.stats.entropy.EmpiricalCountEntropy.Final;
+import datafu.pig.stats.entropy.EmpiricalCountEntropy.Initial;
+import datafu.pig.stats.entropy.EmpiricalCountEntropy.Intermediate;
+
/**
* A UDF that applies the HyperLogLog++ cardinality estimation algorithm.
*
@@ -42,11 +56,11 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
* </p>
*
*/
-public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
+public class HyperLogLogPlusPlus extends AlgebraicEvalFunc<Long>
{
- private com.clearspring.analytics.stream.cardinality.HyperLogLogPlus estimator;
-
- private final int p;
+ private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+ private String p;
/**
* Constructs a HyperLogLog++ estimator.
@@ -59,36 +73,15 @@ public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
/**
* Constructs a HyperLogLog++ estimator.
*
- * @param p precision value
+ * @param par precision value
*/
public HyperLogLogPlusPlus(String p)
{
- this.p = Integer.parseInt(p);
+ super(p);
+ this.p = p;
cleanup();
}
-
- @Override
- public void accumulate(Tuple arg0) throws IOException
- {
- DataBag inputBag = (DataBag)arg0.get(0);
- for (Tuple t : inputBag)
- {
- estimator.offer(t);
- }
- }
-
- @Override
- public void cleanup()
- {
- this.estimator = new com.clearspring.analytics.stream.cardinality.HyperLogLogPlus(p);
- }
-
- @Override
- public Long getValue()
- {
- return this.estimator.cardinality();
- }
-
+
@Override
public Schema outputSchema(Schema input)
{
@@ -111,4 +104,125 @@ public class HyperLogLogPlusPlus extends AccumulatorEvalFunc<Long>
throw new RuntimeException(e);
}
}
+
+ private String param = null;
+ private String getParam()
+ {
+ if (param == null) {
+ if (this.p != null) {
+ param = String.format("('%s')", this.p);
+ } else {
+ param = "";
+ }
+ }
+ return param;
+ }
+
+ @Override
+ public String getFinal() {
+ return Final.class.getName() + getParam();
+ }
+
+ @Override
+ public String getInitial() {
+ return Initial.class.getName() + getParam();
+ }
+
+ @Override
+ public String getIntermed() {
+ return Intermediate.class.getName() + getParam();
+ }
+
+ static public class Initial extends EvalFunc<Tuple> {
+ public Initial() {};
+ public Initial(String p) {};
+
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ // Since Initial is guaranteed to be called
+ // only in the map, it will be called with an
+ // input of a bag with a single tuple - the
+ // count should always be 1 if bag is non empty
+ DataBag bag = (DataBag) input.get(0);
+ Iterator<Tuple> it = bag.iterator();
+ if (it.hasNext()) {
+ Tuple t = (Tuple) it.next();
+ if (t != null && t.size() > 0 && t.get(0) != null) {
+ long x = MurmurHash.hash64(t);
+ return mTupleFactory.newTuple((Object) x);
+ }
+ }
+ return mTupleFactory.newTuple((Object) MurmurHash.hash64(null));
+ }
+ }
+
+ static public class Intermediate extends EvalFunc<Tuple> {
+ public Intermediate() {
+ this("20");
+ };
+ private String p;
+ public Intermediate(String p) {this.p = p;};
+
+ @Override
+ public Tuple exec(Tuple input) throws IOException {
+ try {
+ DataByteArray data = new DataByteArray(countDisctinct(input, Integer.parseInt(p)).getBytes());
+ return mTupleFactory.newTuple(data);
+ } catch (ExecException ee) {
+ throw ee;
+ } catch (Exception e) {
+ int errCode = 2106;
+ String msg = "Error while computing count in "
+ + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ }
+
+ static public class Final extends EvalFunc<Long> {
+ public Final() {
+ this("20");
+ };
+ private String p;
+ public Final(String p) {this.p = p;};
+
+ @Override
+ public Long exec(Tuple input) throws IOException {
+ try {
+ return countDisctinct(input, Integer.parseInt(p)).cardinality();
+ } catch (Exception ee) {
+ int errCode = 2106;
+ String msg = "Error while computing count in "
+ + this.getClass().getSimpleName();
+ throw new ExecException(msg, errCode, PigException.BUG, ee);
+ }
+ }
+ }
+
+ static protected HyperLogLogPlus countDisctinct(Tuple input, int p)
+ throws NumberFormatException, IOException {
+ HyperLogLogPlus estimator = new HyperLogLogPlus(p);
+ DataBag values = (DataBag) input.get(0);
+ for (Iterator<Tuple> it = values.iterator(); it.hasNext();) {
+ Tuple t = it.next();
+ Object data = t.get(0);
+ if (data instanceof Long) {
+ estimator.offer(data);
+ } else if (data instanceof DataByteArray) {
+ DataByteArray bytes = (DataByteArray) data;
+ HyperLogLogPlus newEstimator;
+ try {
+ newEstimator = HyperLogLogPlus.Builder.build(bytes.get());
+ estimator = (HyperLogLogPlus) estimator.merge(newEstimator);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ } catch (CardinalityMergeException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ return estimator;
+ }
+
}
[2/2] incubator-datafu git commit: Address minor review feedback for HyperLogLogPlusPlus
Posted by mh...@apache.org.
Address minor review feedback for HyperLogLogPlusPlus
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/64354370
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/64354370
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/64354370
Branch: refs/heads/master
Commit: 643543706e6ad63c9c997f94bfdc979c63506339
Parents: 7ed3902
Author: Matthew Hayes <ma...@gmail.com>
Authored: Mon Aug 24 12:07:02 2015 -0700
Committer: Matthew Hayes <ma...@gmail.com>
Committed: Mon Aug 24 12:07:02 2015 -0700
----------------------------------------------------------------------
.../java/datafu/pig/stats/HyperLogLogPlusPlus.java | 15 ++++-----------
1 file changed, 4 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/64354370/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
----------------------------------------------------------------------
diff --git a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
index 28927a5..2068801 100644
--- a/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
+++ b/datafu-pig/src/main/java/datafu/pig/stats/HyperLogLogPlusPlus.java
@@ -38,10 +38,6 @@ import com.clearspring.analytics.hash.MurmurHash;
import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
-import datafu.pig.stats.entropy.EmpiricalCountEntropy.Final;
-import datafu.pig.stats.entropy.EmpiricalCountEntropy.Initial;
-import datafu.pig.stats.entropy.EmpiricalCountEntropy.Intermediate;
-
/**
* A UDF that applies the HyperLogLog++ cardinality estimation algorithm.
*
@@ -146,14 +142,11 @@ public class HyperLogLogPlusPlus extends AlgebraicEvalFunc<Long>
// count should always be 1 if bag is non empty
DataBag bag = (DataBag) input.get(0);
Iterator<Tuple> it = bag.iterator();
+ Tuple t = null;
if (it.hasNext()) {
- Tuple t = (Tuple) it.next();
- if (t != null && t.size() > 0 && t.get(0) != null) {
- long x = MurmurHash.hash64(t);
- return mTupleFactory.newTuple((Object) x);
- }
+ t = (Tuple) it.next();
}
- return mTupleFactory.newTuple((Object) MurmurHash.hash64(null));
+ return mTupleFactory.newTuple((Object) MurmurHash.hash64(t));
}
}
@@ -208,7 +201,7 @@ public class HyperLogLogPlusPlus extends AlgebraicEvalFunc<Long>
Tuple t = it.next();
Object data = t.get(0);
if (data instanceof Long) {
- estimator.offer(data);
+ estimator.offerHashed((Long)data);
} else if (data instanceof DataByteArray) {
DataByteArray bytes = (DataByteArray) data;
HyperLogLogPlus newEstimator;