You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/01 02:48:32 UTC

[02/20] git commit: Added a countDistinct method to RDD that takes an accuracy parameter and returns the (approximate) number of distinct elements in the RDD.

Added a countDistinct method to RDD that takes an accuracy parameter and returns the (approximate) number of distinct elements in the RDD.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/1a701358
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/1a701358
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/1a701358

Branch: refs/heads/master
Commit: 1a701358c0811c7f270132291e0646fd806e4984
Parents: 843727a
Author: Hossein Falaki <fa...@gmail.com>
Authored: Thu Oct 17 22:24:48 2013 -0700
Committer: Hossein Falaki <fa...@gmail.com>
Committed: Thu Oct 17 22:24:48 2013 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 26 +++++++++++++++++++-
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 13 ++++++++++
 2 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1a701358/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0355618..09932db 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -30,6 +30,7 @@ import org.apache.hadoop.io.Text
 import org.apache.hadoop.mapred.TextOutputFormat
 
 import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
 
 import org.apache.spark.Partitioner._
 import org.apache.spark.api.java.JavaRDD
@@ -38,7 +39,7 @@ import org.apache.spark.partial.CountEvaluator
 import org.apache.spark.partial.GroupedCountEvaluator
 import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{Utils, BoundedPriorityQueue}
+import org.apache.spark.util.{Utils, BoundedPriorityQueue, SerializableHyperLogLog}
 
 import org.apache.spark.SparkContext._
 import org.apache.spark._
@@ -766,6 +767,29 @@ abstract class RDD[T: ClassManifest](
   }
 
   /**
+   * Return approximate number of distinct elements in the RDD.
+   *
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05.
+   */
+  def countDistinct(relativeSD: Double = 0.05): Long = {
+
+    def hllCountPartition(iter: Iterator[T]): Iterator[SerializableHyperLogLog] = {
+      val hllCounter = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+      while (iter.hasNext) {
+        val v = iter.next()
+        hllCounter.value.offer(v)
+      }
+      Iterator(hllCounter)
+    }
+    def mergeCounters(c1: SerializableHyperLogLog, c2: SerializableHyperLogLog): SerializableHyperLogLog = c1.merge(c2)
+
+    mapPartitions(hllCountPartition).reduce(mergeCounters).value.cardinality()
+  }
+
+  /**
    * Take the first num elements of the RDD. It works by first scanning one partition, and use the
    * results from that partition to estimate the number of additional partitions needed to satisfy
    * the limit.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/1a701358/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 6d1bc5e..6baf9c7 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -63,6 +63,19 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     }
   }
 
+  test("Approximate distinct count") {
+
+    def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble
+
+    val size = 100
+    val uniformDistro = for (i <- 1 to 100000) yield i % size
+    val simpleRdd = sc.makeRDD(uniformDistro)
+    assert( error(simpleRdd.countDistinct(0.2), size) < 0.2)
+    assert( error(simpleRdd.countDistinct(0.05), size) < 0.05)
+    assert( error(simpleRdd.countDistinct(0.01), size) < 0.01)
+    assert( error(simpleRdd.countDistinct(0.001), size) < 0.001)
+  }
+
   test("SparkContext.union") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))