Posted to commits@spark.apache.org by jo...@apache.org on 2014/09/03 00:47:59 UTC

git commit: [SPARK-2871] [PySpark] add countApproxDistinct() API

Repository: spark
Updated Branches:
  refs/heads/master 81b9d5b62 -> e2c901b4c


[SPARK-2871] [PySpark] add countApproxDistinct() API

RDD.countApproxDistinct(relativeSD=0.05):

        :: Experimental ::
        Return the approximate number of distinct elements in the RDD.

        The algorithm used is based on streamlib's implementation of
        "HyperLogLog in Practice: Algorithmic Engineering of a State
        of The Art Cardinality Estimation Algorithm", available at
        http://dx.doi.org/10.1145/2452376.2452456.

        This supports all objects that Pyrolite can serialize, which
        covers nearly all built-in types.

        @param relativeSD Relative accuracy. Smaller values create
                           counters that require more space. It must be
                           greater than 0.000017 and no larger than 0.37.

        >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
        >>> 950 < n < 1050
        True
        >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
        >>> 18 < n < 22
        True
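
For a feel of the accuracy/space trade-off, a short usage sketch (the
100000-element RDD and the error figures below are illustrative; relativeSD
is a relative standard deviation, not a hard error bound):

        rdd = sc.parallelize(range(100000))
        coarse = rdd.countApproxDistinct(0.1)   # smaller sketch, ~10% typical error
        fine = rdd.countApproxDistinct(0.01)    # larger sketch, ~1% typical error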

Author: Davies Liu <da...@gmail.com>

Closes #2142 from davies/countApproxDistinct and squashes the following commits:

e20da47 [Davies Liu] remove the correction in Python
c38c4e4 [Davies Liu] fix doc tests
2ab157c [Davies Liu] fix doc tests
9d2565f [Davies Liu] add comments and link for hash collision correction
d306492 [Davies Liu] change range of hash of tuple to [0, maxint]
ded624f [Davies Liu] calculate hash in Python
4cba98f [Davies Liu] add more tests
a85a8c6 [Davies Liu] Merge branch 'master' into countApproxDistinct
e97e342 [Davies Liu] add countApproxDistinct()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e2c901b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e2c901b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e2c901b4

Branch: refs/heads/master
Commit: e2c901b4c72b247bb422dd5acf057bc583e639ab
Parents: 81b9d5b
Author: Davies Liu <da...@gmail.com>
Authored: Tue Sep 2 15:47:47 2014 -0700
Committer: Josh Rosen <jo...@apache.org>
Committed: Tue Sep 2 15:47:47 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  2 +-
 python/pyspark/rdd.py                           | 39 +++++++++++++++++---
 python/pyspark/tests.py                         | 16 ++++++++
 3 files changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e2c901b4/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index daea261..af9e31b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -993,7 +993,7 @@ abstract class RDD[T: ClassTag](
    */
   @Experimental
   def countApproxDistinct(p: Int, sp: Int): Long = {
-    require(p >= 4, s"p ($p) must be greater than 0")
+    require(p >= 4, s"p ($p) must be at least 4")
     require(sp <= 32, s"sp ($sp) cannot be greater than 32")
     require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
     val zeroCounter = new HyperLogLogPlus(p, sp)
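
For context on the corrected bound: the Scala countApproxDistinct(relativeSD)
overload derives the HyperLogLogPlus precision p from the requested relative
standard deviation, roughly p = ceil(2 * log2(1.054 / relativeSD)), and the
require() above now enforces p >= 4. A back-of-the-envelope sketch in Python
(the 1.054 constant is taken from Spark's Scala code; treat this mapping as an
approximation of the internals, not public API):

    import math

    def precision_for(rsd):
        # Approximate mapping from relative standard deviation to the
        # HyperLogLogPlus precision p (number of register-index bits).
        return int(math.ceil(2.0 * math.log(1.054 / rsd) / math.log(2)))

    precision_for(0.37)       # -> 4, the smallest p the require() allows
    precision_for(0.000017)   # -> 32, matching the sp <= 32 cap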

http://git-wip-us.apache.org/repos/asf/spark/blob/e2c901b4/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 2d80fad..6fc9f66 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -62,7 +62,7 @@ def portable_hash(x):
 
     >>> portable_hash(None)
     0
-    >>> portable_hash((None, 1))
+    >>> portable_hash((None, 1)) & 0xffffffff
     219750521
     """
     if x is None:
@@ -72,7 +72,7 @@ def portable_hash(x):
         for i in x:
             h ^= portable_hash(i)
             h *= 1000003
-            h &= 0xffffffff
+            h &= sys.maxint
         h ^= len(x)
         if h == -1:
             h = -2
@@ -1942,7 +1942,7 @@ class RDD(object):
             return True
         return False
 
-    def _to_jrdd(self):
+    def _to_java_object_rdd(self):
         """ Return an JavaRDD of Object by unpickling
 
         It will convert each Python object into Java object by Pyrolite, whenever the
@@ -1977,7 +1977,7 @@ class RDD(object):
         >>> (rdd.sumApprox(1000) - r) / r < 0.05
         True
         """
-        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_jrdd()
+        jrdd = self.mapPartitions(lambda it: [float(sum(it))])._to_java_object_rdd()
         jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
         r = jdrdd.sumApprox(timeout, confidence).getFinalValue()
         return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
@@ -1993,11 +1993,40 @@ class RDD(object):
         >>> (rdd.meanApprox(1000) - r) / r < 0.05
         True
         """
-        jrdd = self.map(float)._to_jrdd()
+        jrdd = self.map(float)._to_java_object_rdd()
         jdrdd = self.ctx._jvm.JavaDoubleRDD.fromRDD(jrdd.rdd())
         r = jdrdd.meanApprox(timeout, confidence).getFinalValue()
         return BoundedFloat(r.mean(), r.confidence(), r.low(), r.high())
 
+    def countApproxDistinct(self, relativeSD=0.05):
+        """
+        :: Experimental ::
+        Return the approximate number of distinct elements in the RDD.
+
+        The algorithm used is based on streamlib's implementation of
+        "HyperLogLog in Practice: Algorithmic Engineering of a State
+        of The Art Cardinality Estimation Algorithm", available at
+        http://dx.doi.org/10.1145/2452376.2452456.
+
+        @param relativeSD Relative accuracy. Smaller values create
+                           counters that require more space. It must be
+                           greater than 0.000017 and no larger than 0.37.
+
+        >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
+        >>> 950 < n < 1050
+        True
+        >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
+        >>> 18 < n < 22
+        True
+        """
+        if relativeSD < 0.000017:
+            raise ValueError("relativeSD should be greater than 0.000017")
+        if relativeSD > 0.37:
+            raise ValueError("relativeSD should be smaller than 0.37")
+        # the hash space in Java is 2^32
+        hashRDD = self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)
+        return hashRDD._to_java_object_rdd().countApproxDistinct(relativeSD)
+
 
 class PipelinedRDD(RDD):
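
Putting the rdd.py pieces together: each element is hashed with portable_hash,
folded into Java's 32-bit hash space, shipped to the JVM through Pyrolite as a
JavaRDD (_to_java_object_rdd, per the rename above), and counted by streamlib's
HyperLogLogPlus. A condensed restatement of the hashing half (the 0x345678 seed
comes from the surrounding portable_hash code, not shown in the hunk; it mirrors
CPython's tuple hash, and sys.maxint is the Python 2 platform-int bound):

    import sys

    def portable_hash(x):
        # None hashes to 0; tuples get a CPython-style XOR/multiply fold,
        # masked with sys.maxint so intermediates stay non-negative ints.
        if x is None:
            return 0
        if isinstance(x, tuple):
            h = 0x345678
            for i in x:
                h ^= portable_hash(i)
                h *= 1000003
                h &= sys.maxint
            h ^= len(x)
            return -2 if h == -1 else h
        return hash(x)

    # countApproxDistinct then maps every element into the 2^32 hash
    # space Java expects before handing the RDD over:
    #   self.map(lambda x: portable_hash(x) & 0xFFFFFFFF)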
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e2c901b4/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 3e7040e..f1a75cb 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -404,6 +404,22 @@ class TestRDDFunctions(PySparkTestCase):
         self.assertEquals(a.count(), b.count())
         self.assertRaises(Exception, lambda: a.zip(b).count())
 
+    def test_count_approx_distinct(self):
+        rdd = self.sc.parallelize(range(1000))
+        self.assertTrue(950 < rdd.countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(float).countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(str).countApproxDistinct(0.04) < 1050)
+        self.assertTrue(950 < rdd.map(lambda x: (x, -x)).countApproxDistinct(0.04) < 1050)
+
+        rdd = self.sc.parallelize([i % 20 for i in range(1000)], 7)
+        self.assertTrue(18 < rdd.countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(float).countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(str).countApproxDistinct() < 22)
+        self.assertTrue(18 < rdd.map(lambda x: (x, -x)).countApproxDistinct() < 22)
+
+        self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.00000001))
+        self.assertRaises(ValueError, lambda: rdd.countApproxDistinct(0.5))
+
     def test_histogram(self):
         # empty
         rdd = self.sc.parallelize([])

