You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 22:21:05 UTC

[01/13] git commit: Added partition aware union to improve reduceByKeyAndWindow

Updated Branches:
  refs/heads/master 7bafb68d7 -> 588a1695f


Added partition aware union to improve reduceByKeyAndWindow


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2ec4b2e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2ec4b2e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2ec4b2e3

Branch: refs/heads/master
Commit: 2ec4b2e38d432ef4f21b725c2fceac863d5f9ea1
Parents: e2ebc3a
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed Nov 20 23:49:30 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed Nov 20 23:49:30 2013 -0800

----------------------------------------------------------------------
 .../streaming/dstream/WindowedDStream.scala     | 51 +++++++++++++++++++-
 1 file changed, 49 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2ec4b2e3/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 3c57294..03f522e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -20,7 +20,12 @@ package org.apache.spark.streaming.dstream
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.UnionRDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
+import org.apache.spark.streaming._
+import org.apache.spark._
+import scala.Some
+import scala.Some
+import scala.Some
+import org.apache.spark.streaming.Duration
 
 private[streaming]
 class WindowedDStream[T: ClassManifest](
@@ -49,9 +54,51 @@ class WindowedDStream[T: ClassManifest](
 
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
-    Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
+    val rddsInWindow = parent.slice(currentWindow)
+    val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
+      logInfo("Using partition aware union")
+      new PartitionAwareUnionRDD(ssc.sc, rddsInWindow)
+    } else {
+      logInfo("Using normal union")
+      new UnionRDD(ssc.sc,rddsInWindow)
+    }
+    Some(windowRDD)
   }
 }
 
+private[streaming]
+class PartitionAwareUnionRDDPartition(val idx: Int, val partitions: Array[Partition])
+  extends Partition {
+  override val index = idx
+  override def hashCode(): Int = idx
+}
+
+private[streaming]
+class PartitionAwareUnionRDD[T: ClassManifest](
+    sc: SparkContext,
+    var rdds: Seq[RDD[T]])
+  extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
+  require(rdds.length > 0)
+  require(rdds.flatMap(_.partitioner).distinct.length == 1, "Parent RDDs have different partitioners")
+
+  override val partitioner = rdds.head.partitioner
+
+  override def getPartitions: Array[Partition] = {
+    val numPartitions = rdds.head.partitions.length
+    (0 until numPartitions).map(index => {
+      val parentPartitions = rdds.map(_.partitions(index)).toArray
+      new PartitionAwareUnionRDDPartition(index, parentPartitions)
+    }).toArray
+  }
+
+  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
+    val parentPartitions = s.asInstanceOf[PartitionAwareUnionRDDPartition].partitions
+    rdds.zip(parentPartitions).iterator.flatMap {
+      case (rdd, p) => rdd.iterator(p, context)
+    }
+  }
+}
+
+
 
 


[09/13] git commit: Merge branch 'master' into window-improvement

Posted by pw...@apache.org.
Merge branch 'master' into window-improvement


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c4a54f51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c4a54f51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c4a54f51

Branch: refs/heads/master
Commit: c4a54f51b51151dd0b6c263376024a641f4059c1
Parents: 069cb14 d63856c
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 12:03:11 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 12:03:11 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/MapOutputTracker.scala     |  5 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |  2 +
 .../org/apache/spark/api/java/JavaRDDLike.scala | 11 +++
 .../org/apache/spark/api/python/PythonRDD.scala |  4 -
 .../org/apache/spark/deploy/client/Client.scala |  6 +-
 .../org/apache/spark/deploy/master/Master.scala | 27 ++----
 .../deploy/master/SparkZooKeeperSession.scala   | 11 +--
 .../master/ZooKeeperLeaderElectionAgent.scala   |  4 +-
 .../deploy/master/ui/ApplicationPage.scala      | 14 ++-
 .../spark/deploy/master/ui/IndexPage.scala      | 16 ++--
 .../spark/deploy/master/ui/MasterWebUI.scala    | 10 +--
 .../spark/deploy/worker/ui/IndexPage.scala      |  4 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    | 10 +--
 .../org/apache/spark/executor/Executor.scala    | 12 ++-
 .../org/apache/spark/executor/TaskMetrics.scala |  5 ++
 .../org/apache/spark/scheduler/TaskResult.scala | 20 ++---
 .../cluster/CoarseGrainedSchedulerBackend.scala | 10 +--
 .../spark/storage/BlockManagerMaster.scala      |  7 +-
 .../spark/storage/BlockManagerMasterActor.scala | 11 +--
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  | 31 ++++++-
 .../apache/spark/ui/jobs/ExecutorSummary.scala  | 27 ++++++
 .../apache/spark/ui/jobs/ExecutorTable.scala    | 90 ++++++++++++++++++++
 .../spark/ui/jobs/JobProgressListener.scala     | 34 ++++++++
 .../org/apache/spark/ui/jobs/StagePage.scala    | 17 +++-
 .../org/apache/spark/ui/jobs/StageTable.scala   |  2 +-
 .../spark/ui/storage/BlockManagerUI.scala       |  3 -
 .../scala/org/apache/spark/util/AkkaUtils.scala |  6 ++
 .../scala/org/apache/spark/JavaAPISuite.java    | 33 +++++++
 .../cluster/ClusterTaskSetManagerSuite.scala    |  3 +-
 .../ui/jobs/JobProgressListenerSuite.scala      | 73 ++++++++++++++++
 project/SparkBuild.scala                        |  7 +-
 python/pyspark/context.py                       |  3 -
 python/pyspark/rdd.py                           | 10 ++-
 run-example                                     | 10 +++
 sbt/sbt                                         | 21 ++++-
 spark-class                                     | 10 +++
 spark-shell                                     | 19 ++++-
 37 files changed, 465 insertions(+), 123 deletions(-)
----------------------------------------------------------------------



[11/13] git commit: Removed unnecessary options from WindowedDStream.

Posted by pw...@apache.org.
Removed unnecessary options from WindowedDStream.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/577c8cc8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/577c8cc8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/577c8cc8

Branch: refs/heads/master
Commit: 577c8cc8340abbdbbbd141597b1c7b8ff19b20be
Parents: 3579647
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 14:17:16 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 14:17:16 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/WindowedDStream.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/577c8cc8/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index ca4edae..89c43ff 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -39,8 +39,6 @@ class WindowedDStream[T: ClassTag](
     throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
     "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
-  val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean
-
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
   def windowDuration: Duration =  _windowDuration
@@ -54,11 +52,11 @@ class WindowedDStream[T: ClassTag](
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)
-    val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
-      logInfo("Using partition aware union")
+    val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
+      logDebug("Using partition aware union for windowing at " + validTime)
       new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
     } else {
-      logInfo("Using normal union")
+      logDebug("Using normal union for windowing at " + validTime)
       new UnionRDD(ssc.sc,rddsInWindow)
     }
     Some(windowRDD)


[12/13] git commit: Added Apache boilerplate and class docs to PartitionerAwareUnionRDD.

Posted by pw...@apache.org.
Added Apache boilerplate and class docs to PartitionerAwareUnionRDD.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5fde4566
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5fde4566
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5fde4566

Branch: refs/heads/master
Commit: 5fde4566ea48e5c6d6c50af032a29eaded2d7c43
Parents: 577c8cc
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 14:33:37 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 14:33:37 2013 -0800

----------------------------------------------------------------------
 .../spark/rdd/PartitionerAwareUnionRDD.scala    | 36 ++++++++++++++++++--
 1 file changed, 33 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5fde4566/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 3cbf3b4..4c625d0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -1,9 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.spark.rdd
 
-import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partition}
 import scala.reflect.ClassTag
 import java.io.{ObjectOutputStream, IOException}
+import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partition}
+
 
+/**
+ * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of corresponding partitions
+ * of parent RDDs.
+ */
 private[spark]
 class PartitionerAwareUnionRDDPartition(
     @transient val rdds: Seq[RDD[_]],
@@ -22,6 +44,14 @@ class PartitionerAwareUnionRDDPartition(
   }
 }
 
+/**
+ * Class representing an RDD that can take multiple RDDs partitioned by the same partitioner and
+ * unify them into a single RDD while preserving the partitioner. So m RDDs with p partitions each
+ * will be unified to a single RDD with p partitions and the same partitioner. The preferred
+ * location for each partition of the unified RDD will be the most common preferred location
+ * of the corresponding partitions of the parent RDDs. For example, location of partition 0
+ * of the unified RDD will be where most of partition 0 of the parent RDDs are located.
+ */
 private[spark]
 class PartitionerAwareUnionRDD[T: ClassTag](
     sc: SparkContext,
@@ -54,7 +84,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
     val location = if (locations.isEmpty) {
       None
     } else  {
-      // Find the location where maximum number of parent partitions prefer 
+      // Find the location that maximum number of parent partitions prefer
       Some(locations.groupBy(x => x).maxBy(_._2.length)._1)
     }
     logDebug("Selected location for " + this + ", partition " + s.index + " = " + location)
@@ -73,7 +103,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
     rdds = null
   }
 
-  // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+  // Get the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
   private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = {
     rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
   }


[10/13] git commit: Merge branch 'apache-master' into window-improvement

Posted by pw...@apache.org.
Merge branch 'apache-master' into window-improvement


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/3579647c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/3579647c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/3579647c

