Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:04 UTC

[20/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/ApproximateEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/ApproximateEvaluator.scala b/core/src/main/scala/spark/partial/ApproximateEvaluator.scala
deleted file mode 100644
index 5eae144..0000000
--- a/core/src/main/scala/spark/partial/ApproximateEvaluator.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-/**
- * An object that computes a function incrementally by merging in results of type U from multiple
- * tasks. Allows partial evaluation at any point by calling currentResult().
- */
-private[spark] trait ApproximateEvaluator[U, R] {
-  def merge(outputId: Int, taskResult: U): Unit
-  def currentResult(): R
-}
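
For orientation, a minimal sketch of an implementation of this trait might look like the following; the class name and behaviour are illustrative only (not part of this commit) and assume the trait exactly as defined above.

    package spark.partial

    // Hypothetical evaluator (illustrative, not part of this commit): keeps a
    // running total of Long results from finished tasks and reports it as the
    // partial result at any point.
    class RunningTotalEvaluator extends ApproximateEvaluator[Long, Long] {
      private var total = 0L

      // Called once per completed task; outputId identifies which output finished.
      override def merge(outputId: Int, taskResult: Long): Unit = {
        total += taskResult
      }

      // May be polled at any time for the best answer so far.
      override def currentResult(): Long = total
    }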

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/BoundedDouble.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/BoundedDouble.scala b/core/src/main/scala/spark/partial/BoundedDouble.scala
deleted file mode 100644
index 8bdbe6c..0000000
--- a/core/src/main/scala/spark/partial/BoundedDouble.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-/**
- * A Double with error bars on it.
- */
-class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
-  override def toString(): String = "[%.3f, %.3f]".format(low, high)
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/CountEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/CountEvaluator.scala b/core/src/main/scala/spark/partial/CountEvaluator.scala
deleted file mode 100644
index 6aa9209..0000000
--- a/core/src/main/scala/spark/partial/CountEvaluator.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import cern.jet.stat.Probability
-
-/**
- * An ApproximateEvaluator for counts.
- *
- * TODO: There's currently a lot of shared code between this and GroupedCountEvaluator. It might
- * be best to make this a special case of GroupedCountEvaluator with one group.
- */
-private[spark] class CountEvaluator(totalOutputs: Int, confidence: Double)
-  extends ApproximateEvaluator[Long, BoundedDouble] {
-
-  var outputsMerged = 0
-  var sum: Long = 0
-
-  override def merge(outputId: Int, taskResult: Long) {
-    outputsMerged += 1
-    sum += taskResult
-  }
-
-  override def currentResult(): BoundedDouble = {
-    if (outputsMerged == totalOutputs) {
-      new BoundedDouble(sum, 1.0, sum, sum)
-    } else if (outputsMerged == 0) {
-      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
-    } else {
-      val p = outputsMerged.toDouble / totalOutputs
-      val mean = (sum + 1 - p) / p
-      val variance = (sum + 1) * (1 - p) / (p * p)
-      val stdev = math.sqrt(variance)
-      val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
-      val low = mean - confFactor * stdev
-      val high = mean + confFactor * stdev
-      new BoundedDouble(mean, confidence, low, high)
-    }
-  }
-}
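
The estimator above scales the observed count up by roughly 1/p, where p is the fraction of task outputs merged so far, and widens the interval with a normal approximation. A standalone sketch of that arithmetic, with a hard-coded 1.96 standing in for Probability.normalInverse at 95% confidence (an assumption made only to keep the example dependency-free):

    object CountExtrapolationSketch {
      def main(args: Array[String]): Unit = {
        val sum = 40000L                     // elements seen in the merged outputs
        val p = 4.0 / 10.0                   // 4 of 10 task outputs merged so far
        val mean = (sum + 1 - p) / p         // scaled-up count estimate, ~sum / p
        val variance = (sum + 1) * (1 - p) / (p * p)
        val stdev = math.sqrt(variance)
        val z = 1.96                         // stand-in for normalInverse(0.975), i.e. 95% confidence
        println(f"estimate ${mean}%.1f in [${mean - z * stdev}%.1f, ${mean + z * stdev}%.1f]")
      }
    }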

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala
deleted file mode 100644
index ebe2e5a..0000000
--- a/core/src/main/scala/spark/partial/GroupedCountEvaluator.scala
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import java.util.{HashMap => JHashMap}
-import java.util.{Map => JMap}
-
-import scala.collection.Map
-import scala.collection.mutable.HashMap
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import cern.jet.stat.Probability
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-/**
- * An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
- */
-private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
-  extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
-
-  var outputsMerged = 0
-  var sums = new OLMap[T]   // Sum of counts for each key
-
-  override def merge(outputId: Int, taskResult: OLMap[T]) {
-    outputsMerged += 1
-    val iter = taskResult.object2LongEntrySet.fastIterator()
-    while (iter.hasNext) {
-      val entry = iter.next()
-      sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
-    }
-  }
-
-  override def currentResult(): Map[T, BoundedDouble] = {
-    if (outputsMerged == totalOutputs) {
-      val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.object2LongEntrySet.fastIterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val sum = entry.getLongValue()
-        result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
-      }
-      result
-    } else if (outputsMerged == 0) {
-      new HashMap[T, BoundedDouble]
-    } else {
-      val p = outputsMerged.toDouble / totalOutputs
-      val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
-      val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.object2LongEntrySet.fastIterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val sum = entry.getLongValue
-        val mean = (sum + 1 - p) / p
-        val variance = (sum + 1) * (1 - p) / (p * p)
-        val stdev = math.sqrt(variance)
-        val low = mean - confFactor * stdev
-        val high = mean + confFactor * stdev
-        result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
-      }
-      result
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala b/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala
deleted file mode 100644
index 2dadbbd..0000000
--- a/core/src/main/scala/spark/partial/GroupedMeanEvaluator.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import java.util.{HashMap => JHashMap}
-import java.util.{Map => JMap}
-
-import scala.collection.mutable.HashMap
-import scala.collection.Map
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import spark.util.StatCounter
-
-/**
- * An ApproximateEvaluator for means by key. Returns a map of key to confidence interval.
- */
-private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Double)
-  extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
-
-  var outputsMerged = 0
-  var sums = new JHashMap[T, StatCounter]   // Sum of counts for each key
-
-  override def merge(outputId: Int, taskResult: JHashMap[T, StatCounter]) {
-    outputsMerged += 1
-    val iter = taskResult.entrySet.iterator()
-    while (iter.hasNext) {
-      val entry = iter.next()
-      val old = sums.get(entry.getKey)
-      if (old != null) {
-        old.merge(entry.getValue)
-      } else {
-        sums.put(entry.getKey, entry.getValue)
-      }
-    }
-  }
-
-  override def currentResult(): Map[T, BoundedDouble] = {
-    if (outputsMerged == totalOutputs) {
-      val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.entrySet.iterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val mean = entry.getValue.mean
-        result(entry.getKey) = new BoundedDouble(mean, 1.0, mean, mean)
-      }
-      result
-    } else if (outputsMerged == 0) {
-      new HashMap[T, BoundedDouble]
-    } else {
-      val p = outputsMerged.toDouble / totalOutputs
-      val studentTCacher = new StudentTCacher(confidence)
-      val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.entrySet.iterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val counter = entry.getValue
-        val mean = counter.mean
-        val stdev = math.sqrt(counter.sampleVariance / counter.count)
-        val confFactor = studentTCacher.get(counter.count)
-        val low = mean - confFactor * stdev
-        val high = mean + confFactor * stdev
-        result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
-      }
-      result
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala b/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala
deleted file mode 100644
index ae2b63f..0000000
--- a/core/src/main/scala/spark/partial/GroupedSumEvaluator.scala
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import java.util.{HashMap => JHashMap}
-import java.util.{Map => JMap}
-
-import scala.collection.mutable.HashMap
-import scala.collection.Map
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import spark.util.StatCounter
-
-/**
- * An ApproximateEvaluator for sums by key. Returns a map of key to confidence interval.
- */
-private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Double)
-  extends ApproximateEvaluator[JHashMap[T, StatCounter], Map[T, BoundedDouble]] {
-
-  var outputsMerged = 0
-  var sums = new JHashMap[T, StatCounter]   // Sum of counts for each key
-
-  override def merge(outputId: Int, taskResult: JHashMap[T, StatCounter]) {
-    outputsMerged += 1
-    val iter = taskResult.entrySet.iterator()
-    while (iter.hasNext) {
-      val entry = iter.next()
-      val old = sums.get(entry.getKey)
-      if (old != null) {
-        old.merge(entry.getValue)
-      } else {
-        sums.put(entry.getKey, entry.getValue)
-      }
-    }
-  }
-
-  override def currentResult(): Map[T, BoundedDouble] = {
-    if (outputsMerged == totalOutputs) {
-      val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.entrySet.iterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val sum = entry.getValue.sum
-        result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
-      }
-      result
-    } else if (outputsMerged == 0) {
-      new HashMap[T, BoundedDouble]
-    } else {
-      val p = outputsMerged.toDouble / totalOutputs
-      val studentTCacher = new StudentTCacher(confidence)
-      val result = new JHashMap[T, BoundedDouble](sums.size)
-      val iter = sums.entrySet.iterator()
-      while (iter.hasNext) {
-        val entry = iter.next()
-        val counter = entry.getValue
-        val meanEstimate = counter.mean
-        val meanVar = counter.sampleVariance / counter.count
-        val countEstimate = (counter.count + 1 - p) / p
-        val countVar = (counter.count + 1) * (1 - p) / (p * p)
-        val sumEstimate = meanEstimate * countEstimate
-        val sumVar = (meanEstimate * meanEstimate * countVar) +
-                     (countEstimate * countEstimate * meanVar) +
-                     (meanVar * countVar)
-        val sumStdev = math.sqrt(sumVar)
-        val confFactor = studentTCacher.get(counter.count)
-        val low = sumEstimate - confFactor * sumStdev
-        val high = sumEstimate + confFactor * sumStdev
-        result(entry.getKey) = new BoundedDouble(sumEstimate, confidence, low, high)
-      }
-      result
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/MeanEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/MeanEvaluator.scala b/core/src/main/scala/spark/partial/MeanEvaluator.scala
deleted file mode 100644
index 5ddcad7..0000000
--- a/core/src/main/scala/spark/partial/MeanEvaluator.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import cern.jet.stat.Probability
-
-import spark.util.StatCounter
-
-/**
- * An ApproximateEvaluator for means.
- */
-private[spark] class MeanEvaluator(totalOutputs: Int, confidence: Double)
-  extends ApproximateEvaluator[StatCounter, BoundedDouble] {
-
-  var outputsMerged = 0
-  var counter = new StatCounter
-
-  override def merge(outputId: Int, taskResult: StatCounter) {
-    outputsMerged += 1
-    counter.merge(taskResult)
-  }
-
-  override def currentResult(): BoundedDouble = {
-    if (outputsMerged == totalOutputs) {
-      new BoundedDouble(counter.mean, 1.0, counter.mean, counter.mean)
-    } else if (outputsMerged == 0) {
-      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
-    } else {
-      val mean = counter.mean
-      val stdev = math.sqrt(counter.sampleVariance / counter.count)
-      val confFactor = {
-        if (counter.count > 100) {
-          Probability.normalInverse(1 - (1 - confidence) / 2)
-        } else {
-          Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
-        }
-      }
-      val low = mean - confFactor * stdev
-      val high = mean + confFactor * stdev
-      new BoundedDouble(mean, confidence, low, high)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/PartialResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/PartialResult.scala b/core/src/main/scala/spark/partial/PartialResult.scala
deleted file mode 100644
index 922a9f9..0000000
--- a/core/src/main/scala/spark/partial/PartialResult.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-class PartialResult[R](initialVal: R, isFinal: Boolean) {
-  private var finalValue: Option[R] = if (isFinal) Some(initialVal) else None
-  private var failure: Option[Exception] = None
-  private var completionHandler: Option[R => Unit] = None
-  private var failureHandler: Option[Exception => Unit] = None
-
-  def initialValue: R = initialVal
-
-  def isInitialValueFinal: Boolean = isFinal
-
-  /**
-   * Blocking method to wait for and return the final value.
-   */
-  def getFinalValue(): R = synchronized {
-    while (finalValue == None && failure == None) {
-      this.wait()
-    }
-    if (finalValue != None) {
-      return finalValue.get
-    } else {
-      throw failure.get
-    }
-  }
-
-  /** 
-   * Set a handler to be called when this PartialResult completes. Only one completion handler
-   * is supported per PartialResult.
-   */
-  def onComplete(handler: R => Unit): PartialResult[R] = synchronized {
-    if (completionHandler != None) {
-      throw new UnsupportedOperationException("onComplete cannot be called twice")
-    }
-    completionHandler = Some(handler)
-    if (finalValue != None) {
-      // We already have a final value, so let's call the handler
-      handler(finalValue.get)
-    }
-    return this
-  }
-
-  /** 
-   * Set a handler to be called if this PartialResult's job fails. Only one failure handler
-   * is supported per PartialResult.
-   */
-  def onFail(handler: Exception => Unit) {
-    synchronized {
-      if (failureHandler != None) {
-        throw new UnsupportedOperationException("onFail cannot be called twice")
-      }
-      failureHandler = Some(handler)
-      if (failure != None) {
-        // We already have a failure, so let's call the handler
-        handler(failure.get)
-      }
-    }
-  }
-
-  /**
-   * Transform this PartialResult into a PartialResult of type T.
-   */
-  def map[T](f: R => T) : PartialResult[T] = {
-    new PartialResult[T](f(initialVal), isFinal) {
-       override def getFinalValue() : T = synchronized {
-         f(PartialResult.this.getFinalValue())
-       }
-       override def onComplete(handler: T => Unit): PartialResult[T] = synchronized {
-         PartialResult.this.onComplete(handler.compose(f)).map(f)
-       }
-      override def onFail(handler: Exception => Unit) {
-        synchronized {
-          PartialResult.this.onFail(handler)
-        }
-      }
-      override def toString : String = synchronized {
-        PartialResult.this.getFinalValueInternal() match {
-          case Some(value) => "(final: " + f(value) + ")"
-          case None => "(partial: " + initialValue + ")"
-        }
-      }
-      def getFinalValueInternal() = PartialResult.this.getFinalValueInternal().map(f)
-    }
-  }
-
-  private[spark] def setFinalValue(value: R) {
-    synchronized {
-      if (finalValue != None) {
-        throw new UnsupportedOperationException("setFinalValue called twice on a PartialResult")
-      }
-      finalValue = Some(value)
-      // Call the completion handler if it was set
-      completionHandler.foreach(h => h(value))
-      // Notify any threads that may be calling getFinalValue()
-      this.notifyAll()
-    }
-  }
-
-  private def getFinalValueInternal() = finalValue
-
-  private[spark] def setFailure(exception: Exception) {
-    synchronized {
-      if (failure != None) {
-        throw new UnsupportedOperationException("setFailure called twice on a PartialResult")
-      }
-      failure = Some(exception)
-      // Call the failure handler if it was set
-      failureHandler.foreach(h => h(exception))
-      // Notify any threads that may be calling getFinalValue()
-      this.notifyAll()
-    }
-  }
-
-  override def toString: String = synchronized {
-    finalValue match {
-      case Some(value) => "(final: " + value + ")"
-      case None => "(partial: " + initialValue + ")"
-    }
-  }
-}
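
As background, a PartialResult is what approximate actions hand back to callers, who can either register a completion handler or block for the final value. A sketch of both patterns, assuming a PartialResult[BoundedDouble] obtained from some approximate job (where the result comes from is assumed here, not shown in this diff):

    import spark.partial.{BoundedDouble, PartialResult}

    object PartialResultUsageSketch {
      def consume(result: PartialResult[BoundedDouble]): Unit = {
        // Non-blocking: inspect the current estimate and register a handler that
        // fires once the job finishes.
        println("so far: " + result.initialValue)
        result.onComplete(finalValue => println("final: " + finalValue))

        // Blocking alternative: wait for the final value (or rethrow the failure).
        println("exact: " + result.getFinalValue())
      }
    }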

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/StudentTCacher.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/StudentTCacher.scala b/core/src/main/scala/spark/partial/StudentTCacher.scala
deleted file mode 100644
index f3bb987..0000000
--- a/core/src/main/scala/spark/partial/StudentTCacher.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import cern.jet.stat.Probability
-
-/**
- * A utility class for caching Student's T distribution values for a given confidence level
- * and various sample sizes. This is used by the grouped evaluators to efficiently calculate
- * confidence intervals for many keys.
- */
-private[spark] class StudentTCacher(confidence: Double) {
-  val NORMAL_APPROX_SAMPLE_SIZE = 100  // For samples bigger than this, use Gaussian approximation
-  val normalApprox = Probability.normalInverse(1 - (1 - confidence) / 2)
-  val cache = Array.fill[Double](NORMAL_APPROX_SAMPLE_SIZE)(-1.0)
-
-  def get(sampleSize: Long): Double = {
-    if (sampleSize >= NORMAL_APPROX_SAMPLE_SIZE) {
-      normalApprox
-    } else {
-      val size = sampleSize.toInt
-      if (cache(size) < 0) {
-        cache(size) = Probability.studentTInverse(1 - confidence, size - 1)
-      }
-      cache(size)
-    }
-  }
-}
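
The cache matters because the grouped evaluators request a t-based confidence factor once per key, often for the same sample sizes; memoizing the quantile avoids repeating the colt computation. A small usage sketch (illustrative only), assuming colt's cern.jet.stat.Probability is on the classpath as the class above already requires:

    package spark.partial

    object StudentTCacherSketch {
      def main(args: Array[String]): Unit = {
        val cacher = new StudentTCacher(0.95)
        // Small sample sizes are served from the per-size cache of t quantiles...
        println(cacher.get(10))   // computed once via studentTInverse, then cached
        println(cacher.get(10))   // same value, no second colt call
        // ...while sizes of 100 or more fall back to the normal approximation.
        println(cacher.get(10000))
      }
    }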

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/partial/SumEvaluator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/partial/SumEvaluator.scala b/core/src/main/scala/spark/partial/SumEvaluator.scala
deleted file mode 100644
index 4083abe..0000000
--- a/core/src/main/scala/spark/partial/SumEvaluator.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.partial
-
-import cern.jet.stat.Probability
-
-import spark.util.StatCounter
-
-/**
- * An ApproximateEvaluator for sums. It estimates the mean and the count and multiplies them
- * together, then uses the formula for the variance of two independent random variables to get
- * a variance for the result and compute a confidence interval.
- */
-private[spark] class SumEvaluator(totalOutputs: Int, confidence: Double)
-  extends ApproximateEvaluator[StatCounter, BoundedDouble] {
-
-  var outputsMerged = 0
-  var counter = new StatCounter
-
-  override def merge(outputId: Int, taskResult: StatCounter) {
-    outputsMerged += 1
-    counter.merge(taskResult)
-  }
-
-  override def currentResult(): BoundedDouble = {
-    if (outputsMerged == totalOutputs) {
-      new BoundedDouble(counter.sum, 1.0, counter.sum, counter.sum)
-    } else if (outputsMerged == 0) {
-      new BoundedDouble(0, 0.0, Double.NegativeInfinity, Double.PositiveInfinity)
-    } else {
-      val p = outputsMerged.toDouble / totalOutputs
-      val meanEstimate = counter.mean
-      val meanVar = counter.sampleVariance / counter.count
-      val countEstimate = (counter.count + 1 - p) / p
-      val countVar = (counter.count + 1) * (1 - p) / (p * p)
-      val sumEstimate = meanEstimate * countEstimate
-      val sumVar = (meanEstimate * meanEstimate * countVar) +
-                   (countEstimate * countEstimate * meanVar) +
-                   (meanVar * countVar)
-      val sumStdev = math.sqrt(sumVar)
-      val confFactor = {
-        if (counter.count > 100) {
-          Probability.normalInverse(1 - (1 - confidence) / 2)
-        } else {
-          Probability.studentTInverse(1 - confidence, (counter.count - 1).toInt)
-        }
-      }
-      val low = sumEstimate - confFactor * sumStdev
-      val high = sumEstimate + confFactor * sumStdev
-      new BoundedDouble(sumEstimate, confidence, low, high)
-    }
-  }
-}
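
The sumVar expression is the variance of a product of two independent random variables, Var(XY) = E[X]^2 Var(Y) + E[Y]^2 Var(X) + Var(X) Var(Y), applied to the estimated per-element mean and the estimated count. A standalone restatement of that step with illustrative numbers:

    object SumVarianceSketch {
      // Variance of X * Y for independent X and Y, as used for sumVar above.
      def productVariance(meanX: Double, varX: Double, meanY: Double, varY: Double): Double =
        meanX * meanX * varY + meanY * meanY * varX + varX * varY

      def main(args: Array[String]): Unit = {
        val meanEstimate = 12.5       // per-element mean from the StatCounter
        val meanVar = 0.04            // sampleVariance / count
        val countEstimate = 100000.0  // extrapolated total element count
        val countVar = 2500.0         // variance of that extrapolation
        val sumVar = productVariance(meanEstimate, meanVar, countEstimate, countVar)
        println(s"sum ~ ${meanEstimate * countEstimate}, stdev ~ ${math.sqrt(sumVar)}")
      }
    }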

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
deleted file mode 100644
index 0380058..0000000
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
-import spark.storage.BlockManager
-
-private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
-  val index = idx
-}
-
-private[spark]
-class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
-  extends RDD[T](sc, Nil) {
-
-  @transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
-
-  override def getPartitions: Array[Partition] = (0 until blockIds.size).map(i => {
-    new BlockRDDPartition(blockIds(i), i).asInstanceOf[Partition]
-  }).toArray
-
-  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    val blockManager = SparkEnv.get.blockManager
-    val blockId = split.asInstanceOf[BlockRDDPartition].blockId
-    blockManager.get(blockId) match {
-      case Some(block) => block.asInstanceOf[Iterator[T]]
-      case None =>
-        throw new Exception("Could not compute split, block " + blockId + " not found")
-    }
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    locations_(split.asInstanceOf[BlockRDDPartition].blockId)
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/CartesianRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
deleted file mode 100644
index 91b3e69..0000000
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import java.io.{ObjectOutputStream, IOException}
-import spark._
-
-
-private[spark]
-class CartesianPartition(
-    idx: Int,
-    @transient rdd1: RDD[_],
-    @transient rdd2: RDD[_],
-    s1Index: Int,
-    s2Index: Int
-  ) extends Partition {
-  var s1 = rdd1.partitions(s1Index)
-  var s2 = rdd2.partitions(s2Index)
-  override val index: Int = idx
-
-  @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
-    // Update the reference to parent split at the time of task serialization
-    s1 = rdd1.partitions(s1Index)
-    s2 = rdd2.partitions(s2Index)
-    oos.defaultWriteObject()
-  }
-}
-
-private[spark]
-class CartesianRDD[T: ClassManifest, U:ClassManifest](
-    sc: SparkContext,
-    var rdd1 : RDD[T],
-    var rdd2 : RDD[U])
-  extends RDD[Pair[T, U]](sc, Nil)
-  with Serializable {
-
-  val numPartitionsInRdd2 = rdd2.partitions.size
-
-  override def getPartitions: Array[Partition] = {
-    // create the cross product split
-    val array = new Array[Partition](rdd1.partitions.size * rdd2.partitions.size)
-    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
-      val idx = s1.index * numPartitionsInRdd2 + s2.index
-      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
-    }
-    array
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val currSplit = split.asInstanceOf[CartesianPartition]
-    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
-  }
-
-  override def compute(split: Partition, context: TaskContext) = {
-    val currSplit = split.asInstanceOf[CartesianPartition]
-    for (x <- rdd1.iterator(currSplit.s1, context);
-      y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
-  }
-
-  override def getDependencies: Seq[Dependency[_]] = List(
-    new NarrowDependency(rdd1) {
-      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
-    },
-    new NarrowDependency(rdd2) {
-      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
-    }
-  )
-
-  override def clearDependencies() {
-    super.clearDependencies()
-    rdd1 = null
-    rdd2 = null
-  }
-}
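
The partition bookkeeping here is a flat encoding of a 2-D grid: partition idx of the product corresponds to parent partitions (idx / numPartitionsInRdd2, idx % numPartitionsInRdd2), which is exactly the mapping the two NarrowDependency instances invert. A tiny sketch of that index arithmetic (illustrative only):

    object CartesianIndexSketch {
      def main(args: Array[String]): Unit = {
        val numPartitionsInRdd1 = 3
        val numPartitionsInRdd2 = 4
        for (s1 <- 0 until numPartitionsInRdd1; s2 <- 0 until numPartitionsInRdd2) {
          // Same encoding as CartesianRDD.getPartitions...
          val idx = s1 * numPartitionsInRdd2 + s2
          // ...and the inverse used by its two NarrowDependency.getParents calls.
          assert(idx / numPartitionsInRdd2 == s1 && idx % numPartitionsInRdd2 == s2)
        }
        println("flat index round-trips to (s1, s2) for all 12 pairs")
      }
    }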

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala
deleted file mode 100644
index 1ad5fe6..0000000
--- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark._
-import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.io.{NullWritable, BytesWritable}
-import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.fs.Path
-import java.io.{File, IOException, EOFException}
-import java.text.NumberFormat
-
-private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
-
-/**
- * This RDD represents a RDD checkpoint file (similar to HadoopRDD).
- */
-private[spark]
-class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
-  extends RDD[T](sc, Nil) {
-
-  @transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)
-
-  override def getPartitions: Array[Partition] = {
-    val cpath = new Path(checkpointPath)
-    val numPartitions =
-    // listStatus can throw exception if path does not exist.
-    if (fs.exists(cpath)) {
-      val dirContents = fs.listStatus(cpath)
-      val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted
-      val numPart =  partitionFiles.size
-      if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) ||
-          ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) {
-        throw new SparkException("Invalid checkpoint directory: " + checkpointPath)
-      }
-      numPart
-    } else 0
-
-    Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i))
-  }
-
-  checkpointData = Some(new RDDCheckpointData[T](this))
-  checkpointData.get.cpFile = Some(checkpointPath)
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    val status = fs.getFileStatus(new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index)))
-    val locations = fs.getFileBlockLocations(status, 0, status.getLen)
-    locations.headOption.toList.flatMap(_.getHosts).filter(_ != "localhost")
-  }
-
-  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    val file = new Path(checkpointPath, CheckpointRDD.splitIdToFile(split.index))
-    CheckpointRDD.readFromFile(file, context)
-  }
-
-  override def checkpoint() {
-    // Do nothing. CheckpointRDD should not be checkpointed.
-  }
-}
-
-private[spark] object CheckpointRDD extends Logging {
-
-  def splitIdToFile(splitId: Int): String = {
-    "part-%05d".format(splitId)
-  }
-
-  def writeToFile[T](path: String, blockSize: Int = -1)(ctx: TaskContext, iterator: Iterator[T]) {
-    val env = SparkEnv.get
-    val outputDir = new Path(path)
-    val fs = outputDir.getFileSystem(env.hadoop.newConfiguration())
-
-    val finalOutputName = splitIdToFile(ctx.splitId)
-    val finalOutputPath = new Path(outputDir, finalOutputName)
-    val tempOutputPath = new Path(outputDir, "." + finalOutputName + "-attempt-" + ctx.attemptId)
-
-    if (fs.exists(tempOutputPath)) {
-      throw new IOException("Checkpoint failed: temporary path " +
-        tempOutputPath + " already exists")
-    }
-    val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-
-    val fileOutputStream = if (blockSize < 0) {
-      fs.create(tempOutputPath, false, bufferSize)
-    } else {
-      // This is mainly for testing purpose
-      fs.create(tempOutputPath, false, bufferSize, fs.getDefaultReplication, blockSize)
-    }
-    val serializer = env.serializer.newInstance()
-    val serializeStream = serializer.serializeStream(fileOutputStream)
-    serializeStream.writeAll(iterator)
-    serializeStream.close()
-
-    if (!fs.rename(tempOutputPath, finalOutputPath)) {
-      if (!fs.exists(finalOutputPath)) {
-        logInfo("Deleting tempOutputPath " + tempOutputPath)
-        fs.delete(tempOutputPath, false)
-        throw new IOException("Checkpoint failed: failed to save output of task: "
-          + ctx.attemptId + " and final output path does not exist")
-      } else {
-        // Some other copy of this task must've finished before us and renamed it
-        logInfo("Final output path " + finalOutputPath + " already exists; not overwriting it")
-        fs.delete(tempOutputPath, false)
-      }
-    }
-  }
-
-  def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
-    val env = SparkEnv.get
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
-    val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
-    val fileInputStream = fs.open(path, bufferSize)
-    val serializer = env.serializer.newInstance()
-    val deserializeStream = serializer.deserializeStream(fileInputStream)
-
-    // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback(() => deserializeStream.close())
-
-    deserializeStream.asIterator.asInstanceOf[Iterator[T]]
-  }
-
-  // Test whether CheckpointRDD generate expected number of partitions despite
-  // each split file having multiple blocks. This needs to be run on a
-  // cluster (mesos or standalone) using HDFS.
-  def main(args: Array[String]) {
-    import spark._
-
-    val Array(cluster, hdfsPath) = args
-    val env = SparkEnv.get
-    val sc = new SparkContext(cluster, "CheckpointRDD Test")
-    val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000)
-    val path = new Path(hdfsPath, "temp")
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
-    sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _)
-    val cpRDD = new CheckpointRDD[Int](sc, path.toString)
-    assert(cpRDD.partitions.length == rdd.partitions.length, "Number of partitions is not the same")
-    assert(cpRDD.collect.toList == rdd.collect.toList, "Data of partitions not the same")
-    fs.delete(path, true)
-  }
-}
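
writeToFile uses a write-to-temp-then-rename pattern so a partially written checkpoint part is never visible under its final name, and a duplicate speculative attempt can simply discard its temp file. A generic sketch of the same pattern on a local filesystem, using java.nio.file purely for illustration rather than the Hadoop FileSystem API used above:

    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Path, StandardCopyOption}

    object TempThenRenameSketch {
      def writePart(dir: Path, name: String, bytes: Array[Byte]): Unit = {
        val finalPath = dir.resolve(name)
        val tempPath = dir.resolve("." + name + "-attempt")
        Files.write(tempPath, bytes)                      // write everything to a hidden temp file
        try {
          // Publish atomically under the final name, as the rename above does.
          Files.move(tempPath, finalPath, StandardCopyOption.ATOMIC_MOVE)
        } catch {
          case _: java.nio.file.FileAlreadyExistsException =>
            Files.deleteIfExists(tempPath)                // another attempt won the race; drop ours
        }
      }

      def main(args: Array[String]): Unit = {
        val dir = Files.createTempDirectory("checkpoint-sketch")
        writePart(dir, "part-00000", "hello".getBytes(StandardCharsets.UTF_8))
        println(Files.exists(dir.resolve("part-00000")))
      }
    }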

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
deleted file mode 100644
index 01b6c23..0000000
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import java.io.{ObjectOutputStream, IOException}
-import java.util.{HashMap => JHashMap}
-
-import scala.collection.JavaConversions
-import scala.collection.mutable.ArrayBuffer
-
-import spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
-import spark.{Dependency, OneToOneDependency, ShuffleDependency}
-
-
-private[spark] sealed trait CoGroupSplitDep extends Serializable
-
-private[spark] case class NarrowCoGroupSplitDep(
-    rdd: RDD[_],
-    splitIndex: Int,
-    var split: Partition
-  ) extends CoGroupSplitDep {
-
-  @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
-    // Update the reference to parent split at the time of task serialization
-    split = rdd.partitions(splitIndex)
-    oos.defaultWriteObject()
-  }
-}
-
-private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-
-private[spark]
-class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
-  extends Partition with Serializable {
-  override val index: Int = idx
-  override def hashCode(): Int = idx
-}
-
-
-/**
- * A RDD that cogroups its parents. For each key k in parent RDDs, the resulting RDD contains a
- * tuple with the list of values for that key.
- *
- * @param rdds parent RDDs.
- * @param part partitioner used to partition the shuffle output.
- */
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
-  extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
-
-  private var serializerClass: String = null
-
-  def setSerializer(cls: String): CoGroupedRDD[K] = {
-    serializerClass = cls
-    this
-  }
-
-  override def getDependencies: Seq[Dependency[_]] = {
-    rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
-      if (rdd.partitioner == Some(part)) {
-        logDebug("Adding one-to-one dependency with " + rdd)
-        new OneToOneDependency(rdd)
-      } else {
-        logDebug("Adding shuffle dependency with " + rdd)
-        new ShuffleDependency[Any, Any](rdd, part, serializerClass)
-      }
-    }
-  }
-
-  override def getPartitions: Array[Partition] = {
-    val array = new Array[Partition](part.numPartitions)
-    for (i <- 0 until array.size) {
-      // Each CoGroupPartition will have a dependency per contributing RDD
-      array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
-        // Assume each RDD contributed a single dependency, and get it
-        dependencies(j) match {
-          case s: ShuffleDependency[_, _] =>
-            new ShuffleCoGroupSplitDep(s.shuffleId)
-          case _ =>
-            new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i))
-        }
-      }.toArray)
-    }
-    array
-  }
-
-  override val partitioner = Some(part)
-
-  override def compute(s: Partition, context: TaskContext): Iterator[(K, Seq[Seq[_]])] = {
-    val split = s.asInstanceOf[CoGroupPartition]
-    val numRdds = split.deps.size
-    // e.g. for `(k, a) cogroup (k, b)`, K -> Seq(ArrayBuffer as, ArrayBuffer bs)
-    val map = new JHashMap[K, Seq[ArrayBuffer[Any]]]
-
-    def getSeq(k: K): Seq[ArrayBuffer[Any]] = {
-      val seq = map.get(k)
-      if (seq != null) {
-        seq
-      } else {
-        val seq = Array.fill(numRdds)(new ArrayBuffer[Any])
-        map.put(k, seq)
-        seq
-      }
-    }
-
-    val ser = SparkEnv.get.serializerManager.get(serializerClass)
-    for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
-      case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
-        // Read them from the parent
-        rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
-          getSeq(kv._1)(depNum) += kv._2
-        }
-      }
-      case ShuffleCoGroupSplitDep(shuffleId) => {
-        // Read map outputs of shuffle
-        val fetcher = SparkEnv.get.shuffleFetcher
-        fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
-          kv => getSeq(kv._1)(depNum) += kv._2
-        }
-      }
-    }
-    JavaConversions.mapAsScalaMap(map).iterator
-  }
-
-  override def clearDependencies() {
-    super.clearDependencies()
-    rdds = null
-  }
-}
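
The result type RDD[(K, Seq[Seq[_]])] means each key maps to one sequence of values per parent RDD, in parent order, with an empty sequence for any parent that lacked the key. A plain-Scala sketch of that grouping shape for two parents (no Spark machinery involved):

    import scala.collection.mutable.ArrayBuffer

    object CoGroupShapeSketch {
      def main(args: Array[String]): Unit = {
        val parents: Seq[Seq[(String, Any)]] = Seq(
          Seq("k1" -> 1, "k2" -> 2),       // parent RDD #0
          Seq("k1" -> "x", "k3" -> "y"))   // parent RDD #1

        // One buffer per parent for every key, mirroring getSeq() above.
        val map = scala.collection.mutable.LinkedHashMap[String, Seq[ArrayBuffer[Any]]]()
        for ((parent, depNum) <- parents.zipWithIndex; (k, v) <- parent) {
          val seqs = map.getOrElseUpdate(k, Array.fill(parents.size)(new ArrayBuffer[Any]))
          seqs(depNum) += v
        }
        // Prints:
        //   k1 -> List(ArrayBuffer(1), ArrayBuffer(x))
        //   k2 -> List(ArrayBuffer(2), ArrayBuffer())
        //   k3 -> List(ArrayBuffer(), ArrayBuffer(y))
        map.foreach { case (k, seqs) => println(s"$k -> ${seqs.toList}") }
      }
    }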

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
deleted file mode 100644
index e612d02..0000000
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark._
-import java.io.{ObjectOutputStream, IOException}
-import scala.collection.mutable
-import scala.Some
-import scala.collection.mutable.ArrayBuffer
-
-/**
- * Class that captures a coalesced RDD by essentially keeping track of parent partitions
- * @param index of this coalesced partition
- * @param rdd which it belongs to
- * @param parentsIndices list of indices in the parent that have been coalesced into this partition
- * @param preferredLocation the preferred location for this partition
- */
-case class CoalescedRDDPartition(
-                                  index: Int,
-                                  @transient rdd: RDD[_],
-                                  parentsIndices: Array[Int],
-                                  @transient preferredLocation: String = ""
-                                  ) extends Partition {
-  var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))
-
-  @throws(classOf[IOException])
-  private def writeObject(oos: ObjectOutputStream) {
-    // Update the reference to parent partition at the time of task serialization
-    parents = parentsIndices.map(rdd.partitions(_))
-    oos.defaultWriteObject()
-  }
-
-  /**
-   * Computes how many of the parents partitions have getPreferredLocation
-   * as one of their preferredLocations
-   * @return locality of this coalesced partition between 0 and 1
-   */
-  def localFraction: Double = {
-    val loc = parents.count(p =>
-      rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))
-
-    if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
-  }
-}
-
-/**
- * Represents a coalesced RDD that has fewer partitions than its parent RDD
- * This class uses the PartitionCoalescer class to find a good partitioning of the parent RDD
- * so that each new partition has roughly the same number of parent partitions and that
- * the preferred location of each new partition overlaps with as many preferred locations of its
- * parent partitions
- * @param prev RDD to be coalesced
- * @param maxPartitions number of desired partitions in the coalesced RDD
- * @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
- */
-class CoalescedRDD[T: ClassManifest](
-                                      @transient var prev: RDD[T],
-                                      maxPartitions: Int,
-                                      balanceSlack: Double = 0.10)
-  extends RDD[T](prev.context, Nil) {  // Nil since we implement getDependencies
-
-  override def getPartitions: Array[Partition] = {
-    val pc = new PartitionCoalescer(maxPartitions, prev, balanceSlack)
-
-    pc.run().zipWithIndex.map {
-      case (pg, i) =>
-        val ids = pg.arr.map(_.index).toArray
-        new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
-    }
-  }
-
-  override def compute(partition: Partition, context: TaskContext): Iterator[T] = {
-    partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>
-      firstParent[T].iterator(parentPartition, context)
-    }
-  }
-
-  override def getDependencies: Seq[Dependency[_]] = {
-    Seq(new NarrowDependency(prev) {
-      def getParents(id: Int): Seq[Int] =
-        partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices
-    })
-  }
-
-  override def clearDependencies() {
-    super.clearDependencies()
-    prev = null
-  }
-
-  /**
-   * Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
-   * then the preferred machine will be one which most parent splits prefer too.
-   * @param partition
-   * @return the machine most preferred by split
-   */
-  override def getPreferredLocations(partition: Partition): Seq[String] = {
-    List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
-  }
-}
-
-/**
- * Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
- * this RDD computes one or more of the parent ones. It will produce exactly `maxPartitions` if the
- * parent had more than maxPartitions, or fewer if the parent had fewer.
- *
- * This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
- * or to avoid having a large number of small tasks when processing a directory with many files.
- *
- * If there is no locality information (no preferredLocations) in the parent, then the coalescing
- * is very simple: chunk parents that are close in the Array in chunks.
- * If there is locality information, it proceeds to pack them with the following four goals:
- *
- * (1) Balance the groups so they roughly have the same number of parent partitions
- * (2) Achieve locality per partition, i.e. find one machine which most parent partitions prefer
- * (3) Be efficient, i.e. O(n) algorithm for n parent partitions (problem is likely NP-hard)
- * (4) Balance preferred machines, i.e. avoid as much as possible picking the same preferred machine
- *
- * Furthermore, it is assumed that the parent RDD may have many partitions, e.g. 100 000.
- * We assume the final number of desired partitions is small, e.g. less than 1000.
- *
- * The algorithm tries to assign unique preferred machines to each partition. If the number of
- * desired partitions is greater than the number of preferred machines (can happen), it needs to
- * start picking duplicate preferred machines. This is determined using coupon collector estimation
- * (2n log(n)). The load balancing is done using power-of-two randomized bins-balls with one twist:
- * it tries to also achieve locality. This is done by allowing a slack (balanceSlack) between two
- * bins. If two bins are within the slack in terms of balance, the algorithm will assign partitions
- * according to locality. (contact alig for questions)
- *
- */
-
-private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
-
-  def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
-  def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
-    if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
-
-  val rnd = new scala.util.Random(7919) // keep this class deterministic
-
-  // each element of groupArr represents one coalesced partition
-  val groupArr = ArrayBuffer[PartitionGroup]()
-
-  // hash used to check whether some machine is already in groupArr
-  val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]()
-
-  // hash used for the first maxPartitions (to avoid duplicates)
-  val initialHash = mutable.Set[Partition]()
-
-  // determines the tradeoff between load-balancing the partitions sizes and their locality
-  // e.g. balanceSlack=0.10 means that it allows up to 10% imbalance in favor of locality
-  val slack = (balanceSlack * prev.partitions.size).toInt
-
-  var noLocality = true  // true if no preferredLocations exist for the parent RDD
-
-  // gets the *current* preferred locations from the DAGScheduler (as opposed to the static ones)
-  def currPrefLocs(part: Partition): Seq[String] = {
-    prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
-  }
-
-  // this class just keeps iterating and rotating infinitely over the partitions of the RDD
-  // next() returns the next preferred machine that a partition is replicated on
-  // the rotator first goes through the first replica copy of each partition, then the second, third
-  // the iterator's return type is a tuple: (replicaString, partition)
-  class LocationIterator(prev: RDD[_]) extends Iterator[(String, Partition)] {
-
-    var it: Iterator[(String, Partition)] = resetIterator()
-
-    override val isEmpty = !it.hasNext
-
-    // initializes/resets to start iterating from the beginning
-    def resetIterator() = {
-      val iterators = (0 to 2).map( x =>
-        prev.partitions.iterator.flatMap(p => {
-          if (currPrefLocs(p).size > x) Some((currPrefLocs(p)(x), p)) else None
-        } )
-      )
-      iterators.reduceLeft((x, y) => x ++ y)
-    }
-
-    // hasNext() is false iff there are no preferredLocations for any of the partitions of the RDD
-    def hasNext(): Boolean = { !isEmpty }
-
-    // return the next preferredLocation of some partition of the RDD
-    def next(): (String, Partition) = {
-      if (it.hasNext)
-        it.next()
-      else {
-        it = resetIterator() // ran out of preferred locations, reset and rotate to the beginning
-        it.next()
-      }
-    }
-  }
-
-  /**
-   * Sorts and gets the least element of the list associated with key in groupHash
-   * The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
-   * @param key host name of the preferred machine
-   * @return Option of the least-loaded PartitionGroup for that machine
-   */
-  def getLeastGroupHash(key: String): Option[PartitionGroup] = {
-    groupHash.get(key).map(_.sortWith(compare).head)
-  }
-
-  def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
-    if (!initialHash.contains(part)) {
-      pgroup.arr += part           // assign this partition to the group
-      initialHash += part // needed to avoid assigning partitions to multiple buckets
-      true
-    } else { false }
-  }
-
-  /**
-   * Initializes targetLen partition groups and assigns a preferredLocation
-   * This uses the coupon collector problem to estimate how many preferredLocations it must rotate
-   * through until it has seen most of the preferred locations (2 * n log(n))
-   * @param targetLen the number of partition groups (bins) to create
-   */
-  def setupGroups(targetLen: Int) {
-    val rotIt = new LocationIterator(prev)
-
-    // deal with empty case, just create targetLen partition groups with no preferred location
-    if (!rotIt.hasNext()) {
-      (1 to targetLen).foreach(x => groupArr += PartitionGroup())
-      return
-    }
-
-    noLocality = false
-
-    // number of iterations needed to be certain that we've seen most preferred locations
-    val expectedCoupons2 = 2 * (math.log(targetLen)*targetLen + targetLen + 0.5).toInt
-    var numCreated = 0
-    var tries = 0
-
-    // rotate through until either targetLen unique/distinct preferred locations have been created
-    // OR we've rotated expectedCoupons2, in which case we have likely seen all preferred locations,
-    // i.e. likely targetLen >> number of preferred locations (more buckets than there are machines)
-    while (numCreated < targetLen && tries < expectedCoupons2) {
-      tries += 1
-      val (nxt_replica, nxt_part) = rotIt.next()
-      if (!groupHash.contains(nxt_replica)) {
-        val pgroup = PartitionGroup(nxt_replica)
-        groupArr += pgroup
-        addPartToPGroup(nxt_part, pgroup)
-        groupHash += (nxt_replica -> (ArrayBuffer(pgroup))) // list in case we have multiple
-        numCreated += 1
-      }
-    }
-
-    while (numCreated < targetLen) {  // if we don't have enough partition groups, create duplicates
-      var (nxt_replica, nxt_part) = rotIt.next()
-      val pgroup = PartitionGroup(nxt_replica)
-      groupArr += pgroup
-      groupHash.get(nxt_replica).get += pgroup
-      var tries = 0
-      while (!addPartToPGroup(nxt_part, pgroup) && tries < targetLen) { // ensure at least one part
-        nxt_part = rotIt.next()._2
-        tries += 1
-      }
-      numCreated += 1
-    }
-
-  }
-
-  /**
-   * Takes a parent RDD partition and decides which of the partition groups to put it in
-   * Takes locality into account, but also uses power of 2 choices to load balance
-   * It strikes a balance between the two using the balanceSlack variable
-   * @param p partition (ball to be thrown)
-   * @return partition group (bin to be put in)
-   */
-  def pickBin(p: Partition): PartitionGroup = {
-    val pref = currPrefLocs(p).map(getLeastGroupHash(_)).sortWith(compare) // least loaded pref locs
-    val prefPart = if (pref == Nil) None else pref.head
-
-    val r1 = rnd.nextInt(groupArr.size)
-    val r2 = rnd.nextInt(groupArr.size)
-    val minPowerOfTwo = if (groupArr(r1).size < groupArr(r2).size) groupArr(r1) else groupArr(r2)
-    if (prefPart == None) // if no preferred locations, just use basic power of two
-      return minPowerOfTwo
-
-    val prefPartActual = prefPart.get
-
-    if (minPowerOfTwo.size + slack <= prefPartActual.size)  // more imbalance than the slack allows
-      return minPowerOfTwo  // prefer balance over locality
-    else {
-      return prefPartActual // prefer locality over balance
-    }
-  }
-
-  def throwBalls() {
-    if (noLocality) {  // no preferredLocations in parent RDD, no randomization needed
-      if (maxPartitions > groupArr.size) { // just return prev.partitions
-        for ((p,i) <- prev.partitions.zipWithIndex) {
-          groupArr(i).arr += p
-        }
-      } else { // no locality available, so simply split partitions based on their positions in the array
-        for(i <- 0 until maxPartitions) {
-          val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt
-          val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt
-          (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) }
-        }
-      }
-    } else {
-      for (p <- prev.partitions if (!initialHash.contains(p))) { // throw every partition into group
-        pickBin(p).arr += p
-      }
-    }
-  }
-
-  def getPartitions: Array[PartitionGroup] = groupArr.filter(pg => pg.size > 0).toArray
-
-  /**
-   * Runs the packing algorithm and returns an array of PartitionGroups that are, where possible,
-   * load balanced and grouped by locality
-   * @return array of partition groups
-   */
-  def run(): Array[PartitionGroup] = {
-    setupGroups(math.min(prev.partitions.length, maxPartitions))   // setup the groups (bins)
-    throwBalls()             // assign partitions (balls) to each group (bins)
-    getPartitions
-  }
-}
-
-private[spark] case class PartitionGroup(prefLoc: String = "") {
-  var arr = mutable.ArrayBuffer[Partition]()
-
-  def size = arr.size
-}
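
The balancing rule at the heart of pickBin above is easy to exercise outside Spark. The following
self-contained sketch is illustrative only (the Bin class, host names, and slack value are invented;
this is not Spark code): it draws two random bins and honors the locality-preferred bin only while
that bin stays within `slack` of the emptier draw.

import scala.util.Random

// Illustrative stand-in for PartitionGroup: one coalesced partition being filled.
case class Bin(prefLoc: String, var size: Int = 0)

object PowerOfTwoWithSlack {
  val rnd = new Random(7919) // fixed seed, as in the coalescer, to stay deterministic

  // Pick a bin for one parent partition: prefer `preferred` unless it is already
  // `slack` or more elements larger than the emptier of two random bins.
  def pickBin(bins: Array[Bin], preferred: Option[Bin], slack: Int): Bin = {
    val a = bins(rnd.nextInt(bins.length))
    val b = bins(rnd.nextInt(bins.length))
    val minPowerOfTwo = if (a.size < b.size) a else b
    preferred match {
      case Some(p) if minPowerOfTwo.size + slack > p.size => p // locality wins
      case _ => minPowerOfTwo                                  // balance wins
    }
  }

  def main(args: Array[String]) {
    val bins = Array(Bin("host1"), Bin("host2"), Bin("host3"))
    for (_ <- 1 to 30) {
      val bin = pickBin(bins, Some(bins(0)), slack = 2)
      bin.size += 1
    }
    println(bins.mkString(", ")) // host1 leads, but only by roughly the slack
  }
}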

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/EmptyRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/EmptyRDD.scala b/core/src/main/scala/spark/rdd/EmptyRDD.scala
deleted file mode 100644
index d7d4db5..0000000
--- a/core/src/main/scala/spark/rdd/EmptyRDD.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
-
-
-/**
- * An RDD that is empty, i.e. has no element in it.
- */
-class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
-
-  override def getPartitions: Array[Partition] = Array.empty
-
-  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
-    throw new UnsupportedOperationException("empty RDD")
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/FilteredRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
deleted file mode 100644
index 783508c..0000000
--- a/core/src/main/scala/spark/rdd/FilteredRDD.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{OneToOneDependency, RDD, Partition, TaskContext}
-
-private[spark] class FilteredRDD[T: ClassManifest](
-    prev: RDD[T],
-    f: T => Boolean)
-  extends RDD[T](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override val partitioner = prev.partitioner    // Since filter cannot change a partition's keys
-
-  override def compute(split: Partition, context: TaskContext) =
-    firstParent[T].iterator(split, context).filter(f)
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
deleted file mode 100644
index ed75eac..0000000
--- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-
-private[spark]
-class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: T => TraversableOnce[U])
-  extends RDD[U](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override def compute(split: Partition, context: TaskContext) =
-    firstParent[T].iterator(split, context).flatMap(f)
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
deleted file mode 100644
index a6bdce8..0000000
--- a/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{TaskContext, Partition, RDD}
-
-
-private[spark]
-class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
-  extends RDD[(K, U)](prev) {
-
-  override def getPartitions = firstParent[Product2[K, V]].partitions
-
-  override val partitioner = firstParent[Product2[K, V]].partitioner
-
-  override def compute(split: Partition, context: TaskContext) = {
-    firstParent[Product2[K, V]].iterator(split, context).flatMap { case Product2(k, v) =>
-      f(v).map(x => (k, x))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/GlommedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
deleted file mode 100644
index 1573f8a..0000000
--- a/core/src/main/scala/spark/rdd/GlommedRDD.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
-  extends RDD[Array[T]](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override def compute(split: Partition, context: TaskContext) =
-    Array(firstParent[T].iterator(split, context).toArray).iterator
-}
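
GlommedRDD backs RDD.glom(), which turns each partition into a single array and is handy for seeing
how elements are laid out across partitions. A minimal usage sketch, assuming an already-created
SparkContext named sc:

// sc is assumed to be an existing SparkContext
val parts = sc.parallelize(1 to 10, 2).glom().collect()
// parts: Array[Array[Int]], one inner array per partition,
// e.g. Array(Array(1, 2, 3, 4, 5), Array(6, 7, 8, 9, 10))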

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
deleted file mode 100644
index e512423..0000000
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import java.io.EOFException
-import java.util.NoSuchElementException
-
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.InputSplit
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapred.RecordReader
-import org.apache.hadoop.mapred.Reporter
-import org.apache.hadoop.util.ReflectionUtils
-
-import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
-import spark.util.NextIterator
-import org.apache.hadoop.conf.{Configuration, Configurable}
-
-
-/**
- * A Spark split class that wraps around a Hadoop InputSplit.
- */
-private[spark] class HadoopPartition(rddId: Int, idx: Int, @transient s: InputSplit)
-  extends Partition {
-  
-  val inputSplit = new SerializableWritable[InputSplit](s)
-
-  override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt
-
-  override val index: Int = idx
-}
-
-/**
- * An RDD that reads a Hadoop dataset as specified by a JobConf (e.g. files in HDFS, the local file
- * system, or S3, tables in HBase, etc).
- */
-class HadoopRDD[K, V](
-    sc: SparkContext,
-    @transient conf: JobConf,
-    inputFormatClass: Class[_ <: InputFormat[K, V]],
-    keyClass: Class[K],
-    valueClass: Class[V],
-    minSplits: Int)
-  extends RDD[(K, V)](sc, Nil) with Logging {
-
-  // A Hadoop JobConf can be about 10 KB, which is pretty big, so broadcast it
-  private val confBroadcast = sc.broadcast(new SerializableWritable(conf))
-
-  override def getPartitions: Array[Partition] = {
-    val env = SparkEnv.get
-    env.hadoop.addCredentials(conf)
-    val inputFormat = createInputFormat(conf)
-    if (inputFormat.isInstanceOf[Configurable]) {
-      inputFormat.asInstanceOf[Configurable].setConf(conf)
-    }
-    val inputSplits = inputFormat.getSplits(conf, minSplits)
-    val array = new Array[Partition](inputSplits.size)
-    for (i <- 0 until inputSplits.size) {
-      array(i) = new HadoopPartition(id, i, inputSplits(i))
-    }
-    array
-  }
-
-  def createInputFormat(conf: JobConf): InputFormat[K, V] = {
-    ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf)
-      .asInstanceOf[InputFormat[K, V]]
-  }
-
-  override def compute(theSplit: Partition, context: TaskContext) = new NextIterator[(K, V)] {
-    val split = theSplit.asInstanceOf[HadoopPartition]
-    logInfo("Input split: " + split.inputSplit)
-    var reader: RecordReader[K, V] = null
-
-    val conf = confBroadcast.value.value
-    val fmt = createInputFormat(conf)
-    if (fmt.isInstanceOf[Configurable]) {
-      fmt.asInstanceOf[Configurable].setConf(conf)
-    }
-    reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
-
-    // Register an on-task-completion callback to close the input stream.
-    context.addOnCompleteCallback{ () => closeIfNeeded() }
-
-    val key: K = reader.createKey()
-    val value: V = reader.createValue()
-
-    override def getNext() = {
-      try {
-        finished = !reader.next(key, value)
-      } catch {
-        case eof: EOFException =>
-          finished = true
-      }
-      (key, value)
-    }
-
-    override def close() {
-      try {
-        reader.close()
-      } catch {
-        case e: Exception => logWarning("Exception in RecordReader.close()", e)
-      }
-    }
-  }
-
-  override def getPreferredLocations(split: Partition): Seq[String] = {
-    // TODO: Filtering out "localhost" in case of file:// URLs
-    val hadoopSplit = split.asInstanceOf[HadoopPartition]
-    hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
-  }
-
-  override def checkpoint() {
-    // Do nothing. Hadoop RDD should not be checkpointed.
-  }
-
-  def getConf: Configuration = confBroadcast.value.value
-}
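
HadoopRDD is rarely constructed by hand; SparkContext.textFile and SparkContext.hadoopFile create it
internally. A hedged sketch of the same plumbing, assuming an existing SparkContext named sc and an
HDFS path that is invented for illustration:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// Keys are byte offsets into the file, values are lines of text, as with sc.textFile
val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs://namenode:8020/data/input")
  .map { case (_, text) => text.toString }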

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/JdbcRDD.scala b/core/src/main/scala/spark/rdd/JdbcRDD.scala
deleted file mode 100644
index 5913243..0000000
--- a/core/src/main/scala/spark/rdd/JdbcRDD.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import java.sql.{Connection, ResultSet}
-
-import spark.{Logging, Partition, RDD, SparkContext, TaskContext}
-import spark.util.NextIterator
-
-private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
-  override def index = idx
-}
-
-/**
- * An RDD that executes an SQL query on a JDBC connection and reads results.
- * For usage example, see test case JdbcRDDSuite.
- *
- * @param getConnection a function that returns an open Connection.
- *   The RDD takes care of closing the connection.
- * @param sql the text of the query.
- *   The query must contain two ? placeholders for parameters used to partition the results.
- *   E.g. "select title, author from books where ? <= id and id <= ?"
- * @param lowerBound the minimum value of the first placeholder
- * @param upperBound the maximum value of the second placeholder
- *   The lower and upper bounds are inclusive.
- * @param numPartitions the number of partitions.
- *   Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2,
- *   the query would be executed twice, once with (1, 10) and once with (11, 20)
- * @param mapRow a function from a ResultSet to a single row of the desired result type(s).
- *   This should only call getInt, getString, etc; the RDD takes care of calling next.
- *   The default maps a ResultSet to an array of Object.
- */
-class JdbcRDD[T: ClassManifest](
-    sc: SparkContext,
-    getConnection: () => Connection,
-    sql: String,
-    lowerBound: Long,
-    upperBound: Long,
-    numPartitions: Int,
-    mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
-  extends RDD[T](sc, Nil) with Logging {
-
-  override def getPartitions: Array[Partition] = {
-    // bounds are inclusive, hence the + 1 here and - 1 on end
-    val length = 1 + upperBound - lowerBound
-    (0 until numPartitions).map(i => {
-      val start = lowerBound + ((i * length) / numPartitions).toLong
-      val end = lowerBound + (((i + 1) * length) / numPartitions).toLong - 1
-      new JdbcPartition(i, start, end)
-    }).toArray
-  }
-
-  override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
-    context.addOnCompleteCallback{ () => closeIfNeeded() }
-    val part = thePart.asInstanceOf[JdbcPartition]
-    val conn = getConnection()
-    val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
-
-    // setFetchSize(Integer.MIN_VALUE) is a mysql driver specific way to force streaming results,
-    // rather than pulling entire resultset into memory.
-    // see http://dev.mysql.com/doc/refman/5.0/en/connector-j-reference-implementation-notes.html
-    if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
-      stmt.setFetchSize(Integer.MIN_VALUE)
-      logInfo("statement fetch size set to: " + stmt.getFetchSize + " to force MySQL streaming ")
-    }
-
-    stmt.setLong(1, part.lower)
-    stmt.setLong(2, part.upper)
-    val rs = stmt.executeQuery()
-
-    override def getNext: T = {
-      if (rs.next()) {
-        mapRow(rs)
-      } else {
-        finished = true
-        null.asInstanceOf[T]
-      }
-    }
-
-    override def close() {
-      try {
-        if (null != rs && ! rs.isClosed()) rs.close()
-      } catch {
-        case e: Exception => logWarning("Exception closing resultset", e)
-      }
-      try {
-        if (null != stmt && ! stmt.isClosed()) stmt.close()
-      } catch {
-        case e: Exception => logWarning("Exception closing statement", e)
-      }
-      try {
-        if (null != conn && ! conn.isClosed()) conn.close()
-        logInfo("closed connection")
-      } catch {
-        case e: Exception => logWarning("Exception closing connection", e)
-      }
-    }
-  }
-}
-
-object JdbcRDD {
-  def resultSetToObjectArray(rs: ResultSet) = {
-    Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
-  }
-}
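
Putting the scaladoc above into practice, a hedged usage sketch (the JDBC driver class, connection
string, and table are invented for illustration; sc is an existing SparkContext, and JdbcRDD still
lives in the pre-rename spark.rdd package in this diff):

import java.sql.{DriverManager, ResultSet}
import spark.rdd.JdbcRDD   // pre-rename package, as in this diff

val titles = new JdbcRDD(
  sc,
  () => {
    Class.forName("com.mysql.jdbc.Driver")   // illustrative driver
    DriverManager.getConnection("jdbc:mysql://dbhost/books", "user", "pass")
  },
  "select title, author from books where ? <= id and id <= ?",
  lowerBound = 1, upperBound = 20, numPartitions = 2,   // executes with (1, 10) and (11, 20)
  mapRow = (rs: ResultSet) => (rs.getString(1), rs.getString(2)))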

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
deleted file mode 100644
index af8f0a1..0000000
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-
-private[spark]
-class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: Iterator[T] => Iterator[U],
-    preservesPartitioning: Boolean = false)
-  extends RDD[U](prev) {
-
-  override val partitioner =
-    if (preservesPartitioning) firstParent[T].partitioner else None
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override def compute(split: Partition, context: TaskContext) =
-    f(firstParent[T].iterator(split, context))
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala
deleted file mode 100644
index 3b4e951..0000000
--- a/core/src/main/scala/spark/rdd/MapPartitionsWithIndexRDD.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-
-/**
- * A variant of the MapPartitionsRDD that passes the partition index into the
- * closure. This can be used to generate or collect partition specific
- * information such as the number of tuples in a partition.
- */
-private[spark]
-class MapPartitionsWithIndexRDD[U: ClassManifest, T: ClassManifest](
-    prev: RDD[T],
-    f: (Int, Iterator[T]) => Iterator[U],
-    preservesPartitioning: Boolean
-  ) extends RDD[U](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override val partitioner = if (preservesPartitioning) prev.partitioner else None
-
-  override def compute(split: Partition, context: TaskContext) =
-    f(split.index, firstParent[T].iterator(split, context))
-}
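
The scaladoc's suggested use, collecting per-partition information such as tuple counts, goes
through the public mapPartitionsWithIndex API rather than this class directly. A minimal sketch,
assuming an existing SparkContext named sc:

val counts = sc.parallelize(1 to 100, 4)
  .mapPartitionsWithIndex { (idx, iter) => Iterator((idx, iter.size)) }
  .collect()
// e.g. Array((0,25), (1,25), (2,25), (3,25))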

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/MappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
deleted file mode 100644
index 8b411dd..0000000
--- a/core/src/main/scala/spark/rdd/MappedRDD.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-import spark.{RDD, Partition, TaskContext}
-
-private[spark]
-class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
-  extends RDD[U](prev) {
-
-  override def getPartitions: Array[Partition] = firstParent[T].partitions
-
-  override def compute(split: Partition, context: TaskContext) =
-    firstParent[T].iterator(split, context).map(f)
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
deleted file mode 100644
index 8334e3b..0000000
--- a/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark.rdd
-
-
-import spark.{TaskContext, Partition, RDD}
-
-private[spark]
-class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
-  extends RDD[(K, U)](prev) {
-
-  override def getPartitions = firstParent[Product2[K, U]].partitions
-
-  override val partitioner = firstParent[Product2[K, U]].partitioner
-
-  override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
-    firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k, v) => (k, f(v)) }
-  }
-}
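
MappedValuesRDD (and FlatMappedValuesRDD above it) exists so that mapValues can keep the parent's
partitioner, whereas a plain map must drop it because the supplied function may change the keys. A
small sketch, assuming an existing SparkContext named sc and the pre-rename spark package used
throughout this diff:

import spark.SparkContext._   // pair-RDD implicit conversions (pre-rename package)
import spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2))).partitionBy(new HashPartitioner(4))
pairs.mapValues(_ + 1).partitioner                    // Some(...): keys untouched, partitioner kept
pairs.map { case (k, v) => (k, v + 1) }.partitioner   // None: map could have changed the keys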