Posted to commits@spark.apache.org by sr...@apache.org on 2016/04/05 03:54:46 UTC

spark git commit: [SPARK-12425][STREAMING] DStream union optimisation

Repository: spark
Updated Branches:
  refs/heads/master a172e11cb -> 7201f033c


[SPARK-12425][STREAMING] DStream union optimisation

Use PartitionerAwareUnionRDD when possible to avoid unnecessary shuffling
and preserve the partitioner.

Author: Guillaume Poulin <po...@gmail.com>

Closes #10382 from gpoulin/dstream_union_optimisation.
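
For illustration (not part of the commit message), a minimal spark-shell sketch of the behaviour this targets: when both inputs share a partitioner, the union keeps it, so key-based operations on the result can skip another shuffle. The values and partition count below are arbitrary.

    // In spark-shell, where `sc` is provided; a sketch, not Spark source.
    import org.apache.spark.HashPartitioner

    val part = new HashPartitioner(4)
    // Two RDDs co-partitioned with the same partitioner.
    val a = sc.parallelize(Seq(1 -> "a", 2 -> "b")).partitionBy(part)
    val b = sc.parallelize(Seq(3 -> "c", 4 -> "d")).partitionBy(part)
    // With the partitioner-aware path the union keeps the shared partitioner.
    val u = a.union(b)
    println(u.partitioner)  // expected: Some(HashPartitioner) rather than None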


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7201f033
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7201f033
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7201f033

Branch: refs/heads/master
Commit: 7201f033ce520259b6d07ea5ead92272cac92363
Parents: a172e11
Author: Guillaume Poulin <po...@gmail.com>
Authored: Tue Apr 5 02:54:38 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Apr 5 02:54:38 2016 +0100

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala       |  6 +-----
 .../apache/spark/streaming/dstream/UnionDStream.scala    |  4 ++--
 .../apache/spark/streaming/dstream/WindowedDStream.scala | 11 ++---------
 3 files changed, 5 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7201f033/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 4a0a219..032939b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -568,11 +568,7 @@ abstract class RDD[T: ClassTag](
    * times (use `.distinct()` to eliminate them).
    */
   def union(other: RDD[T]): RDD[T] = withScope {
-    if (partitioner.isDefined && other.partitioner == partitioner) {
-      new PartitionerAwareUnionRDD(sc, Array(this, other))
-    } else {
-      new UnionRDD(sc, Array(this, other))
-    }
+    sc.union(this, other)
   }
 
   /**

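A complementary spark-shell check (again a sketch, not part of the commit): when the inputs do not share a partitioner, the union is expected to fall back to a plain UnionRDD and the result reports no partitioner; sc.union gives the same answer, since RDD.union now simply delegates to it.

    import org.apache.spark.HashPartitioner

    val x = sc.parallelize(Seq(1 -> "x")).partitionBy(new HashPartitioner(4))
    val y = sc.parallelize(Seq(2 -> "y")).partitionBy(new HashPartitioner(8))
    // Different partitioners, so the plain UnionRDD path applies.
    println(x.union(y).partitioner)          // expected: None
    println(sc.union(Seq(x, y)).partitioner) // expected: None as well
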
http://git-wip-us.apache.org/repos/asf/spark/blob/7201f033/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
index c1846a3..d46c0a0 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/UnionDStream.scala
@@ -21,7 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
 
 import org.apache.spark.SparkException
-import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Duration, Time}
 
 private[streaming]
@@ -45,7 +45,7 @@ class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
         s" time $validTime")
     }
     if (rdds.nonEmpty) {
-      Some(new UnionRDD(ssc.sc, rdds))
+      Some(ssc.sc.union(rdds))
     } else {
       None
     }

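On the streaming side, a hedged sketch of the user-facing path that builds a UnionDStream (the socket sources and ports are placeholders, not from the commit): each batch of the merged stream is now produced through sc.union, which can take the partitioner-aware path when the per-batch RDDs share a partitioner, as the two reduceByKey streams below are expected to.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("union-dstream-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Placeholder sources; both sides end up hash-partitioned by reduceByKey.
    val left  = ssc.socketTextStream("localhost", 9998).map(w => (w, 1)).reduceByKey(_ + _)
    val right = ssc.socketTextStream("localhost", 9999).map(w => (w, 1)).reduceByKey(_ + _)
    // ssc.union builds a UnionDStream; each batch is unioned via sc.union.
    val merged = ssc.union(Seq(left, right))
    merged.print()
    ssc.start()
    // ssc.awaitTermination()
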
http://git-wip-us.apache.org/repos/asf/spark/blob/7201f033/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index ee50a8d..fe0f875 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -19,7 +19,7 @@ package org.apache.spark.streaming.dstream
 
 import scala.reflect.ClassTag
 
-import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.Duration
@@ -63,13 +63,6 @@ class WindowedDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)
-    val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
-      logDebug("Using partition aware union for windowing at " + validTime)
-      new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
-    } else {
-      logDebug("Using normal union for windowing at " + validTime)
-      new UnionRDD(ssc.sc, rddsInWindow)
-    }
-    Some(windowRDD)
+    Some(ssc.sc.union(rddsInWindow))
   }
 }

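Continuing the streaming sketch above (still an illustration, not part of the commit): window() unions the batch RDDs that fall inside each window, and those batches are expected to share a hash partitioner here, so the windowed union can also take the partitioner-aware path instead of always building a plain UnionRDD.

    // Window the merged stream; reduceByKey on each window is expected to avoid
    // a re-shuffle when the window RDD kept its partitioner.
    val windowedCounts = merged
      .window(Seconds(10), Seconds(2))
      .reduceByKey(_ + _)
    windowedCounts.print()
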