Branch: refs/heads/master
Commit: 3579647cdc8ace0170566783faaf7102ef1f2052
Parents: c4a54f5 e240bad
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 12:12:10 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 12:12:10 2013 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  41 +-
 .../spark/scheduler/ExecutorLossReason.scala    |  38 +
 .../spark/scheduler/SchedulerBackend.scala      |  37 +
 .../spark/scheduler/TaskResultGetter.scala      | 107 +++
 .../apache/spark/scheduler/TaskScheduler.scala  |  11 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     | 474 ++++++++++++
 .../apache/spark/scheduler/TaskSetManager.scala | 704 +++++++++++++++++-
 .../apache/spark/scheduler/WorkerOffer.scala    |  24 +
 .../scheduler/cluster/ClusterScheduler.scala    | 473 ------------
 .../cluster/ClusterTaskSetManager.scala         | 713 -------------------
 .../cluster/CoarseGrainedSchedulerBackend.scala |   6 +-
 .../scheduler/cluster/ExecutorLossReason.scala  |  38 -
 .../scheduler/cluster/SchedulerBackend.scala    |  37 -
 .../cluster/SimrSchedulerBackend.scala          |   4 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   6 +-
 .../scheduler/cluster/TaskResultGetter.scala    | 108 ---
 .../spark/scheduler/cluster/WorkerOffer.scala   |  24 -
 .../mesos/CoarseMesosSchedulerBackend.scala     |   5 +-
 .../cluster/mesos/MesosSchedulerBackend.scala   |   7 +-
 .../spark/scheduler/local/LocalBackend.scala    | 108 +++
 .../spark/scheduler/local/LocalScheduler.scala  | 222 ------
 .../scheduler/local/LocalTaskSetManager.scala   | 191 -----
 .../spark/storage/ShuffleBlockManager.scala     |  25 +-
 .../org/apache/spark/util/MetadataCleaner.scala |   2 +-
 .../apache/spark/util/TimeStampedHashMap.scala  |  15 +-
 .../scala/org/apache/spark/FailureSuite.scala   |   4 +-
 .../SparkContextSchedulerCreationSuite.scala    |  58 +-
 .../spark/scheduler/ClusterSchedulerSuite.scala | 265 +++++++
 .../org/apache/spark/scheduler/FakeTask.scala   |  26 +
 .../spark/scheduler/SparkListenerSuite.scala    |  19 +-
 .../spark/scheduler/TaskResultGetterSuite.scala | 112 +++
 .../spark/scheduler/TaskSetManagerSuite.scala   | 319 +++++++++
 .../cluster/ClusterSchedulerSuite.scala         | 267 -------
 .../cluster/ClusterTaskSetManagerSuite.scala    | 319 ---------
 .../spark/scheduler/cluster/FakeTask.scala      |  27 -
 .../cluster/TaskResultGetterSuite.scala         | 114 ---
 .../scheduler/local/LocalSchedulerSuite.scala   | 227 ------
 .../streaming/examples/JavaKafkaWordCount.java  |   2 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 232 ++++++
 .../org/apache/spark/deploy/yarn/Client.scala   |   4 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |  10 +-
 .../cluster/YarnClientClusterScheduler.scala    |   3 +-
 .../cluster/YarnClientSchedulerBackend.scala    |   3 +-
 .../cluster/YarnClusterScheduler.scala          |   3 +-
 python/pyspark/java_gateway.py                  |   1 +
 python/pyspark/mllib/__init__.py                |  20 +
 python/pyspark/mllib/_common.py                 | 227 ++++++
 python/pyspark/mllib/classification.py          |  86 +++
 python/pyspark/mllib/clustering.py              |  79 ++
 python/pyspark/mllib/recommendation.py          |  74 ++
 python/pyspark/mllib/regression.py              | 110 +++
 python/pyspark/serializers.py                   |   2 +-
 python/pyspark/shell.py                         |   2 +-
 spark-class                                     |   6 +-
 spark-class2.cmd                                |   2 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |  10 +-
 .../cluster/YarnClientClusterScheduler.scala    |   3 +-
 .../cluster/YarnClientSchedulerBackend.scala    |   3 +-
 .../cluster/YarnClusterScheduler.scala          |  10 +-
 59 files changed, 3181 insertions(+), 2888 deletions(-)
----------------------------------------------------------------------



[08/13] git commit: Updated groupByKeyAndWindow to be computed incrementally, and added mapSideCombine to combineByKeyAndWindow.

Posted by pw...@apache.org.
Updated groupByKeyAndWindow to be computed incrementally, and added mapSideCombine to combineByKeyAndWindow.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/069cb14b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/069cb14b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/069cb14b

Branch: refs/heads/master
Commit: 069cb14bdcbd366453db66932b8c83431c35f5f5
Parents: 9447967
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 02:58:29 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 02:58:29 2013 -0800

----------------------------------------------------------------------
 .../spark/streaming/PairDStreamFunctions.scala    | 13 ++++++++++---
 .../streaming/api/java/JavaPairDStream.scala      | 18 +++++++++++++++++-
 .../spark/streaming/dstream/ShuffledDStream.scala |  9 +++++----
 .../spark/streaming/dstream/WindowedDStream.scala |  2 +-
 .../spark/streaming/WindowOperationsSuite.scala   |  4 +---
 5 files changed, 34 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
