You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 22:21:15 UTC

[11/13] git commit: Removed unncessary options from WindowedDStream.

Removed unncessary options from WindowedDStream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/577c8cc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/577c8cc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/577c8cc8

Branch: refs/heads/master
Commit: 577c8cc8340abbdbbbd141597b1c7b8ff19b20be
Parents: 3579647
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 14:17:16 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 14:17:16 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/WindowedDStream.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/577c8cc8/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index ca4edae..89c43ff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -39,8 +39,6 @@ class WindowedDStream[T: ClassTag](
     throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
     "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
-  val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean
-
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
   def windowDuration: Duration =  _windowDuration
@@ -54,11 +52,11 @@ class WindowedDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)
-    val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
-      logInfo("Using partition aware union")
+    val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
+      logDebug("Using partition aware union for windowing at " + validTime)
       new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
     } else {
-      logInfo("Using normal union")
+      logDebug("Using normal union for windowing at " + validTime)
       new UnionRDD(ssc.sc,rddsInWindow)
     }
     Some(windowRDD)