You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/25 09:32:36 UTC

git commit: SPARK-2657 Use more compact data structures than ArrayBuffer in groupBy & cogroup

Repository: spark
Updated Branches:
  refs/heads/master 2f75a4a30 -> 8529ced35


SPARK-2657 Use more compact data structures than ArrayBuffer in groupBy & cogroup

JIRA: https://issues.apache.org/jira/browse/SPARK-2657

Our current code uses ArrayBuffers for each group of values in groupBy, as well as for the key's elements in CoGroupedRDD. ArrayBuffers have a lot of overhead if there are few values in them, which is likely to happen in cases such as join. In particular, they have a pointer to an Object[] of size 16 by default, which is 24 bytes for the array header + 128 for the pointers in there, plus at least 32 for the ArrayBuffer data structure. This patch replaces the per-group buffers with a CompactBuffer class that can store up to 2 elements more efficiently (in fields of itself) and acts like an ArrayBuffer beyond that. For a key's elements in CoGroupedRDD, we use an Array of CompactBuffers instead of an ArrayBuffer of ArrayBuffers.

There are some changes throughout the code to deal with CoGroupedRDD now returning `(K, Array[Iterable[_]])` instead of `(K, Seq[Seq[_]])`. We could also decide not to do that, but CoGroupedRDD is a `@DeveloperApi` so I think it's okay to change it here.

Author: Matei Zaharia <ma...@databricks.com>

Closes #1555 from mateiz/compact-groupby and squashes the following commits:

845a356 [Matei Zaharia] Lower initial size of CompactBuffer's vector to 8
07621a7 [Matei Zaharia] Review comments
0c1cd12 [Matei Zaharia] Don't use varargs in CompactBuffer.apply
bdc8a39 [Matei Zaharia] Small tweak to +=, and typos
f61f040 [Matei Zaharia] Fix line lengths
59da88b0 [Matei Zaharia] Fix line lengths
197cde8 [Matei Zaharia] Make CompactBuffer extend Seq to make its toSeq more efficient
775110f [Matei Zaharia] Change CoGroupedRDD to give (K, Array[Iterable[_]]) to avoid wrappers
9b4c6e8 [Matei Zaharia] Use CompactBuffer in CoGroupedRDD
ed577ab [Matei Zaharia] Use CompactBuffer in groupByKey
10f0de1 [Matei Zaharia] A CompactBuffer that's more memory-efficient than ArrayBuffer for small buffers


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8529ced3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8529ced3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8529ced3

Branch: refs/heads/master
Commit: 8529ced35c6b77a384d10a26b654a8073d57e03d
Parents: 2f75a4a
Author: Matei Zaharia <ma...@databricks.com>
Authored: Fri Jul 25 00:32:32 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Fri Jul 25 00:32:32 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |  16 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala |  33 ++--
 .../spark/serializer/KryoSerializer.scala       |   2 +
 .../spark/util/collection/CompactBuffer.scala   | 159 +++++++++++++++++++
 .../org/apache/spark/CheckpointSuite.scala      |  37 +++--
 .../scala/org/apache/spark/ShuffleSuite.scala   |   4 +-
 .../util/collection/CompactBufferSuite.scala    | 105 ++++++++++++
 .../stat/correlation/SpearmanCorrelation.scala  |   6 +-
 .../dstream/ReducedWindowedDStream.scala        |  15 +-
 9 files changed, 334 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8529ced3/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index aca235a..7d96089 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -25,7 +25,7 @@ import scala.language.existentials
 import org.apache.spark.{InterruptibleIterator, Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap}
+import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.shuffle.ShuffleHandle
 
@@ -66,14 +66,14 @@ private[spark] class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]
  */
 @DeveloperApi
 class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
-  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+  extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {
 
   // For example, `(k, a) cogroup (k, b)` produces k -> Seq(ArrayBuffer as, ArrayBuffer bs).
   // Each ArrayBuffer is represented as a CoGroup, and the resulting Seq as a CoGroupCombiner.
   // CoGroupValue is the intermediate state of each value before being merged in compute.
-  private type CoGroup = ArrayBuffer[Any]
+  private type CoGroup = CompactBuffer[Any]
   private type CoGroupValue = (Any, Int)  // Int is dependency number
-  private type CoGroupCombiner = Seq[CoGroup]
+  private type CoGroupCombiner = Array[CoGroup]
 
   private var serializer: Option[Serializer] = None
 
@@ -114,7 +114,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
 
   override val partitioner: Some[Partitioner] = Some(part)
 
-  override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
+  override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
     val sparkConf = SparkEnv.get.conf
     val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
@@ -150,7 +150,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
           getCombiner(kv._1)(depNum) += kv._2
         }
       }
