Posted to commits@spark.apache.org by rx...@apache.org on 2014/01/01 02:48:33 UTC

[03/20] git commit: Added countDistinctByKey to PairRDDFunctions that counts the approximate number of unique values for each key in the RDD.

Added countDistinctByKey to PairRDDFunctions that counts the approximate number of unique values for each key in the RDD.
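
For context, a minimal usage sketch of the new API (the SparkContext setup, example data,
and variable names below are illustrative only and not part of this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // brings the implicit PairRDDFunctions conversion into scope

    val sc = new SparkContext("local", "countDistinctByKeyExample")

    // Key 1 has 1 distinct value, key 2 has 2, key 3 has 3.
    val pairs = sc.parallelize(Seq((1, "a"), (2, "a"), (2, "b"), (3, "a"), (3, "b"), (3, "c")))

    // Approximate distinct-value count per key, with a relative standard deviation of 0.01.
    val counts: Array[(Int, Long)] = pairs.countDistinctByKey(0.01).collect()
    counts.foreach { case (k, c) => println("key " + k + " -> ~" + c + " distinct values") }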


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ec5df800
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ec5df800
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ec5df800

Branch: refs/heads/master
Commit: ec5df800fdb0109314c0d5cd6dcac2ecbb9433d6
Parents: 1a70135
Author: Hossein Falaki <fa...@gmail.com>
Authored: Thu Oct 17 22:26:00 2013 -0700
Committer: Hossein Falaki <fa...@gmail.com>
Committed: Thu Oct 17 22:26:00 2013 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 51 ++++++++++++++++++++
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 30 ++++++++++++
 2 files changed, 81 insertions(+)
----------------------------------------------------------------------
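
The change aggregates one HyperLogLog sketch per key (via combineByKey) and reads each sketch's
cardinality at the end. As a rough standalone sketch of that idea using stream-lib directly, with
plain Scala collections instead of an RDD (the data and names below are illustrative):

    import com.clearspring.analytics.stream.cardinality.HyperLogLog
    import scala.collection.mutable

    val data = Seq((1, "a"), (2, "a"), (2, "b"), (2, "b"))   // key 2 has 2 distinct values

    // One sketch per key; 0.01 is the requested relative standard deviation.
    val sketches = mutable.Map[Int, HyperLogLog]()
    for ((k, v) <- data) {
      val hll = sketches.getOrElseUpdate(k, new HyperLogLog(0.01))
      hll.offer(v)                                           // add the value to this key's sketch
    }
    sketches.foreach { case (k, hll) =>
      println("key " + k + " -> ~" + hll.cardinality() + " distinct values")
    }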


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec5df800/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 93b78e1..f34593f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -39,12 +39,15 @@ import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
 import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
 import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
 
+import com.clearspring.analytics.stream.cardinality.HyperLogLog
+
 import org.apache.spark._
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.Aggregator
 import org.apache.spark.Partitioner
 import org.apache.spark.Partitioner.defaultPartitioner
+import org.apache.spark.util.SerializableHyperLogLog
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -207,6 +210,54 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
   }
 
   /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. Uses the provided
+   * Partitioner to partition the output RDD.
+   */
+  def countDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
+    val createHLL = (v: V) => {
+      val hll = new SerializableHyperLogLog(new HyperLogLog(relativeSD))
+      hll.value.offer(v)
+      hll
+    }
+    val mergeValueHLL = (hll: SerializableHyperLogLog, v: V) => {
+      hll.value.offer(v)
+      hll
+    }
+    val mergeHLL = (h1: SerializableHyperLogLog, h2: SerializableHyperLogLog) => h1.merge(h2)
+
+    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).map {
+      case (k, v) => (k, v.value.cardinality())
+    }
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. Hash-partitions the
+   * output RDD into numPartitions partitions.
+   *
+   */
+  def countDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
+    countDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Return approximate number of distinct values for each key in this RDD.
+   * The accuracy of approximation can be controlled through the relative standard deviation
+   * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
+   * more accurate counts but increase the memory footprint and vice versa. The default value of
+   * relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
+   * level.
+   */
+  def countDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
+    countDistinctByKey(relativeSD, defaultPartitioner(self))
+  }
+
+  /**
    * Merge the values for each key using an associative reduce function. This will also perform
    * the merging locally on each mapper before sending results to a reducer, similarly to a
    * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ec5df800/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 57d3382..d81bc8c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -109,6 +109,36 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
     assert(deps.size === 2) // ShuffledRDD, ParallelCollection.
   }
 
+  test("countDistinctByKey") {
+    def error(est: Long, size: Long) = math.abs(est - size)/size.toDouble
+
+    /* Since HyperLogLog unique counting is approximate, and the relative standard deviation is
+     * only a statistical bound, the tests can fail for large values of relativeSD. We will be using
+     * relatively tight error bounds to check correctness of functionality rather than checking
+     * whether the approximation conforms with the requested bound.
+     */
+    val relativeSD = 0.001
+
+    val stacked = (1 to 100).flatMap(i => (1 to i).map(j => (i, j)))
+    val rdd1 = sc.parallelize(stacked)
+    val counted1 = rdd1.countDistinctByKey(relativeSD).collect()
+    counted1.foreach{
+      case(k, count) => assert(math.abs(error(count, k)) < relativeSD)
+    }
+
+    import scala.util.Random
+    val rnd = new Random()
+    val randStacked = (1 to 100).flatMap{i =>
+      val num = rnd.nextInt%500
+      (1 to num).map(j => (num, j))
+    }
+    val rdd2 = sc.parallelize(randStacked)
+    val counted2 = rdd2.countDistinctByKey(relativeSD, 4).collect()
+    counted2.foreach{
+      case(k, count) => assert(math.abs(error(count, k)) < relativeSD)
+    }
+  }
+
   test("join") {
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))