index 80af96c..56dbcbd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/PairDStreamFunctions.scala
@@ -108,8 +108,9 @@ extends Serializable {
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
-    partitioner: Partitioner) : DStream[(K, C)] = {
-    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
+    partitioner: Partitioner,
+    mapSideCombine: Boolean = true): DStream[(K, C)] = {
+    new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)
   }
 
   /**
@@ -173,7 +174,13 @@ extends Serializable {
       slideDuration: Duration,
       partitioner: Partitioner
     ): DStream[(K, Seq[V])] = {
-    self.window(windowDuration, slideDuration).groupByKey(partitioner)
+    val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
+    val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+    val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
+    self.groupByKey(partitioner)
+        .window(windowDuration, slideDuration)
+        .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
+        .asInstanceOf[DStream[(K, Seq[V])]]
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index dfd6e27..6c3467d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -155,7 +155,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
 
   /**
    * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
-   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.PairRDDFunctions]] for more
+   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
    * information.
    */
   def combineByKey[C](createCombiner: JFunction[V, C],
@@ -169,6 +169,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   }
 
   /**
+   * Combine elements of each key in DStream's RDDs using custom function. This is similar to the
+   * combineByKey for RDDs. Please refer to combineByKey in [[org.apache.spark.rdd.PairRDDFunctions]] for more
+   * information.
+   */
+  def combineByKey[C](createCombiner: JFunction[V, C],
+      mergeValue: JFunction2[C, V, C],
+      mergeCombiners: JFunction2[C, C, C],
+      partitioner: Partitioner,
+      mapSideCombine: Boolean
+    ): JavaPairDStream[K, C] = {
+    implicit val cm: ClassTag[C] =
+      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
+    dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine)
+  }
+
+  /**
    * Return a new DStream by applying `groupByKey` over a sliding window. This is similar to
    * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
    * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
index e6e0022..84e69f2 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ShuffledDStream.scala
@@ -29,8 +29,9 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
     createCombiner: V => C,
     mergeValue: (C, V) => C,
     mergeCombiner: (C, C) => C,
-    partitioner: Partitioner
-  ) extends DStream [(K,C)] (parent.ssc) {
+    partitioner: Partitioner,
+    mapSideCombine: Boolean = true
+  ) extends DStream[(K,C)] (parent.ssc) {
 
   override def dependencies = List(parent)
 
@@ -38,8 +39,8 @@ class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
 
   override def compute(validTime: Time): Option[RDD[(K,C)]] = {
     parent.getOrCompute(validTime) match {
-      case Some(rdd) =>
-        Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner))
+      case Some(rdd) => Some(rdd.combineByKey[C](
+          createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
       case None => None
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index ea21ab5..ca4edae 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -39,7 +39,7 @@ class WindowedDStream[T: ClassTag](
     throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
     "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
-  val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "false").toBoolean
+  val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "true").toBoolean
 
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/069cb14b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
index 6b4aaef..b29d790 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
@@ -225,9 +225,7 @@ class WindowOperationsSuite extends TestSuiteBase {
     val slideDuration = Seconds(1)
     val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
     val operation = (s: DStream[(String, Int)]) => {
-      s.groupByKeyAndWindow(windowDuration, slideDuration)
-       .map(x => (x._1, x._2.toSet))
-       .persist()
+      s.groupByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toSet))
     }
     testOperation(input, operation, expectedOutput, numBatches, true)
   }


[07/13] git commit: Fixed bug in PartitionAwareUnionRDD

Posted by pw...@apache.org.
Fixed bug in PartitionAwareUnionRDD


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/94479673
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/94479673
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/94479673

Branch: refs/heads/master
Commit: 94479673eb0ea839d5f6b6bd43c5abf75af7b9eb
Parents: e9165d2
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 26 00:07:45 2013 +0000
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 26 00:07:45 2013 +0000

----------------------------------------------------------------------
 .../apache/spark/rdd/PartitionerAwareUnionRDD.scala  | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/94479673/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 995042e..3cbf3b4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -9,8 +9,8 @@ class PartitionerAwareUnionRDDPartition(
     @transient val rdds: Seq[RDD[_]],
     val idx: Int
   ) extends Partition {
-  var parents = rdds.map(_.partitions(index)).toArray
-
+  var parents = rdds.map(_.partitions(idx)).toArray
+  
   override val index = idx
   override def hashCode(): Int = idx
 
@@ -42,7 +42,7 @@ class PartitionerAwareUnionRDD[T: ClassTag](
 
   // Get the location where most of the partitions of parent RDDs are located
   override def getPreferredLocations(s: Partition): Seq[String] = {
-    logDebug("Getting preferred locations for " + this)
+    logDebug("Finding preferred location for " + this + ", partition " + s.index)
     val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
     val locations = rdds.zip(parentPartitions).flatMap {
       case (rdd, part) => {
@@ -51,11 +51,14 @@ class PartitionerAwareUnionRDD[T: ClassTag](
         parentLocations
       }
     }
-    if (locations.isEmpty) {
-      Seq.empty
+    val location = if (locations.isEmpty) {
+      None
     } else  {
-      Seq(locations.groupBy(x => x).map(x => (x._1, x._2.length)).maxBy(_._2)._1)
+      // Find the location where maximum number of parent partitions prefer 
+      Some(locations.groupBy(x => x).maxBy(_._2.length)._1)
     }
+    logDebug("Selected location for " + this + ", partition " + s.index + " = " + location)
+    location.toSeq
   }
 
   override def compute(s: Partition, context: TaskContext): Iterator[T] = {


[06/13] git commit: Merge branch 'scheduler-update' into window-improvement

Posted by pw...@apache.org.
Merge branch 'scheduler-update' into window-improvement


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/e9165d2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/e9165d2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/e9165d2a

Branch: refs/heads/master
Commit: e9165d2a391c73d7e06436426047759aa62807c2
Parents: 61f4bbd 6eaa050
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Dec 23 17:49:41 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Dec 23 17:49:41 2013 -0800

----------------------------------------------------------------------
 .../spark/streaming/scheduler/BatchInfo.scala     | 18 ++++++++++++++++++
 .../spark/streaming/scheduler/JobScheduler.scala  |  4 +++-
 .../apache/spark/streaming/scheduler/JobSet.scala | 13 ++++++++++---
 .../spark/streaming/StreamingListenerSuite.scala  |  2 +-
 4 files changed, 32 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[02/13] git commit: Added partitioner aware union, modified DStream.window.

Posted by pw...@apache.org.
Added partitioner aware union, modified DStream.window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/fd031679
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/fd031679
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/fd031679

Branch: refs/heads/master
Commit: fd031679df59b83ae0a735ea77c49623f6e257c4
Parents: 2ec4b2e
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Nov 21 11:28:37 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Nov 21 11:28:37 2013 -0800

----------------------------------------------------------------------
 .../spark/rdd/PartitionerAwareUnionRDD.scala    | 65 ++++++++++++++++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 27 ++++++++
 .../streaming/dstream/WindowedDStream.scala     | 41 +-----------
 3 files changed, 94 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fd031679/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
new file mode 100644
index 0000000..96cf93f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -0,0 +1,65 @@
+package org.apache.spark.rdd
+
+import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partition}
+
+private[spark]
+class PartitionerAwareUnionRDDPartition(val idx: Int, val partitions: Array[Partition])
+  extends Partition {
+  override val index = idx
+  override def hashCode(): Int = idx
+}
+
+private[spark]
+class PartitionerAwareUnionRDD[T: ClassManifest](
+    sc: SparkContext,
+    var rdds: Seq[RDD[T]]
+  ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
+  require(rdds.length > 0)
+  require(rdds.flatMap(_.partitioner).toSet.size == 1,
+    "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner))
+
+  override val partitioner = rdds.head.partitioner
+
+  override def getPartitions: Array[Partition] = {
+    val numPartitions = rdds.head.partitions.length
+    (0 until numPartitions).map(index => {
+      val parentPartitions = rdds.map(_.partitions(index)).toArray
+      new PartitionerAwareUnionRDDPartition(index, parentPartitions)
+    }).toArray
+  }
+
+  // Get the location where most of the partitions of parent RDDs are located
+  override def getPreferredLocations(s: Partition): Seq[String] = {
+    logDebug("Getting preferred locations for " + this)
+    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].partitions
+    val locations = rdds.zip(parentPartitions).flatMap {
+      case (rdd, part) => {
+        val parentLocations = currPrefLocs(rdd, part)
+        logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations)
+        parentLocations
+      }
+    }
+
+    if (locations.isEmpty) {
+      Seq.empty
+    } else  {
+      Seq(locations.groupBy(x => x).map(x => (x._1, x._2.length)).maxBy(_._2)._1)
+    }
+  }
+
+  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
+    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].partitions
+    rdds.zip(parentPartitions).iterator.flatMap {
+      case (rdd, p) => rdd.iterator(p, context)
+    }
+  }
+
+  // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+  private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = {
+    rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
+  }
+}
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fd031679/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 354ab8a..88b36a6 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -71,6 +71,33 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
   }
 
+  test("partitioner aware union") {
+    import SparkContext._
+    def makeRDDWithPartitioner(seq: Seq[Int]) = {
+      sc.makeRDD(seq, 1)
+        .map(x => (x, null))
+        .partitionBy(new HashPartitioner(2))
+        .mapPartitions(_.map(_._1), true)
+    }
+
+    val nums1 = makeRDDWithPartitioner(1 to 4)
+    val nums2 = makeRDDWithPartitioner(5 to 8)
+    assert(nums1.partitioner == nums2.partitioner)
+    assert(new PartitionerAwareUnionRDD(sc, Seq(nums1)).collect().toSet === Set(1, 2, 3, 4))
+
+    val union = new PartitionerAwareUnionRDD(sc, Seq(nums1, nums2))
+    assert(union.collect().toSet === Set(1, 2, 3, 4, 5, 6, 7, 8))
+    val nums1Parts = nums1.collectPartitions()
+    val nums2Parts = nums2.collectPartitions()
+    val unionParts = union.collectPartitions()
+    assert(nums1Parts.length === 2)
+    assert(nums2Parts.length === 2)
+    assert(unionParts.length === 2)
+    assert((nums1Parts(0) ++ nums2Parts(0)).toList === unionParts(0).toList)
+    assert((nums1Parts(1) ++ nums2Parts(1)).toList === unionParts(1).toList)
+    assert(union.partitioner === nums1.partitioner)
+  }
+
   test("aggregate") {
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/fd031679/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 03f522e..49f8431 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.streaming.dstream
 
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
+import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
 import org.apache.spark._
@@ -57,7 +56,7 @@ class WindowedDStream[T: ClassManifest](
     val rddsInWindow = parent.slice(currentWindow)
     val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
       logInfo("Using partition aware union")
-      new PartitionAwareUnionRDD(ssc.sc, rddsInWindow)
+      new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
     } else {
       logInfo("Using normal union")
       new UnionRDD(ssc.sc,rddsInWindow)
@@ -66,39 +65,3 @@ class WindowedDStream[T: ClassManifest](
   }
 }
 
-private[streaming]
-class PartitionAwareUnionRDDPartition(val idx: Int, val partitions: Array[Partition])
-  extends Partition {
-  override val index = idx
-  override def hashCode(): Int = idx
-}
-
-private[streaming]
-class PartitionAwareUnionRDD[T: ClassManifest](
-    sc: SparkContext,
-    var rdds: Seq[RDD[T]])
-  extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
-  require(rdds.length > 0)
-  require(rdds.flatMap(_.partitioner).distinct.length == 1, "Parent RDDs have different partitioners")
-
-  override val partitioner = rdds.head.partitioner
-
-  override def getPartitions: Array[Partition] = {
-    val numPartitions = rdds.head.partitions.length
-    (0 until numPartitions).map(index => {
-      val parentPartitions = rdds.map(_.partitions(index)).toArray
-      new PartitionAwareUnionRDDPartition(index, parentPartitions)
-    }).toArray
-  }
-
-  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
-    val parentPartitions = s.asInstanceOf[PartitionAwareUnionRDDPartition].partitions
-    rdds.zip(parentPartitions).iterator.flatMap {
-      case (rdd, p) => rdd.iterator(p, context)
-    }
-  }
-}
-
-
-
-


[04/13] git commit: Merge branch 'scheduler-update' into window-improvement

Posted by pw...@apache.org.
Merge branch 'scheduler-update' into window-improvement

Conflicts:
	streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/de41c436
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/de41c436
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/de41c436

Branch: refs/heads/master
Commit: de41c436a0088efc83bc4705dcd279e61b085759
Parents: 03ef6e8 ec71b44
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 19 12:05:08 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 19 12:05:08 2013 -0800

----------------------------------------------------------------------
 .gitignore                                      |    1 +
 README.md                                       |   11 +-
 assembly/pom.xml                                |   16 +-
 bagel/pom.xml                                   |   12 +-
 bin/compute-classpath.cmd                       |    2 +-
 bin/compute-classpath.sh                        |    2 +-
 conf/metrics.properties.template                |    8 +
 core/pom.xml                                    |   47 +-
 .../apache/spark/network/netty/FileClient.java  |    2 -
 .../apache/spark/network/netty/FileServer.java  |    1 -
 .../scala/org/apache/spark/FutureAction.scala   |    2 +-
 .../org/apache/spark/MapOutputTracker.scala     |   37 +-
 .../scala/org/apache/spark/Partitioner.scala    |    8 +-
 .../scala/org/apache/spark/SparkContext.scala   |  288 +--
 .../main/scala/org/apache/spark/SparkEnv.scala  |   13 +-
 .../main/scala/org/apache/spark/TaskState.scala |    3 +-
 .../apache/spark/api/java/JavaDoubleRDD.scala   |   49 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |   77 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |    7 +-
 .../org/apache/spark/api/java/JavaRDDLike.scala |   32 +-
 .../spark/api/java/JavaSparkContext.scala       |   60 +-
 .../java/JavaSparkContextVarargsWorkaround.java |    1 -
 .../api/java/function/FlatMapFunction.scala     |    4 +-
 .../api/java/function/FlatMapFunction2.scala    |    4 +-
 .../spark/api/java/function/Function.java       |    8 +-
 .../spark/api/java/function/Function2.java      |    8 +-
 .../spark/api/java/function/Function3.java      |    8 +-
 .../api/java/function/PairFlatMapFunction.java  |   12 +-
 .../spark/api/java/function/PairFunction.java   |   12 +-
 .../org/apache/spark/api/python/PythonRDD.scala |  159 +-
 .../spark/api/python/PythonWorkerFactory.scala  |    4 +-
 .../apache/spark/broadcast/HttpBroadcast.scala  |   10 +-
 .../org/apache/spark/deploy/ExecutorState.scala |    3 +-
 .../spark/deploy/FaultToleranceTest.scala       |   28 +-
 .../apache/spark/deploy/LocalSparkCluster.scala |   13 +-
 .../org/apache/spark/deploy/client/Client.scala |   48 +-
 .../spark/deploy/master/ApplicationState.scala  |    3 +-
 .../master/FileSystemPersistenceEngine.scala    |    6 +-
 .../org/apache/spark/deploy/master/Master.scala |   69 +-
 .../spark/deploy/master/RecoveryState.scala     |    4 +-
 .../spark/deploy/master/WorkerState.scala       |    4 +-
 .../master/ZooKeeperPersistenceEngine.scala     |    6 +-
 .../deploy/master/ui/ApplicationPage.scala      |    5 +-
 .../spark/deploy/master/ui/IndexPage.scala      |    4 +-
 .../spark/deploy/master/ui/MasterWebUI.scala    |    2 +-
 .../org/apache/spark/deploy/worker/Worker.scala |   48 +-
 .../spark/deploy/worker/ui/IndexPage.scala      |    5 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |   13 +-
 .../executor/CoarseGrainedExecutorBackend.scala |   23 +-
 .../org/apache/spark/executor/Executor.scala    |    2 +-
 .../org/apache/spark/executor/TaskMetrics.scala |   23 +-
 .../spark/metrics/sink/GraphiteSink.scala       |   82 +
 .../spark/network/ConnectionManager.scala       |    8 +-
 .../spark/network/ConnectionManagerTest.scala   |    4 +-
 .../spark/network/netty/ShuffleCopier.scala     |    2 +-
 .../org/apache/spark/rdd/AsyncRDDActions.scala  |    3 +-
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |    4 +-
 .../org/apache/spark/rdd/CartesianRDD.scala     |    5 +-
 .../org/apache/spark/rdd/CheckpointRDD.scala    |   14 +-
 .../org/apache/spark/rdd/CoalescedRDD.scala     |    3 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala   |  127 ++
 .../scala/org/apache/spark/rdd/EmptyRDD.scala   |    5 +-
 .../org/apache/spark/rdd/FilteredRDD.scala      |    3 +-
 .../org/apache/spark/rdd/FlatMappedRDD.scala    |    3 +-
 .../scala/org/apache/spark/rdd/GlommedRDD.scala |    3 +-
 .../scala/org/apache/spark/rdd/JdbcRDD.scala    |    4 +-
 .../org/apache/spark/rdd/MapPartitionsRDD.scala |   12 +-
 .../spark/rdd/MapPartitionsWithContextRDD.scala |   41 -
 .../scala/org/apache/spark/rdd/MappedRDD.scala  |    4 +-
 .../apache/spark/rdd/OrderedRDDFunctions.scala  |   10 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |   33 +-
 .../spark/rdd/ParallelCollectionRDD.scala       |    8 +-
 .../apache/spark/rdd/PartitionPruningRDD.scala  |   14 +-
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |    3 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   |  120 +-
 .../apache/spark/rdd/RDDCheckpointData.scala    |    4 +-
 .../scala/org/apache/spark/rdd/SampledRDD.scala |    5 +-
 .../spark/rdd/SequenceFileRDDFunctions.scala    |   11 +-
 .../org/apache/spark/rdd/ShuffledRDD.scala      |    6 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |    5 +-
 .../scala/org/apache/spark/rdd/UnionRDD.scala   |    7 +-
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  |   57 +-
 .../scala/org/apache/spark/rdd/ZippedRDD.scala  |    6 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |  404 ++++-
 .../spark/scheduler/DAGSchedulerEvent.scala     |    5 +-
 .../org/apache/spark/scheduler/JobWaiter.scala  |    1 +
 .../apache/spark/scheduler/SchedulingMode.scala |    2 +-
 .../apache/spark/scheduler/ShuffleMapTask.scala |    9 +-
 .../apache/spark/scheduler/SparkListener.scala  |    9 +-
 .../spark/scheduler/SparkListenerBus.scala      |    1 -
 .../org/apache/spark/scheduler/StageInfo.scala  |    1 +
 .../org/apache/spark/scheduler/TaskInfo.scala   |    2 +
 .../apache/spark/scheduler/TaskLocality.scala   |    4 +-
 .../scheduler/cluster/ClusterScheduler.scala    |   13 +-
 .../cluster/ClusterTaskSetManager.scala         |    9 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala |   24 +-
 .../cluster/SimrSchedulerBackend.scala          |    3 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |    2 +-
 .../scheduler/cluster/TaskResultGetter.scala    |    4 +-
 .../mesos/CoarseMesosSchedulerBackend.scala     |    2 +-
 .../spark/scheduler/local/LocalScheduler.scala  |   29 +-
 .../org/apache/spark/storage/BlockManager.scala |    7 +-
 .../spark/storage/BlockManagerMaster.scala      |   16 +-
 .../spark/storage/BlockManagerMasterActor.scala |    7 +-
 .../spark/storage/BlockObjectWriter.scala       |    2 +
 .../spark/storage/ShuffleBlockManager.scala     |    2 +-
 .../org/apache/spark/storage/StorageLevel.scala |    2 +-
 .../spark/storage/StoragePerfTester.scala       |   17 +
 .../apache/spark/storage/ThreadingTest.scala    |    2 +-
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  |   23 +-
 .../apache/spark/ui/jobs/JobProgressUI.scala    |    2 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   67 +-
 .../spark/ui/storage/BlockManagerUI.scala       |    2 +-
 .../scala/org/apache/spark/util/AkkaUtils.scala |   79 +-
 .../org/apache/spark/util/AppendOnlyMap.scala   |   93 +-
 .../spark/util/BoundedPriorityQueue.scala       |    2 +
 .../spark/util/IndestructibleActorSystem.scala  |   68 +
 .../org/apache/spark/util/MetadataCleaner.scala |    3 +-
 .../apache/spark/util/TimeStampedHashMap.scala  |    2 +-
 .../scala/org/apache/spark/util/Utils.scala     |   29 +-
 .../org/apache/spark/util/XORShiftRandom.scala  |   94 +
 .../spark/util/collection/OpenHashMap.scala     |    3 +-
 .../spark/util/collection/OpenHashSet.scala     |  118 +-
 .../collection/PrimitiveKeyOpenHashMap.scala    |    7 +-
 .../spark/util/collection/PrimitiveVector.scala |    4 +-
 .../org/apache/spark/AccumulatorSuite.scala     |   32 +-
 .../org/apache/spark/CheckpointSuite.scala      |    7 +-
 .../org/apache/spark/DistributedSuite.scala     |    5 +-
 .../scala/org/apache/spark/DriverSuite.scala    |    2 +-
 .../scala/org/apache/spark/JavaAPISuite.java    |   14 +
 .../org/apache/spark/JobCancellationSuite.scala |    4 +-
 .../org/apache/spark/LocalSparkContext.scala    |    2 +-
 .../apache/spark/MapOutputTrackerSuite.scala    |   14 +-
 .../apache/spark/PartitionPruningRDDSuite.scala |   45 -
 .../org/apache/spark/PartitioningSuite.scala    |   10 +-
 .../SparkContextSchedulerCreationSuite.scala    |  140 ++
 .../scala/org/apache/spark/UnpersistSuite.scala |    2 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |   17 +
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala |   26 +
 .../org/apache/spark/rdd/DoubleRDDSuite.scala   |  271 +++
 .../spark/rdd/PartitionPruningRDDSuite.scala    |   86 +
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |    8 +-
 .../spark/scheduler/DAGSchedulerSuite.scala     |   43 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |    7 +-
 .../spark/scheduler/SparkListenerSuite.scala    |    2 +-
 .../cluster/ClusterTaskSetManagerSuite.scala    |    2 +-
 .../cluster/TaskResultGetterSuite.scala         |    4 +-
 .../org/apache/spark/storage/BlockIdSuite.scala |    2 +-
 .../spark/storage/BlockManagerSuite.scala       |    2 +-
 .../spark/storage/DiskBlockManagerSuite.scala   |   31 +-
 .../scala/org/apache/spark/ui/UISuite.scala     |    1 -
 .../apache/spark/util/SizeEstimatorSuite.scala  |   72 +-
 .../apache/spark/util/XORShiftRandomSuite.scala |   76 +
 .../util/collection/OpenHashMapSuite.scala      |   33 +-
 .../util/collection/OpenHashSetSuite.scala      |   37 +-
 .../PrimitiveKeyOpenHashMapSuite.scala          |  119 ++
 .../PrimitiveKeyOpenHashSetSuite.scala          |   90 -
 docs/_config.yml                                |    2 +-
 docs/_layouts/global.html                       |    8 +-
 docs/_plugins/copy_api_dirs.rb                  |    2 +-
 docs/bagel-programming-guide.md                 |    2 +-
 docs/building-with-maven.md                     |    6 +
 docs/cluster-overview.md                        |    2 +-
 docs/configuration.md                           |   59 +-
 docs/hadoop-third-party-distributions.md        |    3 +-
 docs/index.md                                   |    8 +-
 docs/job-scheduling.md                          |    2 +-
 docs/monitoring.md                              |    1 +
 docs/running-on-yarn.md                         |   45 +-
 docs/scala-programming-guide.md                 |    2 +-
 docs/spark-standalone.md                        |    4 +-
 docs/streaming-programming-guide.md             |    8 +-
 docs/tuning.md                                  |    5 +-
 ec2/spark_ec2.py                                |    4 +-
 examples/pom.xml                                |   26 +-
 .../org/apache/spark/examples/JavaLogQuery.java |    2 +-
 .../org/apache/spark/examples/JavaPageRank.java |    3 +-
 .../apache/spark/examples/JavaWordCount.java    |    2 +-
 .../apache/spark/mllib/examples/JavaALS.java    |    1 -
 .../apache/spark/examples/BroadcastTest.scala   |   10 +-
 .../org/apache/spark/examples/LocalALS.scala    |    2 +-
 .../spark/examples/MultiBroadcastTest.scala     |   15 +-
 .../org/apache/spark/examples/SparkTC.scala     |    2 +-
 .../streaming/examples/ActorWordCount.scala     |    9 +-
 .../streaming/examples/MQTTWordCount.scala      |    4 +-
 .../streaming/examples/ZeroMQWordCount.scala    |    8 +-
 mllib/pom.xml                                   |   12 +-
 .../apache/spark/mllib/clustering/KMeans.scala  |   11 +-
 .../spark/mllib/util/MFDataGenerator.scala      |    2 +-
 .../spark/mllib/clustering/JavaKMeansSuite.java |    4 +-
 .../mllib/recommendation/JavaALSSuite.java      |    2 -
 new-yarn/pom.xml                                |  161 ++
 .../spark/deploy/yarn/ApplicationMaster.scala   |  446 +++++
 .../yarn/ApplicationMasterArguments.scala       |   94 +
 .../org/apache/spark/deploy/yarn/Client.scala   |  519 ++++++
 .../spark/deploy/yarn/ClientArguments.scala     |  149 ++
 .../yarn/ClientDistributedCacheManager.scala    |  228 +++
 .../spark/deploy/yarn/WorkerLauncher.scala      |  222 +++
 .../spark/deploy/yarn/WorkerRunnable.scala      |  209 +++
 .../deploy/yarn/YarnAllocationHandler.scala     |  687 +++++++
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |   43 +
 .../cluster/YarnClientClusterScheduler.scala    |   47 +
 .../cluster/YarnClientSchedulerBackend.scala    |  109 ++
 .../cluster/YarnClusterScheduler.scala          |   55 +
 .../ClientDistributedCacheManagerSuite.scala    |  220 +++
 pom.xml                                         |  183 +-
 project/SparkBuild.scala                        |  148 +-
 pyspark                                         |   10 +-
 pyspark2.cmd                                    |    2 +-
 python/epydoc.conf                              |    2 +-
 python/pyspark/accumulators.py                  |    6 +-
 python/pyspark/context.py                       |   71 +-
 python/pyspark/rdd.py                           |  104 +-
 python/pyspark/serializers.py                   |  301 +++-
 python/pyspark/tests.py                         |   18 +-
 python/pyspark/worker.py                        |   44 +-
 python/run-tests                                |    1 +
 python/test_support/userlibrary.py              |   17 +
 repl-bin/pom.xml                                |    8 +-
 repl-bin/src/deb/bin/run                        |    2 +-
 repl-bin/src/deb/bin/spark-executor             |    2 +-
 repl-bin/src/deb/bin/spark-shell                |    2 +-
 repl/lib/scala-jline.jar                        |  Bin 158463 -> 0 bytes
 repl/pom.xml                                    |   18 +-
 .../main/scala/org/apache/spark/repl/Main.scala |    8 +-
 .../org/apache/spark/repl/SparkExprTyper.scala  |  109 ++
 .../org/apache/spark/repl/SparkILoop.scala      |  944 +++++-----
 .../org/apache/spark/repl/SparkILoopInit.scala  |  143 ++
 .../org/apache/spark/repl/SparkIMain.scala      | 1681 ++++++++++--------
 .../org/apache/spark/repl/SparkISettings.scala  |   63 -
 .../org/apache/spark/repl/SparkImports.scala    |  108 +-
 .../spark/repl/SparkJLineCompletion.scala       |  206 ++-
 .../apache/spark/repl/SparkJLineReader.scala    |   65 +-
 .../apache/spark/repl/SparkMemberHandlers.scala |  109 +-
 .../scala/org/apache/spark/repl/ReplSuite.scala |  178 +-
 run-example                                     |    2 +-
 run-example2.cmd                                |    2 +-
 spark-class                                     |    6 +-
 streaming/pom.xml                               |   31 +-
 .../org/apache/spark/streaming/Checkpoint.scala |    8 +-
 .../org/apache/spark/streaming/DStream.scala    |   48 +-
 .../spark/streaming/DStreamCheckpointData.scala |    6 +-
 .../apache/spark/streaming/DStreamGraph.scala   |    1 +
 .../scala/org/apache/spark/streaming/Job.scala  |   41 -
 .../org/apache/spark/streaming/JobManager.scala |   88 -
 .../spark/streaming/NetworkInputTracker.scala   |  174 --
 .../spark/streaming/PairDStreamFunctions.scala  |   63 +-
 .../org/apache/spark/streaming/Scheduler.scala  |  131 --
 .../spark/streaming/StreamingContext.scala      |   70 +-
 .../spark/streaming/api/java/JavaDStream.scala  |    8 +-
 .../streaming/api/java/JavaDStreamLike.scala    |   79 +-
 .../streaming/api/java/JavaPairDStream.scala    |   93 +-
 .../api/java/JavaStreamingContext.scala         |  123 +-
 .../dstream/ConstantInputDStream.scala          |    3 +-
 .../streaming/dstream/FileInputDStream.scala    |   12 +-
 .../streaming/dstream/FilteredDStream.scala     |    3 +-
 .../dstream/FlatMapValuedDStream.scala          |    3 +-
 .../streaming/dstream/FlatMappedDStream.scala   |    3 +-
 .../streaming/dstream/FlumeInputDStream.scala   |    7 +-
 .../streaming/dstream/ForEachDStream.scala      |    6 +-
 .../streaming/dstream/GlommedDStream.scala      |    3 +-
 .../spark/streaming/dstream/InputDStream.scala  |    4 +-
 .../streaming/dstream/KafkaInputDStream.scala   |   23 +-
 .../streaming/dstream/MQTTInputDStream.scala    |    3 +-
 .../dstream/MapPartitionedDStream.scala         |    3 +-
 .../streaming/dstream/MapValuedDStream.scala    |    3 +-
 .../spark/streaming/dstream/MappedDStream.scala |    3 +-
 .../streaming/dstream/NetworkInputDStream.scala |   14 +-
 .../dstream/PluggableInputDStream.scala         |    3 +-
 .../streaming/dstream/QueueInputDStream.scala   |    4 +-
 .../streaming/dstream/RawInputDStream.scala     |    4 +-
 .../dstream/ReducedWindowedDStream.scala        |    9 +-
 .../streaming/dstream/ShuffledDStream.scala     |    3 +-
 .../streaming/dstream/SocketInputDStream.scala  |    6 +-
 .../spark/streaming/dstream/StateDStream.scala  |    4 +-
 .../streaming/dstream/TransformedDStream.scala  |    3 +-
 .../spark/streaming/dstream/UnionDStream.scala  |    5 +-
 .../streaming/dstream/WindowedDStream.scala     |    9 +-
 .../streaming/receivers/ActorReceiver.scala     |   35 +-
 .../streaming/receivers/ZeroMQReceiver.scala    |   13 +-
 .../spark/streaming/scheduler/BatchInfo.scala   |   37 +
 .../apache/spark/streaming/scheduler/Job.scala  |   41 +
 .../streaming/scheduler/JobGenerator.scala      |  131 ++
 .../streaming/scheduler/JobScheduler.scala      |  106 ++
 .../spark/streaming/scheduler/JobSet.scala      |   61 +
 .../scheduler/NetworkInputTracker.scala         |  175 ++
 .../streaming/scheduler/StreamingListener.scala |   75 +
 .../scheduler/StreamingListenerBus.scala        |   81 +
 .../streaming/util/MasterFailureTest.scala      |   45 +-
 .../apache/spark/streaming/JavaAPISuite.java    |   88 +-
 .../apache/spark/streaming/JavaTestUtils.scala  |   22 +-
 .../spark/streaming/BasicOperationsSuite.scala  |   12 -
 .../spark/streaming/CheckpointSuite.scala       |   62 +-
 .../apache/spark/streaming/FailureSuite.scala   |   13 +-
 .../spark/streaming/InputStreamsSuite.scala     |   16 +-
 .../streaming/StreamingListenerSuite.scala      |   71 +
 .../apache/spark/streaming/TestSuiteBase.scala  |   63 +-
 .../spark/streaming/WindowOperationsSuite.scala |   14 +-
 tools/pom.xml                                   |   12 +-
 .../tools/JavaAPICompletenessChecker.scala      |    4 +-
 yarn/pom.xml                                    |   10 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  189 +-
 .../org/apache/spark/deploy/yarn/Client.scala   |  199 ++-
 .../spark/deploy/yarn/ClientArguments.scala     |   41 +-
 .../yarn/ClientDistributedCacheManager.scala    |    4 +-
 .../spark/deploy/yarn/WorkerLauncher.scala      |  243 +++
 .../spark/deploy/yarn/WorkerRunnable.scala      |   98 +-
 .../deploy/yarn/YarnAllocationHandler.scala     |  358 ++--
 .../spark/deploy/yarn/YarnSparkHadoopUtil.scala |    5 +-
 .../cluster/YarnClientClusterScheduler.scala    |   47 +
 .../cluster/YarnClientSchedulerBackend.scala    |  109 ++
 311 files changed, 11134 insertions(+), 4697 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/de41c436/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/de41c436/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --cc streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 464ac15,73d9593..ea21ab5
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@@ -17,17 -17,15 +17,15 @@@
  
  package org.apache.spark.streaming.dstream
  
 -import org.apache.spark.rdd.RDD
 -import org.apache.spark.rdd.UnionRDD
 +import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD}
  import org.apache.spark.storage.StorageLevel
 -import org.apache.spark.streaming.{Duration, Interval, Time, DStream}
 +import org.apache.spark.streaming._
- import org.apache.spark._
- import scala.Some
- import scala.Some
- import scala.Some
 +import org.apache.spark.streaming.Duration
  
+ import scala.reflect.ClassTag
+ 
  private[streaming]
- class WindowedDStream[T: ClassManifest](
+ class WindowedDStream[T: ClassTag](
      parent: DStream[T],
      _windowDuration: Duration,
      _slideDuration: Duration)
@@@ -55,15 -51,6 +53,14 @@@
  
    override def compute(validTime: Time): Option[RDD[T]] = {
      val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
 -    Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
 +    val rddsInWindow = parent.slice(currentWindow)
 +    val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
 +      logInfo("Using partition aware union")
 +      new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
 +    } else {
 +      logInfo("Using normal union")
 +      new UnionRDD(ssc.sc,rddsInWindow)
 +    }
 +    Some(windowRDD)
    }
  }
- 


[13/13] git commit: Merge pull request #297 from tdas/window-improvement

Posted by pw...@apache.org.
Merge pull request #297 from tdas/window-improvement

Improvements to DStream window ops and refactoring of Spark's CheckpointSuite

- Added a new RDD - PartitionerAwareUnionRDD. Using this RDD, one can take multiple RDDs partitioned by the same partitioner and unify them into a single RDD while preserving the partitioner. So m RDDs with p partitions each will be unified to a single RDD with p partitions and the same partitioner. The preferred location for each partition of the unified RDD will be the most common preferred location of the corresponding partitions of the parent RDDs. For example, location of partition 0 of the unified RDD will be where most of partition 0 of the parent RDDs are located.
- Improved the performance of DStream's reduceByKeyAndWindow and groupByKeyAndWindow. Both these operations work by doing per-batch reduceByKey/groupByKey and then using PartitionerAwareUnionRDD to union the RDDs across the window. This eliminates a shuffle related to the window operation, which can reduce batch processing time by 30-40% for simple workloads.
- Fixed bugs and simplified Spark's CheckpointSuite. Some of the tests were incorrect and unreliable. Added missing tests for ZippedRDD. I can go into greater detail if necessary.
- Added mapSideCombine option to combineByKeyAndWindow.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/588a1695
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/588a1695
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/588a1695

Branch: refs/heads/master
Commit: 588a1695f4b0b7763ecfa8ea56e371783810dd68
Parents: 7bafb68 5fde456
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Jan 2 13:20:54 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Jan 2 13:20:54 2014 -0800

----------------------------------------------------------------------
 .../spark/rdd/PartitionerAwareUnionRDD.scala    | 110 ++++++
 .../apache/spark/rdd/RDDCheckpointData.scala    |   2 +-
 .../org/apache/spark/CheckpointSuite.scala      | 361 +++++++++++--------
 .../scala/org/apache/spark/rdd/RDDSuite.scala   |  27 ++
 .../spark/streaming/PairDStreamFunctions.scala  |  13 +-
 .../streaming/api/java/JavaPairDStream.scala    |  18 +-
 .../streaming/dstream/ShuffledDStream.scala     |   9 +-
 .../streaming/dstream/WindowedDStream.scala     |  16 +-
 .../spark/streaming/WindowOperationsSuite.scala |   4 +-
 9 files changed, 388 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/588a1695/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/588a1695/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/588a1695/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/588a1695/streaming/src/test/scala/org/apache/spark/streaming/WindowOperationsSuite.scala
----------------------------------------------------------------------


[03/13] git commit: Added flag in window operation to use partition aware union.

Posted by pw...@apache.org.
Added flag in window operation to use partition aware union.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/03ef6e88
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/03ef6e88
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/03ef6e88

Branch: refs/heads/master
Commit: 03ef6e889929befa968d35fa3757c687edc3a38b
Parents: fd03167
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Nov 21 11:38:56 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Nov 21 11:38:56 2013 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/WindowedDStream.scala     | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/03ef6e88/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
index 49f8431..464ac15 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/WindowedDStream.scala
@@ -41,6 +41,8 @@ class WindowedDStream[T: ClassManifest](
     throw new Exception("The slide duration of WindowedDStream (" + _slideDuration + ") " +
     "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")")
 
+  val useNewUnion = System.getProperty("spark.streaming.useNewUnion", "false").toBoolean
+
   parent.persist(StorageLevel.MEMORY_ONLY_SER)
 
   def windowDuration: Duration =  _windowDuration
@@ -54,7 +56,7 @@ class WindowedDStream[T: ClassManifest](
   override def compute(validTime: Time): Option[RDD[T]] = {
     val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime)
     val rddsInWindow = parent.slice(currentWindow)
-    val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
+    val windowRDD = if (useNewUnion && rddsInWindow.flatMap(_.partitioner).distinct.length == 1) {
       logInfo("Using partition aware union")
       new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
     } else {


[05/13] git commit: Added tests for PartitionerAwareUnionRDD in the CheckpointSuite. Refactored CheckpointSuite to make the tests simpler and more reliable. Added missing test for ZippedRDD.

Posted by pw...@apache.org.
Added tests for PartitionerAwareUnionRDD in the CheckpointSuite. Refactored CheckpointSuite to make the tests simpler and more reliable. Added missing test for ZippedRDD.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/61f4bbda
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/61f4bbda
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/61f4bbda

Branch: refs/heads/master
Commit: 61f4bbda0d4e3ecbd8b955232a741231936a25de
Parents: de41c43
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Dec 20 00:41:47 2013 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Dec 20 00:41:47 2013 -0800

----------------------------------------------------------------------
 .../spark/rdd/PartitionerAwareUnionRDD.scala    |  38 +-
 .../apache/spark/rdd/RDDCheckpointData.scala    |   2 +-
 .../org/apache/spark/CheckpointSuite.scala      | 361 +++++++++++--------
 3 files changed, 231 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/61f4bbda/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
index 96cf93f..995042e 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala
@@ -1,16 +1,29 @@
 package org.apache.spark.rdd
 
 import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partition}
+import scala.reflect.ClassTag
+import java.io.{ObjectOutputStream, IOException}
 
 private[spark]
-class PartitionerAwareUnionRDDPartition(val idx: Int, val partitions: Array[Partition])
-  extends Partition {
+class PartitionerAwareUnionRDDPartition(
+    @transient val rdds: Seq[RDD[_]],
+    val idx: Int
+  ) extends Partition {
+  var parents = rdds.map(_.partitions(index)).toArray
+
   override val index = idx
   override def hashCode(): Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    // Update the reference to parent partition at the time of task serialization
+    parents = rdds.map(_.partitions(index)).toArray
+    oos.defaultWriteObject()
+  }
 }
 
 private[spark]
-class PartitionerAwareUnionRDD[T: ClassManifest](
+class PartitionerAwareUnionRDD[T: ClassTag](
     sc: SparkContext,
     var rdds: Seq[RDD[T]]
   ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) {
@@ -21,17 +34,16 @@ class PartitionerAwareUnionRDD[T: ClassManifest](
   override val partitioner = rdds.head.partitioner
 
   override def getPartitions: Array[Partition] = {
-    val numPartitions = rdds.head.partitions.length
+    val numPartitions = partitioner.get.numPartitions
     (0 until numPartitions).map(index => {
-      val parentPartitions = rdds.map(_.partitions(index)).toArray
-      new PartitionerAwareUnionRDDPartition(index, parentPartitions)
+      new PartitionerAwareUnionRDDPartition(rdds, index)
     }).toArray
   }
 
   // Get the location where most of the partitions of parent RDDs are located
   override def getPreferredLocations(s: Partition): Seq[String] = {
     logDebug("Getting preferred locations for " + this)
-    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].partitions
+    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
     val locations = rdds.zip(parentPartitions).flatMap {
       case (rdd, part) => {
         val parentLocations = currPrefLocs(rdd, part)
@@ -39,7 +51,6 @@ class PartitionerAwareUnionRDD[T: ClassManifest](
         parentLocations
       }
     }
-
     if (locations.isEmpty) {
       Seq.empty
     } else  {
@@ -48,18 +59,19 @@ class PartitionerAwareUnionRDD[T: ClassManifest](
   }
 
   override def compute(s: Partition, context: TaskContext): Iterator[T] = {
-    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].partitions
+    val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents
     rdds.zip(parentPartitions).iterator.flatMap {
       case (rdd, p) => rdd.iterator(p, context)
     }
   }
 
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdds = null
+  }
+
   // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
   private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = {
     rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host)
   }
 }
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/61f4bbda/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
index 3b56e45..fa33a56 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -40,7 +40,7 @@ private[spark] object CheckpointState extends Enumeration {
  * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
  * of the checkpointed RDD.
  */