-      new InterruptibleIterator(context, map.iterator)
+      new InterruptibleIterator(context,
+        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     } else {
       val map = createExternalMap(numRdds)
       rddIterators.foreach { case (it, depNum) =>
@@ -161,7 +162,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
       }
       context.taskMetrics.memoryBytesSpilled = map.memoryBytesSpilled
       context.taskMetrics.diskBytesSpilled = map.diskBytesSpilled
-      new InterruptibleIterator(context, map.iterator)
+      new InterruptibleIterator(context,
+        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8529ced3/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index a6b9204..c04d162 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -46,6 +46,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
 import org.apache.spark.SparkContext._
 import org.apache.spark.partial.{BoundedDouble, PartialResult}
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.collection.CompactBuffer
 
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
@@ -361,12 +362,12 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
     // groupByKey shouldn't use map side combine because map side combine does not
     // reduce the amount of data shuffled and requires all map side data be inserted
     // into a hash table, leading to more objects in the old gen.
-    val createCombiner = (v: V) => ArrayBuffer(v)
-    val mergeValue = (buf: ArrayBuffer[V], v: V) => buf += v
-    val mergeCombiners = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => c1 ++ c2
-    val bufs = combineByKey[ArrayBuffer[V]](
+    val createCombiner = (v: V) => CompactBuffer(v)
+    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
+    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
+    val bufs = combineByKey[CompactBuffer[V]](
       createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false)
-    bufs.mapValues(_.toIterable)
+    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
   }
 
   /**
@@ -571,11 +572,11 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s, w3s) =>
-       (vs.asInstanceOf[Seq[V]],
-         w1s.asInstanceOf[Seq[W1]],
-         w2s.asInstanceOf[Seq[W2]],
-         w3s.asInstanceOf[Seq[W3]])
+    cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
+       (vs.asInstanceOf[Iterable[V]],
+         w1s.asInstanceOf[Iterable[W1]],
+         w2s.asInstanceOf[Iterable[W2]],
+         w3s.asInstanceOf[Iterable[W3]])
     }
   }
 
@@ -589,8 +590,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    cg.mapValues { case Seq(vs, w1s) =>
-      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W]])
+    cg.mapValues { case Array(vs, w1s) =>
+      (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
     }
   }
 
@@ -604,10 +605,10 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
       throw new SparkException("Default partitioner cannot partition array keys.")
     }
     val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
-    cg.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]],
-        w1s.asInstanceOf[Seq[W1]],
-        w2s.asInstanceOf[Seq[W2]])
+    cg.mapValues { case Array(vs, w1s, w2s) =>
+      (vs.asInstanceOf[Iterable[V]],
+        w1s.asInstanceOf[Iterable[W1]],
+        w2s.asInstanceOf[Iterable[W2]])
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8529ced3/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index c3a3e90..fa79b25 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -31,6 +31,7 @@ import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage._
 import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock}
 import org.apache.spark.util.BoundedPriorityQueue
+import org.apache.spark.util.collection.CompactBuffer
 
 import scala.reflect.ClassTag
 
@@ -185,6 +186,7 @@ private[serializer] object KryoSerializer {
     classOf[GotBlock],
     classOf[GetBlock],
     classOf[MapStatus],
+    classOf[CompactBuffer[_]],
     classOf[BlockManagerId],
     classOf[Array[Byte]],
     classOf[BoundedPriorityQueue[_]],

http://git-wip-us.apache.org/repos/asf/spark/blob/8529ced3/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
new file mode 100644
index 0000000..d44e15e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/collection/CompactBuffer.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+/**
+ * An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
+ * ArrayBuffer always allocates an Object array to store the data, with 16 entries by default,
+ * so it has about 80-100 bytes of overhead. In contrast, CompactBuffer can keep up to two
+ * elements in fields of the main object, and only allocates an Array[AnyRef] if there are more
+ * entries than that. This makes it more efficient for operations like groupBy where we expect
+ * some keys to have very few elements.
+ */
+private[spark] class CompactBuffer[T] extends Seq[T] with Serializable {
+  // First two elements
+  private var element0: T = _
+  private var element1: T = _
+
+  // Number of elements, including our two in the main object
+  private var curSize = 0
+
+  // Array for extra elements
+  private var otherElements: Array[AnyRef] = null
+
+  def apply(position: Int): T = {
+    if (position < 0 || position >= curSize) {
+      throw new IndexOutOfBoundsException
+    }
+    if (position == 0) {
+      element0
+    } else if (position == 1) {
+      element1
+    } else {
+      otherElements(position - 2).asInstanceOf[T]
+    }
+  }
+
+  private def update(position: Int, value: T): Unit = {
+    if (position < 0 || position >= curSize) {
+      throw new IndexOutOfBoundsException
+    }
+    if (position == 0) {
+      element0 = value
+    } else if (position == 1) {
+      element1 = value
+    } else {
+      otherElements(position - 2) = value.asInstanceOf[AnyRef]
+    }
+  }
+
+  def += (value: T): CompactBuffer[T] = {
+    val newIndex = curSize
+    if (newIndex == 0) {
+      element0 = value
+      curSize = 1
+    } else if (newIndex == 1) {
+      element1 = value
+      curSize = 2
+    } else {
+      growToSize(curSize + 1)
+      otherElements(newIndex - 2) = value.asInstanceOf[AnyRef]
+    }
+    this
+  }
+
+  def ++= (values: TraversableOnce[T]): CompactBuffer[T] = {
+    values match {
+      // Optimize merging of CompactBuffers, used in cogroup and groupByKey
+      case compactBuf: CompactBuffer[T] =>
+        val oldSize = curSize
+        // Copy the other buffer's size and elements to local variables in case it is equal to us
+        val itsSize = compactBuf.curSize
+        val itsElements = compactBuf.otherElements
+        growToSize(curSize + itsSize)
+        if (itsSize == 1) {
+          this(oldSize) = compactBuf.element0
+        } else if (itsSize == 2) {
+          this(oldSize) = compactBuf.element0
+          this(oldSize + 1) = compactBuf.element1
+        } else if (itsSize > 2) {
+          this(oldSize) = compactBuf.element0
+          this(oldSize + 1) = compactBuf.element1
+          // At this point our size is also above 2, so just copy its array directly into ours.
+          // Note that since we added two elements above, the index in this.otherElements that we
+          // should copy to is oldSize.
+          System.arraycopy(itsElements, 0, otherElements, oldSize, itsSize - 2)
+        }
+
+      case _ =>
+        values.foreach(e => this += e)
+    }
+    this
+  }
+
+  override def length: Int = curSize
+
+  override def size: Int = curSize
+
+  override def iterator: Iterator[T] = new Iterator[T] {
+    private var pos = 0
+    override def hasNext: Boolean = pos < curSize
+    override def next(): T = {
+      if (!hasNext) {
+        throw new NoSuchElementException
+      }
+      pos += 1
+      apply(pos - 1)
+    }
+  }
+
+  /** Increase our size to newSize and grow the backing array if needed. */
+  private def growToSize(newSize: Int): Unit = {
+    if (newSize < 0) {
+      throw new UnsupportedOperationException("Can't grow buffer past Int.MaxValue elements")
+    }
+    val capacity = if (otherElements != null) otherElements.length + 2 else 2
+    if (newSize > capacity) {
+      var newArrayLen = 8
+      while (newSize - 2 > newArrayLen) {
+        newArrayLen *= 2
+        if (newArrayLen == Int.MinValue) {
+          // Prevent overflow if we double from 2^30 to 2^31, which will become Int.MinValue.
+          // Note that we set the new array length to Int.MaxValue - 2 so that our capacity
+          // calculation above still gives a positive integer.
+          newArrayLen = Int.MaxValue - 2
+        }
+      }
+      val newArray = new Array[AnyRef](newArrayLen)
+      if (otherElements != null) {
+        System.arraycopy(otherElements, 0, newArray, 0, otherElements.length)
+      }
+      otherElements = newArray
+    }
+    curSize = newSize
+  }
+}
+
+private[spark] object CompactBuffer {
+  def apply[T](): CompactBuffer[T] = new CompactBuffer[T]
+
+  def apply[T](value: T): CompactBuffer[T] = {
+    val buf = new CompactBuffer[T]
+    buf += value
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8529ced3/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index fc00458..d1cb2d9 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -156,15 +156,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("CoGroupedRDD") {
     val longLineageRDD1 = generateFatPairRDD()
+
+    // Collect the RDD as sequences instead of arrays to enable equality tests in testRDD
+    val seqCollectFunc = (rdd: RDD[(Int, Array[Iterable[Int]])]) =>
+      rdd.map{case (p, a) => (p, a.toSeq)}.collect(): Any
+
     testRDD(rdd => {
       CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner)
-    })
+    }, seqCollectFunc)
 
     val longLineageRDD2 = generateFatPairRDD()
     testRDDPartitions(rdd => {
       CheckpointSuite.cogroup(
         longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
-    })
+    }, seqCollectFunc)
   }
 
   test("ZippedPartitionsRDD") {
@@ -235,12 +240,19 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     assert(rdd.partitions.size === 0)
   }
 
