Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:21 UTC

[37/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
new file mode 100644
index 0000000..c5d51be
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import org.apache.spark._
+import org.apache.spark.scheduler.JobListener
+
+/**
+ * A JobListener for an approximate single-result action, such as count() or non-parallel reduce().
+ * This listener waits up to timeout milliseconds and will return a partial answer even if the
+ * complete answer is not available by then.
+ *
+ * This class assumes that the action is performed on an entire RDD[T] via a function that computes
+ * a result of type U for each partition, and that the action returns a partial or complete result
+ * of type R. Note that the type R must *include* any error bars on it (e.g. see BoundedDouble).
+ */
+private[spark] class ApproximateActionListener[T, U, R](
+    rdd: RDD[T],
+    func: (TaskContext, Iterator[T]) => U,
+    evaluator: ApproximateEvaluator[U, R],
+    timeout: Long)
+  extends JobListener {
+
+  val startTime = System.currentTimeMillis()
+  val totalTasks = rdd.partitions.size
+  var finishedTasks = 0
+  var failure: Option[Exception] = None             // Set if the job has failed (permanently)
+  var resultObject: Option[PartialResult[R]] = None // Set if we've already returned a PartialResult
+
+  override def taskSucceeded(index: Int, result: Any) {
+    synchronized {
+      evaluator.merge(index, result.asInstanceOf[U])
+      finishedTasks += 1
+      if (finishedTasks == totalTasks) {
+        // If we had already returned a PartialResult, set its final value
+        resultObject.foreach(r => r.setFinalValue(evaluator.currentResult()))
+        // Notify any waiting thread that may have called awaitResult
+        this.notifyAll()
+      }
+    }
+  }
+
+  override def jobFailed(exception: Exception) {
+    synchronized {
+      failure = Some(exception)
+      this.notifyAll()
+    }
+  }
+
+  /**
+   * Waits for up to timeout milliseconds since the listener was created and then returns a
+   * PartialResult with the result so far. This may be complete if the whole job is done.
+   */
+  def awaitResult(): PartialResult[R] = synchronized {
+    val finishTime = startTime + timeout
+    while (true) {
+      val time = System.currentTimeMillis()
+      if (failure != None) {
+        throw failure.get
+      } else if (finishedTasks == totalTasks) {
+        return new PartialResult(evaluator.currentResult(), true)
+      } else if (time >= finishTime) {
+        resultObject = Some(new PartialResult(evaluator.currentResult(), false))
+        return resultObject.get
+      } else {
+        this.wait(finishTime - time)
+      }
+    }
+    // Should never be reached, but required to keep the compiler happy
+    return null
+  }
+}
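For context, a rough sketch of how this listener is typically exercised from the driver side, assuming an existing SparkContext named sc (countApprox builds an ApproximateActionListener internally and hands it to the scheduler, which calls taskSucceeded as partitions finish):

    val rdd = sc.parallelize(1 to 1000000, 100)
    val partial = rdd.countApprox(2000, 0.95)   // PartialResult[BoundedDouble]
    println(partial.initialValue)               // estimate with error bars after ~2 seconds
    println(partial.getFinalValue())            // blocks until every partition has been counted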

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/ApproximateEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateEvaluator.scala
new file mode 100644
index 0000000..9c2859c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/ApproximateEvaluator.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+/**
+ * An object that computes a function incrementally by merging in results of type U from multiple
+ * tasks. Allows partial evaluation at any point by calling currentResult().
+ */
+private[spark] trait ApproximateEvaluator[U, R] {
+  def merge(outputId: Int, taskResult: U): Unit
+  def currentResult(): R
+}
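As an illustration only (not part of this patch), a minimal evaluator that tracks a running maximum could implement the trait like this; a partial maximum is already a valid lower bound, so no extrapolation is needed:

    private[spark] class MaxEvaluator extends ApproximateEvaluator[Double, Double] {
      var max = Double.NegativeInfinity

      // Fold one task's result into the running state
      override def merge(outputId: Int, taskResult: Double) {
        max = math.max(max, taskResult)
      }

      // The partial answer is simply the maximum seen so far
      override def currentResult(): Double = max
    }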

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
new file mode 100644
index 0000000..5f44508
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/BoundedDouble.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+/**
+ * A Double with error bars on it.
+ */
+class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
+  override def toString(): String = "[%.3f, %.3f]".format(low, high)
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
new file mode 100644
index 0000000..3155dfe
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import cern.jet.stat.Probability
+
+/**
+ * An ApproximateEvaluator for counts.
+ *
+ * TODO: There's currently a lot of shared code between this and GroupedCountEvaluator. It might
+ * be best to make this a special case of GroupedCountEvaluator with one group.
+ */
+private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
+  extends ApproximateEvaluator[Long, BoundedDouble] {
+
+  var outputsMerged = 0
+  var sum: Long = 0
+
+  override def merge(outputId: Int, taskResult: Long) {
+    outputsMerged += 1
+    sum += taskResult
+  }
+
+  override def currentResult(): BoundedDouble = {
+    if (outputsMerged == totalOutputs) {
+      new BoundedDouble(sum, 1.0, sum, sum)
+    } else if (outputsMerged == 0) {
+      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
+    } else {
+      val p = outputsMerged.toDouble / totalOutputs
+      val mean = (sum + 1 - p) / p
+      val variance = (sum + 1) * (1 - p) / (p * p)
+      val stdev = math.sqrt(variance)
+      val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
+      val low = mean - confFactor * stdev
+      val high = mean + confFactor * stdev
+      new BoundedDouble(mean, confidence, low, high)
+    }
+  }
+}
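To make the extrapolation concrete, a small worked example (conceptual only, since the class is private[spark]; real user code goes through RDD.countApprox): with 4 of 10 partitions merged, p = 0.4, and per-partition counts of 240, 260, 250 and 250, so sum = 1000:

    val eval = new CountEvaluator(10, 0.95)
    Seq(240L, 260L, 250L, 250L).zipWithIndex.foreach { case (c, i) => eval.merge(i, c) }
    // mean = (1000 + 1 - 0.4) / 0.4 ≈ 2501.5, variance = 1001 * 0.6 / 0.16 ≈ 3753.8,
    // stdev ≈ 61.3, and with confFactor ≈ 1.96 the 95% interval is roughly [2381, 2622]
    println(eval.currentResult())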

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
new file mode 100644
index 0000000..e519e3a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import java.util.{HashMap => JHashMap}
+import java.util.{Map => JMap}
+
+import scala.collection.Map
+import scala.collection.mutable.HashMap
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import cern.jet.stat.Probability
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+/**
+ * An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
+ */
+private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
+  extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
+
+  var outputsMerged = 0
+  var sums = new OLMap[T]   // Sum of counts for each key
+
+  override def merge(outputId: Int, taskResult: OLMap[T]) {
+    outputsMerged += 1
+    val iter = taskResult.object2LongEntrySet.fastIterator()
+    while (iter.hasNext) {
+      val entry = iter.next()
+      sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
+    }
+  }
+
+  override def currentResult(): Map[T, BoundedDouble] = {
+    if (outputsMerged == totalOutputs) {
+      val result = new JHashMap[T, BoundedDouble](sums.size)
+      val iter = sums.object2LongEntrySet.fastIterator()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        val sum = entry.getLongValue()
+        result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
+      }
+      result
+    } else if (outputsMerged == 0) {
+      new HashMap[T, BoundedDouble]
+    } else {
+      val p = outputsMerged.toDouble / totalOutputs
+      val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
+      val result = new JHashMap[T, BoundedDouble](sums.size)
+      val iter = sums.object2LongEntrySet.fastIterator()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        val sum = entry.getLongValue
+        val mean = (sum + 1 - p) / p
+        val variance = (sum + 1) * (1 - p) / (p * p)
+        val stdev = math.sqrt(variance)
+        val low = mean - confFactor * stdev
+        val high = mean + confFactor * stdev
+        result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
+      }
+      result
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
new file mode 100644
index 0000000..cf8a568
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import java.util.{HashMap => JHashMap}
+import java.util.{Map => JMap}
+
+import scala.collection.mutable.HashMap
+import scala.collection.Map
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import org.apache.spark.util.StatCounter
+
+/**
+ * An ApproximateEvaluator for means by key. Returns a map of key to confidence interval.
+ */
+private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double)
+  extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
+
+  var outputsMerged = 0
+  var sums = new JHashMap[T, StatCounter]   // StatCounter for each key
+
+  override def merge(outputId: Int, taskResult: JHashMap[T, StatCounter]) {
+    outputsMerged += 1
+    val iter = taskResult.entrySet.iterator()
+    while (iter.hasNext) {
+      val entry = iter.next()
+      val old = sums.get(entry.getKey)
+      if (old != null) {
+        old.merge(entry.getValue)
+      } else {
+        sums.put(entry.getKey, entry.getValue)
+      }
+    }
+  }
+
+  override def currentResult(): Map[T, BoundedDouble] = {
+    if (outputsMerged == totalOutputs) {
+      val result = new JHashMap[T, BoundedDouble](sums.size)
+      val iter = sums.entrySet.iterator()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        val mean = entry.getValue.mean
+        result(entry.getKey) = new BoundedDouble(mean, 1.0, mean, mean)
+      }
+      result
+    } else if (outputsMerged == 0) {
+      new HashMap[T, BoundedDouble]
+    } else {
+      val p = outputsMerged.toDouble / totalOutputs
+      val studentTCacher = new StudentTCacher(confidence)
+      val result = new JHashMap[T, BoundedDouble](sums.size)
+      val iter = sums.entrySet.iterator()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        val counter = entry.getValue
+        val mean = counter.mean
+        val stdev = math.sqrt(counter.sampleVariance / counter.count)
+        val confFactor = studentTCacher.get(counter.count)
+        val low = mean - confFactor * stdev
+        val high = mean + confFactor * stdev
+        result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
+      }
+      result
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
new file mode 100644
index 0000000..8225a5d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import java.util.{HashMap => JHashMap}
+import java.util.{Map => JMap}
+
+import scala.collection.mutable.HashMap
+import scala.collection.Map
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import org.apache.spark.util.StatCounter
+
+/**
+ * An ApproximateEvaluator for sums by key. Returns a map of key to confidence interval.
+ */
+private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double)
+  extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
+
+  var outputsMerged = 0
+  var sums = new JHashMap[T, StatCounter]   // StatCounter for each key
+
+  override def merge(outputId: Int, taskResult: JHashMap[T, StatCounter]) {
+    outputsMerged += 1
+    val iter = taskResult.entrySet.iterator()
+    while (iter.hasNext) {
+      val entry = iter.next()
+      val old = sums.get(entry.getKey)
+      if (old != null) {
+        old.merge(entry.getValue)
+      } else {
+        sums.put(entry.getKey, entry.getValue)
+      }
+    }
+  }
+
+  override def currentResult(): Map[T, BoundedDouble] = {
+    if (outputsMerged == totalOutputs) {
+      val result = new JHashMap[T, BoundedDouble](sums.size)
+      val iter = sums.entrySet.iterator()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        val sum = entry.getValue.sum
+        result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
+      }
+      result
+    } else if (outputsMerged == 0) {
+      new HashMap[T, BoundedDouble]
+    } else {
+      val p = outputsMerged.toDouble / totalOutputs
+      val studentTCacher = new StudentTCacher(confidence)
+      val result = new JHashMap[T, BoundedDouble](sums.size)
+      val iter = sums.entrySet.iterator()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        val counter = entry.getValue
+        val meanEstimate = counter.mean
+        val meanVar = counter.sampleVariance / counter.count
+        val countEstimate = (counter.count + 1 - p) / p
+        val countVar = (counter.count + 1) * (1 - p) / (p * p)
+        val sumEstimate = meanEstimate * countEstimate
+        val sumVar = (meanEstimate * meanEstimate * countVar) +
+                     (countEstimate * countEstimate * meanVar) +
+                     (meanVar * countVar)
+        val sumStdev = math.sqrt(sumVar)
+        val confFactor = studentTCacher.get(counter.count)
+        val low = sumEstimate - confFactor * sumStdev
+        val high = sumEstimate + confFactor * sumStdev
+        result(entry.getKey) = new BoundedDouble(sumEstimate, confidence, low, high)
+      }
+      result
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala
new file mode 100644
index 0000000..d24959c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/MeanEvaluator.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import cern.jet.stat.Probability
+
+import org.apache.spark.util.StatCounter
+
+/**
+ * An ApproximateEvaluator for means.
+ */
+private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
+  extends ApproximateEvaluator[StatCounter, BoundedDouble] {
+
+  var outputsMerged = 0
+  var counter = new StatCounter
+
+  override def merge(outputId: Int, taskResult: StatCounter) {
+    outputsMerged += 1
+    counter.merge(taskResult)
+  }
+
+  override def currentResult(): BoundedDouble = {
+    if (outputsMerged == totalOutputs) {
+      new BoundedDouble(counter.mean, 1.0, counter.mean, counter.mean)
+    } else if (outputsMerged == 0) {
+      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
+    } else {
+      val mean = counter.mean
+      val stdev = math.sqrt(counter.sampleVariance / counter.count)
+      val confFactor = {
+        if (counter.count > 100) {
+          Probability.normalInverse(1 - (1 - confidence) / 2)
+        } else {
+          Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
+        }
+      }
+      val low = mean - confFactor * stdev
+      val high = mean + confFactor * stdev
+      new BoundedDouble(mean, confidence, low, high)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/PartialResult.scala b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
new file mode 100644
index 0000000..5ce49b8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/PartialResult.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+class PartialResult[R](initialVal: R, isFinal: Boolean) {
+  private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
+  private var failure: Option[Exception] = None
+  private var completionHandler: Option[R => Unit] = None
+  private var failureHandler: Option[Exception => Unit] = None
+
+  def initialValue: R = initialVal
+
+  def isInitialValueFinal: Boolean = isFinal
+
+  /**
+   * Blocking method to wait for and return the final value.
+   */
+  def getFinalValue(): R = synchronized {
+    while (finalValue == None && failure == None) {
+      this.wait()
+    }
+    if (finalValue != None) {
+      return finalValue.get
+    } else {
+      throw failure.get
+    }
+  }
+
+  /** 
+   * Set a handler to be called when this PartialResult completes. Only one completion handler
+   * is supported per PartialResult.
+   */
+  def onComplete(handler: R => Unit): PartialResult[R] = synchronized {
+    if (completionHandler != None) {
+      throw new UnsupportedOperationException("onComplete cannot be called twice")
+    }
+    completionHandler = Some(handler)
+    if (finalValue != None) {
+      // We already have a final value, so let's call the handler
+      handler(finalValue.get)
+    }
+    return this
+  }
+
+  /** 
+   * Set a handler to be called if this PartialResult's job fails. Only one failure handler
+   * is supported per PartialResult.
+   */
+  def onFail(handler: Exception => Unit) {
+    synchronized {
+      if (failureHandler != None) {
+        throw new UnsupportedOperationException("onFail cannot be called twice")
+      }
+      failureHandler = Some(handler)
+      if (failure != None) {
+        // We already have a failure, so let's call the handler
+        handler(failure.get)
+      }
+    }
+  }
+
+  /**
+   * Transform this PartialResult into a PartialResult of type T.
+   */
+  def map[T](f: R => T) : PartialResult[T] = {
+    new PartialResult[T](f(initialVal), isFinal) {
+       override def getFinalValue() : T = synchronized {
+         f(PartialResult.this.getFinalValue())
+       }
+       override def onComplete(handler: T => Unit): PartialResult[T] = synchronized {
+         PartialResult.this.onComplete(handler.compose(f)).map(f)
+       }
+      override def onFail(handler: Exception => Unit) {
+        synchronized {
+          PartialResult.this.onFail(handler)
+        }
+      }
+      override def toString : String = synchronized {
+        PartialResult.this.getFinalValueInternal() match {
+          case Some(value) => "(final: " + f(value) + ")"
+          case None => "(partial: " + initialValue + ")"
+        }
+      }
+      def getFinalValueInternal() = PartialResult.this.getFinalValueInternal().map(f)
+    }
+  }
+
+  private[spark] def setFinalValue(value: R) {
+    synchronized {
+      if (finalValue != None) {
+        throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
+      }
+      finalValue = Some(value)
+      // Call the completion handler if it was set
+      completionHandler.foreach(h => h(value))
+      // Notify any threads that may be calling getFinalValue()
+      this.notifyAll()
+    }
+  }
+
+  private def getFinalValueInternal() = finalValue
+
+  private[spark] def setFailure(exception: Exception) {
+    synchronized {
+      if (failure != None) {
+        throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
+      }
+      failure = Some(exception)
+      // Call the failure handler if it was set
+      failureHandler.foreach(h => h(exception))
+      // Notify any threads that may be calling getFinalValue()
+      this.notifyAll()
+    }
+  }
+
+  override def toString: String = synchronized {
+    finalValue match {
+      case Some(value) => "(final: " + value + ")"
+      case None => "(partial: " + initialValue + ")"
+    }
+  }
+}
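A hedged usage sketch for the handler API, assuming existing names sc and rdd; the scheduler (via ApproximateActionListener) is what eventually calls setFinalValue or setFailure:

    val partial = rdd.countApprox(5000, 0.90)                       // PartialResult[BoundedDouble]
    partial.onComplete(b => println("final count interval: " + b))  // fires once the job finishes
    partial.onFail(e => println("approximate job failed: " + e))
    println(partial.initialValue)             // available immediately, possibly with wide error bars
    val finalValue = partial.getFinalValue()  // blocks until the job completes or fails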

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala
new file mode 100644
index 0000000..92915ee
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/StudentTCacher.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import cern.jet.stat.Probability
+
+/**
+ * A utility class for caching Student's T distribution values for a given confidence level
+ * and various sample sizes. This is used by the grouped mean and sum evaluators to
+ * efficiently calculate confidence intervals for many keys.
+ */
+private[spark] class StudentTCacher(confidence: Double) {
+  val NORMAL_APPROX_SAMPLE_SIZE = 100  // For samples bigger than this, use Gaussian approximation
+  val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2)
+  val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0)
+
+  def get(sampleSize: Long): Double = {
+    if (sampleSize >= NORMAL_APPROX_SAMPLE_SIZE) {
+      normalApprox
+    } else {
+      val size = sampleSize.toInt
+      if (cache(size) < 0) {
+        cache(size) = Probability.studentTInverse(1 - confidence, size - 1)
+      }
+      cache(size)
+    }
+  }
+}
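A small sketch of the intended call pattern (the grouped evaluators create one cacher per currentResult() call and reuse it across keys):

    val tCacher = new StudentTCacher(0.95)
    val smallSample = tCacher.get(30)    // Student's t quantile for 29 degrees of freedom, computed once then cached
    val largeSample = tCacher.get(5000)  // 100 samples or more falls back to the normal approximation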

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala
new file mode 100644
index 0000000..a74f800
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/partial/SumEvaluator.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.partial
+
+import cern.jet.stat.Probability
+
+import org.apache.spark.util.StatCounter
+
+/**
+ * An ApproximateEvaluator for sums. It estimates the mean and the count and multiplies them
+ * together, then uses the formula for the variance of two independent random variables to get
+ * a variance for the result and compute a confidence interval.
+ */
+private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double)
+  extends ApproximateEvaluator[StatCounter, BoundedDouble] {
+
+  var outputsMerged = 0
+  var counter = new StatCounter
+
+  override def merge(outputId: Int, taskResult: StatCounter) {
+    outputsMerged += 1
+    counter.merge(taskResult)
+  }
+
+  override def currentResult(): BoundedDouble = {
+    if (outputsMerged == totalOutputs) {
+      new BoundedDouble(counter.sum, 1.0, counter.sum, counter.sum)
+    } else if (outputsMerged == 0) {
+      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
+    } else {
+      val p = outputsMerged.toDouble / totalOutputs
+      val meanEstimate = counter.mean
+      val meanVar = counter.sampleVariance / counter.count
+      val countEstimate = (counter.count + 1 - p) / p
+      val countVar = (counter.count + 1) * (1 - p) / (p * p)
+      val sumEstimate = meanEstimate * countEstimate
+      val sumVar = (meanEstimate * meanEstimate * countVar) +
+                   (countEstimate * countEstimate * meanVar) +
+                   (meanVar * countVar)
+      val sumStdev = math.sqrt(sumVar)
+      val confFactor = {
+        if (counter.count > 100) {
+          Probability.normalInverse(1 - (1 - confidence) / 2)
+        } else {
+          Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
+        }
+      }
+      val low = sumEstimate - confFactor * sumStdev
+      val high = sumEstimate + confFactor * sumStdev
+      new BoundedDouble(sumEstimate, confidence, low, high)
+    }
+  }
+}
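For reference, the sumVar expression above is the usual formula for the variance of a product of two independent random variables, here the per-element mean M and the extrapolated count C, with meanEstimate and countEstimate standing in for the expectations:

    Var(M * C) = E[M]^2 * Var(C) + E[C]^2 * Var(M) + Var(M) * Var(C)
               = meanEstimate^2 * countVar + countEstimate^2 * meanVar + meanVar * countVar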

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
new file mode 100644
index 0000000..4bb01ef
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+import org.apache.spark.storage.BlockManager
+
+private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
+  val index = idx
+}
+
+private[spark]
+class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
+  extends RDD[T](sc, Nil) {
+
+  @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
+
+  override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
+    new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
+  }).toArray
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    val blockManager = SparkEnv.get.blockManager
+    val blockId = split.asInstanceOf[BlockRDDPartition].blockId
+    blockManager.get(blockId) match {
+      case Some(block) => block.asInstanceOf[Iterator[T]]
+      case None =>
+        throw new Exception("Could not compute split, block " + blockId + " not found")
+    }
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    locations_(split.asInstanceOf[BlockRDDPartition].blockId)
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
new file mode 100644
index 0000000..9b0c882
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/CartesianRDD.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.io.{ObjectOutputStream, IOException}
+import org.apache.spark._
+
+
+private[spark]
+class CartesianPartition(
+    idx: Int,
+    @transient rdd1: RDD[_],
+    @transient rdd2: RDD[_],
+    s1Index: Int,
+    s2Index: Int
+  ) extends Partition {
+  var s1 = rdd1.partitions(s1Index)
+  var s2 = rdd2.partitions(s2Index)
+  override val index: Int = idx
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    // Update the reference to parent split at the time of task serialization
+    s1 = rdd1.partitions(s1Index)
+    s2 = rdd2.partitions(s2Index)
+    oos.defaultWriteObject()
+  }
+}
+
+private[spark]
+class CartesianRDD[T: ClassManifest, U:ClassManifest](
+    sc: SparkContext,
+    var rdd1 : RDD[T],
+    var rdd2 : RDD[U])
+  extends RDD[Pair[T, U]](sc, Nil)
+  with Serializable {
+
+  val numPartitionsInRdd2 = rdd2.partitions.size
+
+  override def getPartitions: Array[Partition] = {
+    // create the cross product split
+    val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
+    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
+      val idx = s1.index * numPartitionsInRdd2 + s2.index
+      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
+    }
+    array
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val currSplit = split.asInstanceOf[CartesianPartition]
+    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
+  }
+
+  override def compute(split: Partition, context: TaskContext) = {
+    val currSplit = split.asInstanceOf[CartesianPartition]
+    for (x <- rdd1.iterator(currSplit.s1, context);
+      y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = List(
+    new NarrowDependency(rdd1) {
+      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
+    },
+    new NarrowDependency(rdd2) {
+      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
+    }
+  )
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdd1 = null
+    rdd2 = null
+  }
+}
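A brief usage sketch plus the index arithmetic, assuming an existing SparkContext named sc:

    val a = sc.parallelize(1 to 3, 3)          // 3 partitions
    val b = sc.parallelize(Seq("x", "y"), 2)   // 2 partitions, so numPartitionsInRdd2 = 2
    val pairs = a.cartesian(b)                 // 6 partitions in total
    // Partition idx 5 depends on a's partition 5 / 2 = 2 and b's partition 5 % 2 = 1,
    // which inverts idx = s1.index * numPartitionsInRdd2 + s2.index = 2 * 2 + 1 = 5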

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
new file mode 100644
index 0000000..3311757
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark._
+import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io.{NullWritable, BytesWritable}
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.fs.Path
+import java.io.{File, IOException, EOFException}
+import java.text.NumberFormat
+
+private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
+
+/**
+ * This RDD represents an RDD checkpoint file (similar to HadoopRDD).
+ */
+private[spark]
+class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
+  extends RDD[T](sc, Nil) {
+
+  @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
+
+  override def getPartitions: Array[Partition] = {
+    val cpath = new Path(checkpointPath)
+    val numPartitions =
+    // listStatus can throw exception if path does not exist.
+    if (fs.exists(cpath)) {
+      val dirContents = fs.listStatus(cpath)
+      val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
+      val numPart =  partitionFiles.size
+      if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
+          ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
+        throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
+      }
+      numPart
+    } else 0
+
+    Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i))
+  }
+
+  checkpointData = Some(new RDDCheckpointData[T](this))
+  checkpointData.get.cpFile = Some(checkpointPath)
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    val status = fs.getFileStatus(new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)))
+    val locations = fs.getFileBlockLocations(status, 0, status.getLen)
+    locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
+  }
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
+    CheckpointRDD.readFromFile(file, context)
+  }
+
+  override def checkpoint() {
+    // Do nothing. CheckpointRDD should not be checkpointed.
+  }
+}
+
+private[spark] object CheckpointRDD extends Logging {
+
+  def splitIdToFile(splitId: Int): String = {
+    "part-%05d".format(splitId)
+  }
+
+  def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
+    val env = SparkEnv.get
+    val outputDir = new Path(path)
+    val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
+
+    val finalOutputName = splitIdToFile(ctx.splitId)
+    val finalOutputPath = new Path(outputDir, finalOutputName)
+    val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
+
+    if (fs.exists(tempOutputPath)) {
+      throw new IOException("Checkpoint failed: temporary path " +
+        tempOutputPath + " already exists")
+    }
+    val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+
+    val fileOutputStream = if (blockSize < 0) {
+      fs.create(tempOutputPath, false, bufferSize)
+    } else {
+      // This is mainly for testing purpose
+      fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
+    }
+    val serializer = env.serializer.newInstance()
+    val serializeStream = serializer.serializeStream(fileOutputStream)
+    serializeStream.writeAll(iterator)
+    serializeStream.close()
+
+    if (!fs.rename(tempOutputPath, finalOutputPath)) {
+      if (!fs.exists(finalOutputPath)) {
+        logInfo("Deleting tempOutputPath " + tempOutputPath)
+        fs.delete(tempOutputPath, false)
+        throw new IOException("Checkpoint failed: failed to save output of task: "
+          + ctx.attemptId + " and final output path does not exist")
+      } else {
+        // Some other copy of this task must've finished before us and renamed it
+        logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
+        fs.delete(tempOutputPath, false)
+      }
+    }
+  }
+
+  def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
+    val env = SparkEnv.get
+    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+    val fileInputStream = fs.open(path, bufferSize)
+    val serializer = env.serializer.newInstance()
+    val deserializeStream = serializer.deserializeStream(fileInputStream)
+
+    // Register an on-task-completion callback to close the input stream.
+    context.addOnCompleteCallback(() => deserializeStream.close())
+
+    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
+  }
+
+  // Test whether CheckpointRDD generates the expected number of partitions despite
+  // each split file having multiple blocks. This needs to be run on a
+  // cluster (mesos or standalone) using HDFS.
+  def main(args: Array[String]) {
+    import org.apache.spark._
+
+    val Array(cluster, hdfsPath) = args
+    val env = SparkEnv.get
+    val sc = new SparkContext(cluster, "CheckpointRDD Test")
+    val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
+    val path = new Path(hdfsPath, "temp")
+    val fs = path.getFileSystem(env.hadoop.newConfiguration())
+    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
+    val cpRDD = new CheckpointRDD[Int](sc, path.toString)
+    assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
+    assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
+    fs.delete(path, true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
new file mode 100644
index 0000000..dcc35e8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.io.{ObjectOutputStream, IOException}
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.JavaConversions
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
+
+
+private[spark] sealed trait CoGroupSplitDep extends Serializable
+
+private[spark] case class NarrowCoGroupSplitDep(
+    rdd: RDD[_],
+    splitIndex: Int,
+    var split: Partition
+  ) extends CoGroupSplitDep {
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    // Update the reference to parent split at the time of task serialization
+    split = rdd.partitions(splitIndex)
+    oos.defaultWriteObject()
+  }
+}
+
+private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
+
+private[spark]
+class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
+  extends Partition with Serializable {
+  override val index: Int = idx
+  override def hashCode(): Int = idx
+}
+
+
+/**
+ * An RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
+ * tuple with the list of values for that key.
+ *
+ * @param rdds parent RDDs.
+ * @param part partitioner used to partition the shuffle output.
+ */
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
+  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
+
+  private var serializerClass: String = null
+
+  def setSerializer(cls: String): CoGroupedRDD[K] = {
+    serializerClass = cls
+    this
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
+      if (rdd.partitioner == Some(part)) {
+        logDebug("Adding one-to-one dependency with " + rdd)
+        new OneToOneDependency(rdd)
+      } else {
+        logDebug("Adding shuffle dependency with " + rdd)
+        new ShuffleDependency[Any, Any](rdd, part, serializerClass)
+      }
+    }
+  }
+
+  override def getPartitions: Array[Partition] = {
+    val array = new Array[Partition](part.numPartitions)
+    for (i <- 0 until array.size) {
+      // Each CoGroupPartition will have a dependency per contributing RDD
+      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
+        // Assume each RDD contributed a single dependency, and get it
+        dependencies(j) match {
+          case s: ShuffleDependency[_, _] =>
+            new ShuffleCoGroupSplitDep(s.shuffleId)
+          case _ =>
+            new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
+        }
+      }.toArray)
+    }
+    array
+  }
+
+  override val partitioner = Some(part)
+
+  override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
+    val split = s.asInstanceOf[CoGroupPartition]
+    val numRdds = split.deps.size
+    // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
+    val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
+
+    def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
+      val seq = map.get(k)
+      if (seq != null) {
+        seq
+      } else {
+        val seq = Array.fill(numRdds)(new ArrayBuffer[Any])
+        map.put(k, seq)
+        seq
+      }
+    }
+
+    val ser = SparkEnv.get.serializerManager.get(serializerClass)
+    for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
+      case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
+        // Read them from the parent
+        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
+          getSeq(kv._1)(depNum) += kv._2
+        }
+      }
+      case ShuffleCoGroupSplitDep(shuffleId) => {
+        // Read map outputs of shuffle
+        val fetcher = SparkEnv.get.shuffleFetcher
+        fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
+          kv => getSeq(kv._1)(depNum) += kv._2
+        }
+      }
+    }
+    JavaConversions.mapAsScalaMap(map).iterator
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    rdds = null
+  }
+}
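A hedged example of how this RDD is normally reached from user code (cogroup on pair RDDs builds a CoGroupedRDD under the hood; sc and the pair-RDD implicits are assumed to be in scope):

    val users  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
    val visits = sc.parallelize(Seq((1, "home"), (1, "cart"), (3, "search")))
    val grouped = users.cogroup(visits)
    // key 1 -> (Seq("alice"), Seq("home", "cart")), key 2 -> (Seq("bob"), Seq()),
    // key 3 -> (Seq(), Seq("search"))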

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
new file mode 100644
index 0000000..c5de636
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark._
+import java.io.{ObjectOutputStream, IOException}
+import scala.collection.mutable
+import scala.Some
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ * Class that captures a coalesced RDD by essentially keeping track of parent partitions
+ * @param index of this coalesced partition
+ * @param rdd which it belongs to
+ * @param parentsIndices list of indices in the parent that have been coalesced into this partition
+ * @param preferredLocation the preferred location for this partition
+ */
+case class CoalescedRDDPartition(
+                                  index: Int,
+                                  @transient rdd: RDD[_],
+                                  parentsIndices: Array[Int],
+                                  @transient preferredLocation: String = ""
+                                  ) extends Partition {
+  var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
+
+  @throws(classOf[IOException])
+  private def writeObject(oos: ObjectOutputStream) {
+    // Update the reference to parent partition at the time of task serialization
+    parents = parentsIndices.map(rdd.partitions(_))
+    oos.defaultWriteObject()
+  }
+
+  /**
+   * Computes how many of the parents partitions have getPreferredLocation
+   * as one of their preferredLocations
+   * @return locality of this coalesced partition between 0 and 1
+   */
+  def localFraction: Double = {
+    val loc = parents.count(p =>
+      rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
+
+    if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
+  }
+}
+
+/**
+ * Represents a coalesced RDD that has fewer partitions than its parent RDD.
+ * This class uses the PartitionCoalescer class to find a good partitioning of the parent RDD
+ * so that each new partition has roughly the same number of parent partitions and that
+ * the preferred location of each new partition overlaps with as many preferred locations of its
+ * parent partitions as possible.
+ * @param prev RDD to be coalesced
+ * @param maxPartitions number of desired partitions in the coalesced RDD
+ * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
+ */
+class CoalescedRDD[T: ClassManifest](
+    @transient var prev: RDD[T],
+    maxPartitions: Int,
+    balanceSlack: Double = 0.10)
+  extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
+
+  override def getPartitions: Array[Partition] = {
+    val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
+
+    pc.run().zipWithIndex.map {
+      case (pg, i) =>
+        val ids = pg.arr.map(_.index).toArray
+        new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
+    }
+  }
+
+  override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
+    partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
+      firstParent[T].iterator(parentPartition, context)
+    }
+  }
+
+  override def getDependencies: Seq[Dependency[_]] = {
+    Seq(new NarrowDependency(prev) {
+      def getParents(id: Int): Seq[Int] =
+        partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
+    })
+  }
+
+  override def clearDependencies() {
+    super.clearDependencies()
+    prev = null
+  }
+
+  /**
+   * Returns the preferred machine for the partition. If the partition is of type
+   * CoalescedRDDPartition, then the preferred machine will be one that most parent partitions
+   * prefer too.
+   * @param partition the partition to look up
+   * @return the machine most preferred by the partition
+   */
+  override def getPreferredLocations(partition: Partition): Seq[String] = {
+    List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
+  }
+}
+
+/**
+ * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
+ * this RDD computes one or more of the parent ones. It will produce exactly `maxPartitions` if the
+ * parent had more than maxPartitions, or fewer if the parent had fewer.
+ *
+ * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
+ * or to avoid having a large number of small tasks when processing a directory with many files.
+ *
+ * If there is no locality information (no preferredLocations) in the parent, then the coalescing
+ * is very simple: group parent partitions that are adjacent in the array into chunks.
+ * If there is locality information, it proceeds to pack them with the following four goals:
+ *
+ * (1) Balance the groups so they roughly have the same number of parent partitions
+ * (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
+ * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
+ * (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
+ *
+ * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
+ * We assume the final number of desired partitions is small, e.g. less than 1000.
+ *
+ * The algorithm tries to assign unique preferred machines to each partition. If the number of
+ * desired partitions is greater than the number of preferred machines (which can happen), it
+ * starts picking duplicate preferred machines. This is determined using coupon collector
+ * estimation (2n log(n)). The load balancing is done using randomized power-of-two-choices
+ * balls-into-bins with one twist: it also tries to achieve locality. This is done by allowing a
+ * slack (balanceSlack) between two bins. If two bins are within the slack in terms of balance,
+ * the algorithm assigns partitions according to locality. (contact alig for questions)
+ *
+ */
+
+private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
+
+  def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
+  def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
+    if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
+
+  val rnd = new scala.util.Random(7919) // keep this class deterministic
+
+  // each element of groupArr represents one coalesced partition
+  val groupArr = ArrayBuffer[PartitionGroup]()
+
+  // hash used to check whether some machine is already in groupArr
+  val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]()
+
+  // hash used for the first maxPartitions (to avoid duplicates)
+  val initialHash = mutable.Set[Partition]()
+
+  // determines the tradeoff between load-balancing the partitions sizes and their locality
+  // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
+  val slack = (balanceSlack * prev.partitions.size).toInt
+
+  var noLocality = true  // true if no preferredLocations exist for the parent RDD
+
+  // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
+  def currPrefLocs(part: Partition): Seq[String] = {
+    prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
+  }
+
+  // this class just keeps iterating and rotating infinitely over the partitions of the RDD
+  // next() returns the next preferred machine that a partition is replicated on
+  // the rotator first goes through the first replica copy of each partition, then second, third
+  // the iterator's return type is a tuple: (replicaString, partition)
+  class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
+
+    var it: Iterator[(String, Partition)] = resetIterator()
+
+    override val isEmpty = !it.hasNext
+
+    // initializes/resets to start iterating from the beginning
+    def resetIterator() = {
+      val iterators = (0 to 2).map( x =>
+        prev.partitions.iterator.flatMap(p => {
+          if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
+        } )
+      )
+      iterators.reduceLeft((x, y) => x ++ y)
+    }
+
+    // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
+    def hasNext(): Boolean = { !isEmpty }
+
+    // return the next preferredLocation of some partition of the RDD
+    def next(): (String, Partition) = {
+      if (it.hasNext)
+        it.next()
+      else {
+        it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
+        it.next()
+      }
+    }
+  }
+
+  /**
+   * Sorts and gets the least-loaded element of the list associated with key in groupHash.
+   * The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
+   * @param key the preferred machine (host) to look up
+   * @return Option of the PartitionGroup with the fewest elements for key
+   */
+  def getLeastGroupHash(key: String): Option[PartitionGroup] = {
+    groupHash.get(key).map(_.sortWith(compare).head)
+  }
+
+  def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
+    if (!initialHash.contains(part)) {
+      pgroup.arr += part  // assign the partition to this group
+      initialHash += part // needed to avoid assigning partitions to multiple buckets
+      true
+    } else { false }
+  }
+
+  /**
+   * Initializes targetLen partition groups and assigns each a preferredLocation.
+   * This uses the coupon collector estimate to decide how many preferredLocations it must rotate
+   * through until it has seen most of the preferred locations (2 * n log(n))
+   * @param targetLen the number of partition groups to create
+   */
+  def setupGroups(targetLen: Int) {
+    val rotIt = new LocationIterator(prev)
+
+    // deal with empty case, just create targetLen partition groups with no preferred location
+    if (!rotIt.hasNext()) {
+      (1 to targetLen).foreach(x => groupArr += PartitionGroup())
+      return
+    }
+
+    noLocality = false
+
+    // number of iterations needed to be certain that we've seen most preferred locations
+    val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
+    var numCreated = 0
+    var tries = 0
+
+    // rotate through until either targetLen unique/distinct preferred locations have been created
+    // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
+    // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
+    while (numCreated < targetLen && tries < expectedCoupons2) {
+      tries += 1
+      val (nxt_replica, nxt_part) = rotIt.next()
+      if (!groupHash.contains(nxt_replica)) {
+        val pgroup = PartitionGroup(nxt_replica)
+        groupArr += pgroup
+        addPartToPGroup(nxt_part, pgroup)
+        groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
+        numCreated += 1
+      }
+    }
+
+    while (numCreated < targetLen) {  // if we don't have enough partition groups, create duplicates
+      var (nxt_replica, nxt_part) = rotIt.next()
+      val pgroup = PartitionGroup(nxt_replica)
+      groupArr += pgroup
+      groupHash.get(nxt_replica).get += pgroup
+      var tries = 0
+      while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
+        nxt_part = rotIt.next()._2
+        tries += 1
+      }
+      numCreated += 1
+    }
+
+  }
+
+  /**
+   * Takes a parent RDD partition and decides which of the partition groups to put it in.
+   * Takes locality into account, but also uses power-of-two choices to load balance.
+   * It strikes a balance between the two using the balanceSlack variable
+   * @param p partition (ball to be thrown)
+   * @return partition group (bin to be put in)
+   */
+  def pickBin(p: Partition): PartitionGroup = {
+    val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
+    val prefPart = if (pref == Nil) None else pref.head
+
+    val r1 = rnd.nextInt(groupArr.size)
+    val r2 = rnd.nextInt(groupArr.size)
+    val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
+    if (prefPart == None) // if no preferred locations, just use basic power of two
+      return minPowerOfTwo
+
+    val prefPartActual = prefPart.get
+
+    if (minPowerOfTwo.size + slack <= prefPartActual.size) { // more imbalance than the slack allows
+      minPowerOfTwo  // prefer balance over locality
+    } else {
+      prefPartActual // prefer locality over balance
+    }
+  }
+
+  def throwBalls() {
+    if (noLocality) {  // no preferredLocations in parent RDD, no randomization needed
+      if (maxPartitions > groupArr.size) { // just return prev.partitions
+        for ((p,i) <- prev.partitions.zipWithIndex) {
+          groupArr(i).arr += p
+        }
+      } else { // no locality available, then simply split partitions based on positions in array
+        for(i <- 0 until maxPartitions) {
+          val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
+          val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
+          (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
+        }
+      }
+    } else {
+      for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
+        pickBin(p).arr += p
+      }
+    }
+  }
+
+  def getPartitions: Array[PartitionGroup] = groupArr.filter( pg => pg.size > 0).toArray
+
+  /**
+   * Runs the packing algorithm and returns an array of PartitionGroups that, if possible, are
+   * load balanced and grouped by locality
+   * @return array of partition groups
+   */
+  def run(): Array[PartitionGroup] = {
+    setupGroups(math.min(prev.partitions.length, maxPartitions))   // setup the groups (bins)
+    throwBalls()             // assign partitions (balls) to each group (bins)
+    getPartitions
+  }
+}
+
+private[spark] case class PartitionGroup(prefLoc: String = "") {
+  var arr = mutable.ArrayBuffer[Partition]()
+
+  def size = arr.size
+}
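
A minimal, self-contained sketch of the bin-picking heuristic described in the PartitionCoalescer
scaladoc above: power-of-two choices with a locality slack. The Bin class and all names below are
illustrative only and are not part of this patch; in practice the logic is typically reached
through RDD.coalesce(), which builds a CoalescedRDD.

    import scala.util.Random

    object PowerOfTwoWithSlack {
      case class Bin(prefLoc: String, var size: Int = 0)

      // Pick a bin for one "ball": take the smaller of two random bins, but if the
      // locality-preferred bin is within `slack` of that minimum, keep locality.
      def pickBin(bins: IndexedSeq[Bin], preferred: Option[Bin], slack: Int, rnd: Random): Bin = {
        val a = bins(rnd.nextInt(bins.size))
        val b = bins(rnd.nextInt(bins.size))
        val minPowerOfTwo = if (a.size < b.size) a else b
        preferred match {
          case Some(p) if p.size < minPowerOfTwo.size + slack => p // balanced enough: locality wins
          case _ => minPowerOfTwo                                  // too imbalanced: rebalance
        }
      }
    }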

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
new file mode 100644
index 0000000..24ce4ab
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+
+
+/**
+ * An RDD that is empty, i.e. has no element in it.
+ */
+class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+
+  override def getPartitions: Array[Partition] = Array.empty
+
+  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
+    throw new UnsupportedOperationException("empty RDD")
+  }
+}
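
A short hedged illustration of why compute() can safely throw: an EmptyRDD has zero partitions, so
no task is ever scheduled for it and compute() is never invoked ("sc" below is assumed to be an
existing SparkContext).

    val empty = new EmptyRDD[Int](sc)
    empty.partitions.length  // 0
    empty.count()            // 0, without launching any tasks
    empty.collect()          // Array()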

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
new file mode 100644
index 0000000..4df8ceb
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{OneToOneDependency, RDD, Partition, TaskContext}
+
+private[spark] class FilteredRDD[T: ClassManifest](
+    prev: RDD[T],
+    f: T => Boolean)
+  extends RDD[T](prev) {
+
+  override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+  override val partitioner = prev.partitioner    // Since filter cannot change a partition's keys
+
+  override def compute(split: Partition, context: TaskContext) =
+    firstParent[T].iterator(split, context).filter(f)
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
new file mode 100644
index 0000000..2bf7653
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, Partition, TaskContext}
+
+
+private[spark]
+class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: T => TraversableOnce[U])
+  extends RDD[U](prev) {
+
+  override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+  override def compute(split: Partition, context: TaskContext) =
+    firstParent[T].iterator(split, context).flatMap(f)
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
new file mode 100644
index 0000000..e544720
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{TaskContext, Partition, RDD}
+
+
+private[spark]
+class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
+  extends RDD[(K, U)](prev) {
+
+  override def getPartitions = firstParent[Product2[K, V]].partitions
+
+  override val partitioner = firstParent[Product2[K, V]].partitioner
+
+  override def compute(split: Partition, context: TaskContext) = {
+    firstParent[Product2[K, V]].iterator(split, context).flatMap { case Product2(k, v) =>
+      f(v).map(x => (k, x))
+    }
+  }
+}
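
The point of overriding partitioner here is that flattening values never touches the keys, so the
parent's partitioning stays valid. A hedged example, assuming an existing SparkContext "sc" and the
pair-RDD implicits from org.apache.spark.SparkContext._:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._

    val pairs = sc.parallelize(Seq((1, "a b"), (2, "c"))).partitionBy(new HashPartitioner(2))
    val words = pairs.flatMapValues(_.split(" "))  // (1,"a"), (1,"b"), (2,"c")
    // words.partitioner is still Some(HashPartitioner(2)), so a later join or
    // reduceByKey with the same partitioner can avoid a shuffle.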

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
new file mode 100644
index 0000000..2ce9419
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, Partition, TaskContext}
+
+private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
+  extends RDD[Array[T]](prev) {
+
+  override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+  override def compute(split: Partition, context: TaskContext) =
+    Array(firstParent[T].iterator(split, context).toArray).iterator
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
new file mode 100644
index 0000000..08e6154
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.io.EOFException
+import java.util.NoSuchElementException
+
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.FileInputFormat
+import org.apache.hadoop.mapred.InputFormat
+import org.apache.hadoop.mapred.InputSplit
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.TextInputFormat
+import org.apache.hadoop.mapred.RecordReader
+import org.apache.hadoop.mapred.Reporter
+import org.apache.hadoop.util.ReflectionUtils
+
+import org.apache.spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.util.NextIterator
+import org.apache.hadoop.conf.{Configuration, Configurable}
+
+
+/**
+ * A Spark split class that wraps around a Hadoop InputSplit.
+ */
+private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
+  extends Partition {
+  
+  val inputSplit = new SerializableWritable[InputSplit](s)
+
+  override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
+
+  override val index: Int = idx
+}
+
+/**
+ * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local
+ * file system, or S3; tables in HBase; etc).
+ */
+class HadoopRDD[K, V](
+    sc: SparkContext,
+    @transient conf: JobConf,
+    inputFormatClass: Class[_ <: InputFormat[K, V]],
+    keyClass: Class[K],
+    valueClass: Class[V],
+    minSplits: Int)
+  extends RDD[(K, V)](sc, Nil) with Logging {
+
+  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
+  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
+
+  override def getPartitions: Array[Partition] = {
+    val env = SparkEnv.get
+    env.hadoop.addCredentials(conf)
+    val inputFormat = createInputFormat(conf)
+    if (inputFormat.isInstanceOf[Configurable]) {
+      inputFormat.asInstanceOf[Configurable].setConf(conf)
+    }
+    val inputSplits = inputFormat.getSplits(conf, minSplits)
+    val array = new Array[Partition](inputSplits.size)
+    for (i <- 0 until inputSplits.size) {
+      array(i) = new HadoopPartition(id, i, inputSplits(i))
+    }
+    array
+  }
+
+  def createInputFormat(conf: JobConf): InputFormat[K, V] = {
+    ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
+      .asInstanceOf[InputFormat[K, V]]
+  }
+
+  override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
+    val split = theSplit.asInstanceOf[HadoopPartition]
+    logInfo("Input split: " + split.inputSplit)
+    var reader: RecordReader[K, V] = null
+
+    val conf = confBroadcast.value.value
+    val fmt = createInputFormat(conf)
+    if (fmt.isInstanceOf[Configurable]) {
+      fmt.asInstanceOf[Configurable].setConf(conf)
+    }
+    reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
+
+    // Register an on-task-completion callback to close the input stream.
+    context.addOnCompleteCallback{ () => closeIfNeeded() }
+
+    val key: K = reader.createKey()
+    val value: V = reader.createValue()
+
+    override def getNext() = {
+      try {
+        finished = !reader.next(key, value)
+      } catch {
+        case eof: EOFException =>
+          finished = true
+      }
+      (key, value)
+    }
+
+    override def close() {
+      try {
+        reader.close()
+      } catch {
+        case e: Exception => logWarning("Exception in RecordReader.close()", e)
+      }
+    }
+  }
+
+  override def getPreferredLocations(split: Partition): Seq[String] = {
+    // TODO: Filtering out "localhost" in case of file:// URLs
+    val hadoopSplit = split.asInstanceOf[HadoopPartition]
+    hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
+  }
+
+  override def checkpoint() {
+    // Do nothing. Hadoop RDD should not be checkpointed.
+  }
+
+  def getConf: Configuration = confBroadcast.value.value
+}
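
A hedged construction example for the class above; the input path is illustrative and "sc" is
assumed to be an existing SparkContext (in practice this RDD is usually created via SparkContext
helpers such as textFile or hadoopRDD rather than directly):

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileInputFormat, JobConf, TextInputFormat}

    val jobConf = new JobConf()
    FileInputFormat.setInputPaths(jobConf, "hdfs:///data/events")
    val records = new HadoopRDD[LongWritable, Text](sc, jobConf, classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], 4)
    // Each HadoopPartition wraps one Hadoop InputSplit, and getPreferredLocations exposes that
    // split's block locations so tasks can be scheduled near the data.
    records.map(_._2.toString).take(5).foreach(println)  // toString copies out of the reused Text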

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
new file mode 100644
index 0000000..3db460b
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.sql.{Connection, ResultSet}
+
+import org.apache.spark.{Logging, Partition, RDD, SparkContext, TaskContext}
+import org.apache.spark.util.NextIterator
+
+private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
+  override def index = idx
+}
+
+/**
+ * An RDD that executes an SQL query on a JDBC connection and reads results.
+ * For usage example, see test case JdbcRDDSuite.
+ *
+ * @param getConnection a function that returns an open Connection.
+ *   The RDD takes care of closing the connection.
+ * @param sql the text of the query.
+ *   The query must contain two ? placeholders for parameters used to partition the results.
+ *   E.g. "select title, author from books where ? <= id and id <= ?"
+ * @param lowerBound the minimum value of the first placeholder
+ * @param upperBound the maximum value of the second placeholder
+ *   The lower and upper bounds are inclusive.
+ * @param numPartitions the number of partitions.
+ *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
+ *   the query would be executed twice, once with (1, 10) and once with (11, 20)
+ * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
+ *   This should only call getInt, getString, etc; the RDD takes care of calling next.
+ *   The default maps a ResultSet to an array of Object.
+ */
+class JdbcRDD[T: ClassManifest](
+    sc: SparkContext,
+    getConnection: () => Connection,
+    sql: String,
+    lowerBound: Long,
+    upperBound: Long,
+    numPartitions: Int,
+    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
+  extends RDD[T](sc, Nil) with Logging {
+
+  override def getPartitions: Array[Partition] = {
+    // bounds are inclusive, hence the + 1 here and - 1 on end
+    val length = 1 + upperBound - lowerBound
+    (0 until numPartitions).map(i => {
+      val start = lowerBound + ((i * length) / numPartitions).toLong
+      val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
+      new JdbcPartition(i, start, end)
+    }).toArray
+  }
+
+  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
+    context.addOnCompleteCallback{ () => closeIfNeeded() }
+    val part = thePart.asInstanceOf[JdbcPartition]
+    val conn = getConnection()
+    val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
+
+    // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
+    // rather than pulling entire resultset into memory.
+    // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
+    if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
+      stmt.setFetchSize(Integer.MIN_VALUE)
+      logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
+    }
+
+    stmt.setLong(1, part.lower)
+    stmt.setLong(2, part.upper)
+    val rs = stmt.executeQuery()
+
+    override def getNext: T = {
+      if (rs.next()) {
+        mapRow(rs)
+      } else {
+        finished = true
+        null.asInstanceOf[T]
+      }
+    }
+
+    override def close() {
+      try {
+        if (null != rs && ! rs.isClosed()) rs.close()
+      } catch {
+        case e: Exception => logWarning("Exception closing resultset", e)
+      }
+      try {
+        if (null != stmt && ! stmt.isClosed()) stmt.close()
+      } catch {
+        case e: Exception => logWarning("Exception closing statement", e)
+      }
+      try {
+        if (null != conn && ! conn.isClosed()) conn.close()
+        logInfo("closed connection")
+      } catch {
+        case e: Exception => logWarning("Exception closing connection", e)
+      }
+    }
+  }
+}
+
+object JdbcRDD {
+  def resultSetToObjectArray(rs: ResultSet) = {
+    Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
+  }
+}
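
A hedged usage sketch matching the constructor above; the JDBC URL, table, and column names are
illustrative, "sc" is assumed to be an existing SparkContext, and the suite mentioned in the
scaladoc (JdbcRDDSuite) remains the authoritative example:

    import java.sql.{DriverManager, ResultSet}

    val books = new JdbcRDD(
      sc,
      () => DriverManager.getConnection("jdbc:mysql://db.example.com/shop?user=reader"),
      "SELECT id, title FROM books WHERE ? <= id AND id <= ?",
      1, 1000, 4,
      (r: ResultSet) => (r.getLong(1), r.getString(2)))
    // With lowerBound = 1, upperBound = 1000 and numPartitions = 4, getPartitions binds
    // (1, 250), (251, 500), (501, 750) and (751, 1000) into the two placeholders.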

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
new file mode 100644
index 0000000..13009d3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.{RDD, Partition, TaskContext}
+
+
+private[spark]
+class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+    prev: RDD[T],
+    f: Iterator[T] => Iterator[U],
+    preservesPartitioning: Boolean = false)
+  extends RDD[U](prev) {
+
+  override val partitioner =
+    if (preservesPartitioning) firstParent[T].partitioner else None
+
+  override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+  override def compute(split: Partition, context: TaskContext) =
+    f(firstParent[T].iterator(split, context))
+}
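
The preservesPartitioning flag is the caller's promise that f does not change keys; only then is it
safe to keep the parent's partitioner. A hedged example, assuming an existing SparkContext "sc" and
the pair-RDD implicits from org.apache.spark.SparkContext._:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._

    val pairs = sc.parallelize(1 to 100).map(i => (i, i * i)).partitionBy(new HashPartitioner(4))
    val formatted = pairs.mapPartitions(
      iter => iter.map { case (k, v) => (k, v.toString) },  // keys unchanged
      preservesPartitioning = true)
    // formatted.partitioner is still Some(HashPartitioner(4)); with the default (false)
    // it would have been dropped to None.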