-private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
+private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T])
   extends Logging with Serializable {
 
   import CheckpointState._

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/61f4bbda/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index f25d921..81046af 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -57,15 +57,15 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("RDDs with one-to-one dependencies") {
-    testCheckpointing(_.map(x => x.toString))
-    testCheckpointing(_.flatMap(x => 1 to x))
-    testCheckpointing(_.filter(_ % 2 == 0))
-    testCheckpointing(_.sample(false, 0.5, 0))
-    testCheckpointing(_.glom())
-    testCheckpointing(_.mapPartitions(_.map(_.toString)))
-    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
-    testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
-    testCheckpointing(_.pipe(Seq("cat")))
+    testRDD(_.map(x => x.toString))
+    testRDD(_.flatMap(x => 1 to x))
+    testRDD(_.filter(_ % 2 == 0))
+    testRDD(_.sample(false, 0.5, 0))
+    testRDD(_.glom())
+    testRDD(_.mapPartitions(_.map(_.toString)))
+    testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
+    testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
+    testRDD(_.pipe(Seq("cat")))
   }
 
   test("ParallelCollection") {
@@ -97,7 +97,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("ShuffledRDD") {
-    testCheckpointing(rdd => {
+    testRDD(rdd => {
       // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
       new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
     })
@@ -105,25 +105,17 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("UnionRDD") {
     def otherRDD = sc.makeRDD(1 to 10, 1)
-
-    // Test whether the size of UnionRDDPartitions reduce in size after parent RDD is checkpointed.
-    // Current implementation of UnionRDD has transient reference to parent RDDs,
-    // so only the partitions will reduce in serialized size, not the RDD.
-    testCheckpointing(_.union(otherRDD), false, true)
-    testParentCheckpointing(_.union(otherRDD), false, true)
+    testRDD(_.union(otherRDD))
+    testRDDPartitions(_.union(otherRDD))
   }
 
   test("CartesianRDD") {
     def otherRDD = sc.makeRDD(1 to 10, 1)
-    testCheckpointing(new CartesianRDD(sc, _, otherRDD))
-
-    // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
-    // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false)
+    testRDD(new CartesianRDD(sc, _, otherRDD))
+    testRDDPartitions(new CartesianRDD(sc, _, otherRDD))
 
     // Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after
-    // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
+    // the parent RDD has been checkpointed and parent partitions have been changed.
     // Note that this test is very specific to the current implementation of CartesianRDD.
     val ones = sc.makeRDD(1 to 100, 10).map(x => x)
     ones.checkpoint() // checkpoint that MappedRDD
@@ -134,23 +126,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     val splitAfterCheckpoint =
       serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
     assert(
-      (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) &&
-        (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2),
-      "CartesianRDD.parents not updated after parent RDD checkpointed"
+      (splitAfterCheckpoint.s1.getClass != splitBeforeCheckpoint.s1.getClass) &&
+        (splitAfterCheckpoint.s2.getClass != splitBeforeCheckpoint.s2.getClass),
+      "CartesianRDD.s1 and CartesianRDD.s2 not updated after parent RDD is checkpointed"
     )
   }
 
   test("CoalescedRDD") {
-    testCheckpointing(_.coalesce(2))
+    testRDD(_.coalesce(2))
+    testRDDPartitions(_.coalesce(2))
 
-    // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
-    // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(_.coalesce(2), true, false)
-
-    // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after
-    // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
-    // Note that this test is very specific to the current implementation of CoalescedRDDPartitions
+    // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents)
+    // after the parent RDD has been checkpointed and parent partitions have been changed.
+    // Note that this test is very specific to the current implementation of
+    // CoalescedRDDPartitions.
     val ones = sc.makeRDD(1 to 100, 10).map(x => x)
     ones.checkpoint() // checkpoint that MappedRDD
     val coalesced = new CoalescedRDD(ones, 2)
@@ -160,33 +149,78 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     val splitAfterCheckpoint =
       serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
     assert(
-      splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head,
-      "CoalescedRDDPartition.parents not updated after parent RDD checkpointed"
+      splitAfterCheckpoint.parents.head.getClass != splitBeforeCheckpoint.parents.head.getClass,
+      "CoalescedRDDPartition.parents not updated after parent RDD is checkpointed"
     )
   }
 
   test("CoGroupedRDD") {
-    val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD()
-    testCheckpointing(rdd => {
+    val longLineageRDD1 = generateFatPairRDD()
+    testRDD(rdd => {
       CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
-    }, false, true)
+    })
 
-    val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD()
-    testParentCheckpointing(rdd => {
+    val longLineageRDD2 = generateFatPairRDD()
+    testRDDPartitions(rdd => {
       CheckpointSuite.cogroup(
         longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
-    }, false, true)
+    })
   }
 
   test("ZippedRDD") {
-    testCheckpointing(
-      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
-
-    // Test whether size of ZippedRDD reduce in size after parent RDD is checkpointed
-    // Current implementation of ZippedRDDPartitions has transient references to parent RDDs,
-    // so only the RDD will reduce in serialized size, not the partitions.
-    testParentCheckpointing(
-      rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
+    testRDD(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
+    testRDDPartitions(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)))
+
+    // Test that the ZippedPartition updates parent partitions
+    // after the parent RDD has been checkpointed and parent partitions have been changed.
+    // Note that this test is very specific to the current implementation of ZippedRDD.
+    val rdd = generateFatRDD()
+    val zippedRDD = new ZippedRDD(sc, rdd, rdd.map(x => x))
+    zippedRDD.rdd1.checkpoint()
+    zippedRDD.rdd2.checkpoint()
+    val partitionBeforeCheckpoint =
+      serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
+    zippedRDD.count()
+    val partitionAfterCheckpoint =
+      serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]])
+    assert(
+      partitionAfterCheckpoint.partition1.getClass != partitionBeforeCheckpoint.partition1.getClass &&
+        partitionAfterCheckpoint.partition2.getClass != partitionBeforeCheckpoint.partition2.getClass,
+      "ZippedRDD.partition1 and ZippedRDD.partition2 not updated after parent RDD is checkpointed"
+    )
+  }
+
+  test("PartitionerAwareUnionRDD") {
+    testRDD(rdd => {
+      new PartitionerAwareUnionRDD[(Int, Int)](sc, Array(
+        generateFatPairRDD(),
+        rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
+      ))
+    })
+
+    testRDDPartitions(rdd => {
+      new PartitionerAwareUnionRDD[(Int, Int)](sc, Array(
+        generateFatPairRDD(),
+        rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
+      ))
+    })
+
+    // Test that the PartitionerAwareUnionRDD updates parent partitions
+    // (PartitionerAwareUnionRDDPartition.parents) after the parent RDD has been checkpointed and
+    // parent partitions have been changed. Note that this test is very specific to the current
+    // implementation of PartitionerAwareUnionRDD.
+    val pairRDD = generateFatPairRDD()
+    pairRDD.checkpoint()
+    val unionRDD = new PartitionerAwareUnionRDD(sc, Array(pairRDD))
+    val partitionBeforeCheckpoint =  serializeDeserialize(
+      unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
+    pairRDD.count()
+    val partitionAfterCheckpoint =  serializeDeserialize(
+      unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition])
+    assert(
+      partitionBeforeCheckpoint.parents.head.getClass != partitionAfterCheckpoint.parents.head.getClass,
+      "PartitionerAwareUnionRDDPartition.parents not updated after parent RDD is checkpointed"
+    )
   }
 
   test("CheckpointRDD with zero partitions") {
@@ -200,29 +234,32 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   /**
-   * Test checkpointing of the final RDD generated by the given operation. By default,
-   * this method tests whether the size of serialized RDD has reduced after checkpointing or not.
-   * It can also test whether the size of serialized RDD partitions has reduced after checkpointing or
-   * not, but this is not done by default as usually the partitions do not refer to any RDD and
-   * therefore never store the lineage.
+   * Test checkpointing of the RDD generated by the given operation. It tests whether the
+   * serialized size of the RDD is reduced after checkpointing or not. This function should be called
+   * on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.).
    */
-  def testCheckpointing[U: ClassTag](
-      op: (RDD[Int]) => RDD[U],
-      testRDDSize: Boolean = true,
-      testRDDPartitionSize: Boolean = false
-    ) {
+  def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
     // Generate the final RDD using given RDD operation
-    val baseRDD = generateLongLineageRDD()
+    val baseRDD = generateFatRDD()
     val operatedRDD = op(baseRDD)
     val parentRDD = operatedRDD.dependencies.headOption.orNull
     val rddType = operatedRDD.getClass.getSimpleName
     val numPartitions = operatedRDD.partitions.length
 
+    // Force initialization of all the data structures in RDDs
+    // Without this, serializing the RDD will give a wrong estimate of the size of the RDD
+    initializeRdd(operatedRDD)
+
+    val partitionsBeforeCheckpoint = operatedRDD.partitions
+
     // Find serialized sizes before and after the checkpoint
-    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+    val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
     operatedRDD.checkpoint()
     val result = operatedRDD.collect()
-    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+    operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
+    val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
 
     // Test whether the checkpoint file has been created
     assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
@@ -230,6 +267,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     // Test whether dependencies have been changed from its earlier parent RDD
     assert(operatedRDD.dependencies.head.rdd != parentRDD)
 
+    // Test whether the partitions have been changed from its earlier partitions
+    assert(operatedRDD.partitions.toList != partitionsBeforeCheckpoint.toList)
+
     // Test whether the partitions have been changed to the new Hadoop partitions
     assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
 
@@ -239,122 +279,72 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     // Test whether the data in the checkpointed RDD is same as original
     assert(operatedRDD.collect() === result)
 
-    // Test whether serialized size of the RDD has reduced. If the RDD
-    // does not have any dependency to another RDD (e.g., ParallelCollection,
-    // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
-    if (testRDDSize) {
-      logInfo("Size of " + rddType +
-        "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
-      assert(
-        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
-        "Size of " + rddType + " did not reduce after checkpointing " +
-          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
-      )
-    }
+    // Test whether serialized size of the RDD has reduced.
+    logInfo("Size of " + rddType +
+      " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
+    assert(
+      rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
+      "Size of " + rddType + " did not reduce after checkpointing " +
+        " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
+    )
 
-    // Test whether serialized size of the partitions has reduced. If the partitions
-    // do not have any non-transient reference to another RDD or another RDD's partitions, it
-    // does not refer to a lineage and therefore may not reduce in size after checkpointing.
-    // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions
-    // must be forgotten after checkpointing (to remove all reference to parent RDDs) and
-    // replaced with the HadooPartitions of the checkpointed RDD.
-    if (testRDDPartitionSize) {
-      logInfo("Size of " + rddType + " partitions "
-        + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
-      assert(
-        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
-        "Size of " + rddType + " partitions did not reduce after checkpointing " +
-          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
-      )
-    }
   }
 
   /**
    * Test whether checkpointing of the parent of the generated RDD also
    * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent
    * RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed,
-   * this RDD will remember the partitions and therefore potentially the whole lineage.
+   * the generated RDD will remember the partitions and therefore potentially the whole lineage.
+   * This function should be called only on those RDDs whose partitions refer to parent RDD's
+   * partitions (i.e., do not call it on simple RDDs like MappedRDD).
+   *
    */
-  def testParentCheckpointing[U: ClassTag](
-      op: (RDD[Int]) => RDD[U],
-      testRDDSize: Boolean,
-      testRDDPartitionSize: Boolean
-    ) {
+  def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
     // Generate the final RDD using given RDD operation
-    val baseRDD = generateLongLineageRDD()
+    val baseRDD = generateFatRDD()
     val operatedRDD = op(baseRDD)
-    val parentRDD = operatedRDD.dependencies.head.rdd
+    val parentRDDs = operatedRDD.dependencies.map(_.rdd)
     val rddType = operatedRDD.getClass.getSimpleName
-    val parentRDDType = parentRDD.getClass.getSimpleName
 
-    // Get the partitions and dependencies of the parent in case they're lazily computed
-    parentRDD.dependencies
-    parentRDD.partitions
+    // Force initialization of all the data structures in RDDs
+    // Without this, serializing the RDD will give a wrong estimate of the size of the RDD
+    initializeRdd(operatedRDD)
 
     // Find serialized sizes before and after the checkpoint
-    val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
-    parentRDD.checkpoint()  // checkpoint the parent RDD, not the generated one
-    val result = operatedRDD.collect()
-    val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
+    val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+    parentRDDs.foreach(_.checkpoint())  // checkpoint the parent RDD, not the generated one
+    val result = operatedRDD.collect()  // force checkpointing
+    operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
+    val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+    logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
 
     // Test whether the data in the checkpointed RDD is same as original
     assert(operatedRDD.collect() === result)
 
-    // Test whether serialized size of the RDD has reduced because of its parent being
-    // checkpointed. If this RDD or its parent RDD do not have any dependency
-    // to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may
-    // not reduce in size after checkpointing.
-    if (testRDDSize) {
-      assert(
-        rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
-        "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType +
-          "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
-      )
-    }
-
-    // Test whether serialized size of the partitions has reduced because of its parent being
-    // checkpointed. If the partitions do not have any non-transient reference to another RDD
-    // or another RDD's partitions, it does not refer to a lineage and therefore may not reduce
-    // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent
-    // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's
-    // partitions must have changed after checkpointing.
-    if (testRDDPartitionSize) {
-      assert(
-        splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
-        "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType +
-          "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
-      )
-    }
-
+    // Test whether serialized size of the partitions has reduced
+    logInfo("Size of partitions of " + rddType +
+      " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]")
+    assert(
+      partitionSizeAfterCheckpoint < partitionSizeBeforeCheckpoint,
+      "Size of " + rddType + " partitions did not reduce after checkpointing parent RDDs" +
+        " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]"
+    )
   }
 
   /**
-   * Generate an RDD with a long lineage of one-to-one dependencies.
+   * Generate an RDD such that both the RDD and its partitions have large size.
    */
-  def generateLongLineageRDD(): RDD[Int] = {
-    var rdd = sc.makeRDD(1 to 100, 4)
-    for (i <- 1 to 50) {
-      rdd = rdd.map(x => x + 1)
-    }
-    rdd
+  def generateFatRDD(): RDD[Int] = {
+    new FatRDD(sc.makeRDD(1 to 100, 4)).map(x => x)
   }
 
   /**
-   * Generate an RDD with a long lineage specifically for CoGroupedRDD.
-   * A CoGroupedRDD can have a long lineage only one of its parents have a long lineage
-   * and narrow dependency with this RDD. This method generate such an RDD by a sequence
-   * of cogroups and mapValues which creates a long lineage of narrow dependencies.
+   * Generate a pair RDD (with partitioner) such that both the RDD and its partitions
+   * have large size.
    */
-  def generateLongLineageRDDForCoGroupedRDD() = {
-    val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
-
-    def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
-
-    var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones)
-    for(i <- 1 to 10) {
-      cogrouped = cogrouped.mapValues(add).cogroup(ones)
-    }
-    cogrouped.mapValues(add)
+  def generateFatPairRDD() = {
+    new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
   }
 
   /**
@@ -362,8 +352,26 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
    * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
    */
   def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
-    (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length,
-     Utils.serialize(rdd.partitions).length)
+    val rddSize = Utils.serialize(rdd).size
+    val rddCpDataSize = Utils.serialize(rdd.checkpointData).size
+    val rddPartitionSize = Utils.serialize(rdd.partitions).size
+    val rddDependenciesSize = Utils.serialize(rdd.dependencies).size
+
+    // Print detailed size, helps in debugging
+    logInfo("Serialized sizes of " + rdd +
+      ": RDD = " + rddSize +
+      ", RDD checkpoint data = " + rddCpDataSize +
+      ", RDD partitions = " + rddPartitionSize +
+      ", RDD dependencies = " + rddDependenciesSize
+    )
+    // this makes sure that serializing the RDD's checkpoint data does not
+    // serialize the whole RDD as well
+    assert(
+      rddSize > rddCpDataSize,
+      "RDD's checkpoint data (" + rddCpDataSize  + ") is equal or larger than the " +
+        "whole RDD with checkpoint data (" + rddSize + ")"
+    )
+    (rddSize - rddCpDataSize, rddPartitionSize)
   }
 
   /**
@@ -375,8 +383,49 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     val bytes = Utils.serialize(obj)
     Utils.deserialize[T](bytes)
   }
+
+  /**
+   * Recursively force the initialization of all the members of an RDD and its parents.
+   */
+  def initializeRdd(rdd: RDD[_]) {
+    rdd.partitions // forces the partitions of this RDD to be initialized
+    rdd.dependencies.map(_.rdd).foreach(initializeRdd(_))
+  }
 }
 
+/** RDD partition that has large serialized size. */
+class FatPartition(val partition: Partition) extends Partition {
+  val bigData = new Array[Byte](10000)
+  def index: Int = partition.index
+}
+
+/** RDD that has large serialized size. */
+class FatRDD(parent: RDD[Int]) extends RDD[Int](parent) {
+  val bigData = new Array[Byte](100000)
+
+  protected def getPartitions: Array[Partition] = {
+    parent.partitions.map(p => new FatPartition(p))
+  }
+
+  def compute(split: Partition, context: TaskContext): Iterator[Int] = {
+    parent.compute(split.asInstanceOf[FatPartition].partition, context)
+  }
+}
+
+/** Pair RDD that has large serialized size. */
+class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int, Int)](parent) {
+  val bigData = new Array[Byte](100000)
+
+  protected def getPartitions: Array[Partition] = {
+    parent.partitions.map(p => new FatPartition(p))
+  }
+
+  @transient override val partitioner = Some(_partitioner)
+
+  def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = {
+    parent.compute(split.asInstanceOf[FatPartition].partition, context).map(x => (x, x))
+  }
+}
 
 object CheckpointSuite {
   // This is a custom cogroup function that does not use mapValues like