+  def defaultCollectFunc[T](rdd: RDD[T]): Any = rdd.collect()
+
   /**
    * Test checkpointing of the RDD generated by the given operation. It tests whether the
    * serialized size of the RDD is reduce after checkpointing or not. This function should be called
    * on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.).
+   *
+   * @param op an operation to run on the RDD
+   * @param collectFunc a function for collecting the values in the RDD, in case there are
+   *   non-comparable types like arrays that we want to convert to something that supports ==
    */
-  def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
+  def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U],
+      collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
     // Generate the final RDD using given RDD operation
     val baseRDD = generateFatRDD()
     val operatedRDD = op(baseRDD)
@@ -258,13 +270,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
     val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
     operatedRDD.checkpoint()
-    val result = operatedRDD.collect()
+    val result = collectFunc(operatedRDD)
     operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
     val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
     logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
 
     // Test whether the checkpoint file has been created
-    assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
+    assert(collectFunc(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get)) === result)
 
     // Test whether dependencies have been changed from its earlier parent RDD
     assert(operatedRDD.dependencies.head.rdd != parentRDD)
@@ -279,7 +291,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     assert(operatedRDD.partitions.length === numPartitions)
 
     // Test whether the data in the checkpointed RDD is same as original
-    assert(operatedRDD.collect() === result)
+    assert(collectFunc(operatedRDD) === result)
 
     // Test whether serialized size of the RDD has reduced.
     logInfo("Size of " + rddType +
@@ -289,7 +301,6 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
       "Size of " + rddType + " did not reduce after checkpointing " +
         " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
     )
