Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 22:21:12 UTC

[08/13] git commit: Updated groupByKeyAndWindow to be computed incrementally, and added mapSideCombine to combineByKeyAndWindow.

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/069cb14b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/069cb14b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/069cb14b

Branch: refs/heads/master
Commit: 069cb14bdcbd366453db66932b8c83431c35f5f5
Parents: 9447967
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 02:58:29 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 02:58:29 2013 -0800

----------------------------------------------------------------------
 .../spark/streaming/PairDStreamFunctions.scala    | 13 ++++++++++---
 .../streaming/api/java/JavaPairDStream.scala      | 18 +++++++++++++++++-
 .../spark/streaming/dstream/ShuffledDStream.scala |  9 +++++----
 .../spark/streaming/dstream/WindowedDStream.scala |  2 +-
 .../spark/streaming/WindowOperationsSuite.scala   |  4 +---
 5 files changed, 34 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
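
For context, a minimal sketch of both new call sites, assuming a hypothetical
socket source and illustrative durations (only the method and parameter names
below come from this commit):

    import org.apache.spark.HashPartitioner
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._

    val ssc = new StreamingContext("local[2]", "sketch", Seconds(1))
    val pairs = ssc.socketTextStream("localhost", 9999).map(word => (word, 1))

    // groupByKeyAndWindow is now incremental: each batch is grouped once,
    // and the window only merges the per-batch groups.
    val grouped = pairs.groupByKeyAndWindow(Seconds(10), Seconds(2))

    // combineByKey now exposes mapSideCombine (default true); disabling it
    // skips the map-side merge, useful when combiners do not shrink the data.
    val counts = pairs.combineByKey[Int](
      (v: Int) => v,
      (c: Int, v: Int) => c + v,
      (c1: Int, c2: Int) => c1 + c2,
      new HashPartitioner(2),
      mapSideCombine = false)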


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 80af96c..56dbcbd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -108,8 +108,9 @@ extends Serializable {
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
-    partitioner: Partitioner) : DStream[(K, C)] = {
-    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
+    partitioner: Partitioner,
+    mapSideCombine: Boolean = true): DStream[(K, C)] = {
+    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
   }
 
   /**
@@ -173,7 +174,13 @@ extends Serializable {
       slideDuration: Duration,
       partitioner: Partitioner
     ): DStream[(K, Seq[V])] = {
-    self.window(windowDuration, slideDuration).groupByKey(partitioner)
+    val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
+    val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+    val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
+    self.groupByKey(partitioner)
+        .window(windowDuration, slideDuration)
+        .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
+        .asInstanceOf[DStream[(K, Seq[V])]]
   }
 
   /**
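
The three functions introduced above only concatenate per-key buffers. A
self-contained sketch of that merge outside Spark, with Int standing in for V:

    import scala.collection.mutable.ArrayBuffer

    // groupByKey runs once per batch, producing (key, Seq[V]) pairs; the
    // window then merges those per-batch seqs per key instead of regrouping
    // every element that falls inside the window.
    val createCombiner = (v: Seq[Int]) => new ArrayBuffer[Int] ++= v
    val mergeValue = (buf: ArrayBuffer[Int], v: Seq[Int]) => buf ++= v
    val mergeCombiner = (b1: ArrayBuffer[Int], b2: ArrayBuffer[Int]) => b1 ++= b2

    // Merging the groups of successive batches for one key within a window:
    val window = mergeValue(createCombiner(Seq(1, 2)), Seq(3))
    val merged = mergeCombiner(window, createCombiner(Seq(4)))
    // merged == ArrayBuffer(1, 2, 3, 4)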

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index dfd6e27..6c3467d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -155,7 +155,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
-   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
+   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
    * information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
@@ -169,6 +169,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
+   * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
+   * information.
+   */
+  def combineByKey[C](createCombiner: JFunction[V, C],
+      mergeValue: JFunction2[C, V, C],
+      mergeCombiners: JFunction2[C, C, C],
+      partitioner: Partitioner,
+      mapSideCombine: Boolean
+    ): JavaPairDStream[K, C] = {
+    implicit val cm: ClassTag[C] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+    dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
+  }
+
+  /**
    * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
    * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
    * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index e6e0022..84e69f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -29,8 +29,9 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
-    partitioner: Partitioner
-  ) extends DStream [(K,C)] (parent.ssc) {
+    partitioner: Partitioner,
+    mapSideCombine: Boolean = true
+  ) extends DStream[(K,C)] (parent.ssc) {
 
   override def dependencies = List(parent)
 
@@ -38,8 +39,8 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
 
   override def compute(validTime: Time): Option[RDD[(K,C)]] = {
     parent.getOrCompute(validTime) match {
-      case Some(rdd) =>
-        Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+      case Some(rdd) => Some(rdd.combineByKey[C](
+          createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
       case None => None
     }
   }
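
Per batch, compute() simply forwards the flag to the RDD-level combineByKey.
A minimal RDD-only sketch of that call (the SparkContext setup and the sample
data are illustrative):

    import org.apache.spark.{HashPartitioner, SparkContext}
    import org.apache.spark.SparkContext._

    val sc = new SparkContext("local", "sketch")
    val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // Same call shape as compute() above, with map-side merging disabled:
    val combined = rdd.combineByKey[Int](
      (v: Int) => v,
      (c: Int, v: Int) => c + v,
      (c1: Int, c2: Int) => c1 + c2,
      new HashPartitioner(2),
      mapSideCombine = false)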

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index ea21ab5..ca4edae 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -39,7 +39,7 @@ class WindowedDStream[T: ClassTag](
     throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
     "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
-  val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "false").toBoolean
+  val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean
 
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
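
The flag above is read from a Java system property, so the previous union
path stays selectable at launch; a one-line sketch (the property name comes
from the diff, the rest is standard JVM usage):

    // Opt back into the old union behavior before creating the context:
    System.setProperty("spark.streaming.useNewUnion", "false")
    // equivalently at JVM launch: -Dspark.streaming.useNewUnion=false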
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 6b4aaef..b29d790 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -225,9 +225,7 @@ class WindowOperationsSuite extends TestSuiteBase {
     val slideDuration = Seconds(1)
     val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
     val operation = (s: DStream[(String, Int)]) => {
-      s.groupByKeyAndWindow(windowDuration, slideDuration)
-       .map(x => (x._1, x._2.toSet))
-       .persist()
+      s.groupByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toSet))
     }
     testOperation(input, operation, expectedOutput, numBatches, true)
   }