-
   }
 
   /**
@@ -300,8 +311,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
    * This function should be called only those RDD whose partitions refer to parent RDD's
    * partitions (i.e., do not call it on simple RDD like MappedRDD).
    *
+   * @param op an operation to run on the RDD
+   * @param collectFunc a function for collecting the values in the RDD, in case there are
+   *   non-comparable types like arrays that we want to convert to something that supports ==
    */
-  def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) {
+  def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U],
+       collectFunc: RDD[U] => Any = defaultCollectFunc[U] _) {
     // Generate the final RDD using given RDD operation
     val baseRDD = generateFatRDD()
     val operatedRDD = op(baseRDD)
@@ -316,13 +331,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
     logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
     val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
     parentRDDs.foreach(_.checkpoint())  // checkpoint the parent RDD, not the generated one
-    val result = operatedRDD.collect()  // force checkpointing
+    val result = collectFunc(operatedRDD)  // force checkpointing
     operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables
     val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
     logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString)
 
     // Test whether the data in the checkpointed RDD is same as original
-    assert(operatedRDD.collect() === result)
+    assert(collectFunc(operatedRDD) === result)
 
     // Test whether serialized size of the partitions has reduced
     logInfo("Size of partitions of " + rddType +
@@ -436,7 +451,7 @@ object CheckpointSuite {
     new CoGroupedRDD[K](
       Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
       part
-    ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
+    ).asInstanceOf[RDD[(K, Array[Iterable[V]])]]
   }
 
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8529ced3/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 237e644..eae67c7 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -176,7 +176,9 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
     val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
     val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
-    val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
+    val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2))
+      .map(p => (p._1, p._2.map(_.toArray)))
+      .collectAsMap()
 
     assert(results(1)(0).length === 3)
     assert(results(1)(0).contains(1))

http://git-wip-us.apache.org/repos/asf/spark/blob/8529ced3/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala
new file mode 100644
index 0000000..6c956d9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/util/collection/CompactBufferSuite.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection
+
+import org.scalatest.FunSuite
+
+class CompactBufferSuite extends FunSuite {
+  test("empty buffer") {
+    val b = new CompactBuffer[Int]
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+    intercept[IndexOutOfBoundsException] { b(0) }
+    intercept[IndexOutOfBoundsException] { b(1) }
+    intercept[IndexOutOfBoundsException] { b(2) }
+    intercept[IndexOutOfBoundsException] { b(-1) }
+  }
+
+  test("basic inserts") {
+    val b = new CompactBuffer[Int]
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+    for (i <- 0 until 1000) {
+      b += i
+      assert(b.size === i + 1)
+      assert(b(i) === i)
+    }
+    assert(b.iterator.toList === (0 until 1000).toList)
+    assert(b.iterator.toList === (0 until 1000).toList)
+    assert(b.size === 1000)
+  }
+
+  test("adding sequences") {
+    val b = new CompactBuffer[Int]
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+
+    // Add some simple lists and iterators
+    b ++= List(0)
+    assert(b.size === 1)
+    assert(b.iterator.toList === List(0))
+    b ++= Iterator(1)
+    assert(b.size === 2)
+    assert(b.iterator.toList === List(0, 1))
+    b ++= List(2)
+    assert(b.size === 3)
+    assert(b.iterator.toList === List(0, 1, 2))
+    b ++= Iterator(3, 4, 5, 6, 7, 8, 9)
+    assert(b.size === 10)
+    assert(b.iterator.toList === (0 until 10).toList)
+
+    // Add CompactBuffers
+    val b2 = new CompactBuffer[Int]
+    b2 ++= 0 until 10
+    b ++= b2
+    assert(b.iterator.toList === (1 to 2).flatMap(i => 0 until 10).toList)
+    b ++= b2
+    assert(b.iterator.toList === (1 to 3).flatMap(i => 0 until 10).toList)
+    b ++= b2
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
+
+    // Add some small CompactBuffers as well
+    val b3 = new CompactBuffer[Int]
+    b ++= b3
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList)
+    b3 += 0
+    b ++= b3
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0))
+    b3 += 1
+    b ++= b3
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1))
+    b3 += 2
+    b ++= b3
+    assert(b.iterator.toList === (1 to 4).flatMap(i => 0 until 10).toList ++ List(0, 0, 1, 0, 1, 2))
+  }
+
+  test("adding the same buffer to itself") {
+    val b = new CompactBuffer[Int]
+    assert(b.size === 0)
+    assert(b.iterator.toList === Nil)
+    b += 1
+    assert(b.toList === List(1))
+    for (j <- 1 until 8) {
+      b ++= b
+      assert(b.size === (1 << j))
+      assert(b.iterator.toList === (1 to (1 << j)).map(i => 1).toList)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/8529ced3/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
index 88de2c8..1f7de63 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -122,6 +122,10 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
   private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
     val partitioner = new HashPartitioner(input.partitions.size)
     val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
-    cogrouped.map { case (_, values: Seq[Seq[Double]]) => new DenseVector(values.flatten.toArray) }
+    cogrouped.map {
+      case (_, values: Array[Iterable[_]]) =>
+        val doubles = values.asInstanceOf[Array[Iterable[Double]]]
+        new DenseVector(doubles.flatten.toArray)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8529ced3/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
index 40da313..1a47089 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReducedWindowedDStream.scala
@@ -133,17 +133,17 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
     val numOldValues = oldRDDs.size
     val numNewValues = newRDDs.size
 
-    val mergeValues = (seqOfValues: Seq[Seq[V]]) => {
-      if (seqOfValues.size != 1 + numOldValues + numNewValues) {
+    val mergeValues = (arrayOfValues: Array[Iterable[V]]) => {
+      if (arrayOfValues.size != 1 + numOldValues + numNewValues) {
         throw new Exception("Unexpected number of sequences of reduced values")
       }
       // Getting reduced values "old time steps" that will be removed from current window
-      val oldValues = (1 to numOldValues).map(i => seqOfValues(i)).filter(!_.isEmpty).map(_.head)
+      val oldValues = (1 to numOldValues).map(i => arrayOfValues(i)).filter(!_.isEmpty).map(_.head)
       // Getting reduced values "new time steps"
       val newValues =
-        (1 to numNewValues).map(i => seqOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
+        (1 to numNewValues).map(i => arrayOfValues(numOldValues + i)).filter(!_.isEmpty).map(_.head)
 
-      if (seqOfValues(0).isEmpty) {
+      if (arrayOfValues(0).isEmpty) {
         // If previous window's reduce value does not exist, then at least new values should exist
         if (newValues.isEmpty) {
           throw new Exception("Neither previous window has value for key, nor new values found. " +
@@ -153,7 +153,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
         newValues.reduce(reduceF) // return
       } else {
         // Get the previous window's reduced value
-        var tempValue = seqOfValues(0).head
+        var tempValue = arrayOfValues(0).head
         // If old values exists, then inverse reduce then from previous value
         if (!oldValues.isEmpty) {
           tempValue = invReduceF(tempValue, oldValues.reduce(reduceF))
@@ -166,7 +166,8 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
       }
     }
 
-    val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K,Seq[Seq[V]])]].mapValues(mergeValues)
+    val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, Array[Iterable[V]])]]
+      .mapValues(mergeValues)
 
     if (filterFunc.isDefined) {
       Some(mergedValuesRDD.filter(filterFunc.get))