Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:47 UTC

[63/69] [abbrv] Move some classes to more appropriate packages:

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index cb25ff7..5fd1fab 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.api.java
 
-import org.apache.spark.RDD
+import org.apache.spark.rdd.RDD
 import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.util.StatCounter

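For downstream code, the practical effect of hunks like the one above is that RDD now lives in org.apache.spark.rdd rather than at the top level of org.apache.spark. A minimal sketch of user code under the new layout (the object name and path argument are illustrative, not part of this commit):

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD          // previously: import org.apache.spark.RDD

    object ImportExample {
      // Only the import path changes; the RDD API itself is untouched.
      def lineLengths(sc: SparkContext, path: String): RDD[Int] =
        sc.textFile(path).map(_.length)
    }
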
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 09da35a..a6518ab 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -33,12 +33,12 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.HashPartitioner
 import org.apache.spark.Partitioner
 import org.apache.spark.Partitioner._
-import org.apache.spark.RDD
 import org.apache.spark.SparkContext.rddToPairRDDFunctions
 import org.apache.spark.api.java.function.{Function2 => JFunction2}
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.PartialResult
+import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.OrderedRDDFunctions
 import org.apache.spark.storage.StorageLevel
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 68cfcf5..eec58ab 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.api.java
 
 import org.apache.spark._
+import org.apache.spark.rdd.RDD
 import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.storage.StorageLevel
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 1ad8514..7e6e691 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -21,13 +21,15 @@ import java.util.{List => JList, Comparator}
 import scala.Tuple2
 import scala.collection.JavaConversions._
 
+import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.spark.{SparkContext, Partition, RDD, TaskContext}
+
+import org.apache.spark.{SparkContext, Partition, TaskContext}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
 import org.apache.spark.partial.{PartialResult, BoundedDouble}
 import org.apache.spark.storage.StorageLevel
-import com.google.common.base.Optional
 
 
 trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 618a7b3..8869e07 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -26,13 +26,13 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import com.google.common.base.Optional
 
-import org.apache.spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, RDD, SparkContext}
+import org.apache.spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, SparkContext}
 import org.apache.spark.SparkContext.IntAccumulatorParam
 import org.apache.spark.SparkContext.DoubleAccumulatorParam
 import org.apache.spark.broadcast.Broadcast
-
-import com.google.common.base.Optional
+import org.apache.spark.rdd.RDD
 
 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns [[org.apache.spark.api.java.JavaRDD]]s and

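The JavaSparkContext scaladoc above describes it as the Java-friendly entry point that hands back JavaRDDs instead of Scala RDDs. A small usage sketch, written in Scala for consistency with the rest of the patch (the master URL, application name and file name are placeholders):

    import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}

    val jsc = new JavaSparkContext("local", "JavaApiExample")
    val lines: JavaRDD[String] = jsc.textFile("data.txt")   // JavaRDD, not rdd.RDD
    println(lines.count())
    jsc.stop()
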
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
index eea63d5..b090c6e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
@@ -18,8 +18,8 @@
 package org.apache.spark.api.python
 
 import org.apache.spark.Partitioner
-import org.apache.spark.Utils
 import java.util.Arrays
+import org.apache.spark.util.Utils
 
 /**
  * A [[org.apache.spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 621f0fe..ccd3833 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -26,7 +26,9 @@ import scala.collection.JavaConversions._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark._
+import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PipedRDD
+import org.apache.spark.util.Utils
 
 
 private[spark] class PythonRDD[T: ClassManifest](

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
index 99e8623..93e7815 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -27,6 +27,7 @@ import scala.math
 
 import org.apache.spark._
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
   extends Broadcast[T](id)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 7a52ff0..9db26ae 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -23,10 +23,10 @@ import java.net.URL
 import it.unimi.dsi.fastutil.io.FastBufferedInputStream
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 
-import org.apache.spark.{HttpServer, Logging, SparkEnv, Utils}
+import org.apache.spark.{HttpServer, Logging, SparkEnv}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashSet}
+import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashSet}
 
 
 private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
index 10b910d..21ec946 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
@@ -24,6 +24,7 @@ import java.util.Random
 import scala.collection.mutable.Map
 
 import org.apache.spark._
+import org.apache.spark.util.Utils
 
 private object MultiTracker
 extends Logging {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
index b5a4ccc..80c97ca 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -26,6 +26,7 @@ import scala.math
 
 import org.apache.spark._
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
 
 private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
 extends Broadcast[T](id) with Logging with Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 4dc6ada..c31619d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -19,10 +19,10 @@ package org.apache.spark.deploy
 
 import scala.collection.immutable.List
 
-import org.apache.spark.Utils
 import org.apache.spark.deploy.ExecutorState.ExecutorState
 import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
 import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.util.Utils
 
 
 private[deploy] sealed trait DeployMessage extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index af5a411..78e3747 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -21,8 +21,8 @@ import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
 
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.AkkaUtils
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.{Logging}
 
 import scala.collection.mutable.ArrayBuffer
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 0322029..d5e9a0e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.deploy.client
 
-import org.apache.spark.util.AkkaUtils
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.{Logging}
 import org.apache.spark.deploy.{Command, ApplicationDescription}
 
 private[spark] object TestClient {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 869b2b2..7cf0a77 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -27,12 +27,12 @@ import akka.actor.Terminated
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
 import akka.util.duration._
 
-import org.apache.spark.{Logging, SparkException, Utils}
+import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
 
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index c86cca2..9d89b45 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -17,8 +17,7 @@
 
 package org.apache.spark.deploy.master
 
-import org.apache.spark.util.IntParam
-import org.apache.spark.Utils
+import org.apache.spark.util.{Utils, IntParam}
 
 /**
  * Command-line parser for the master.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 285e07a..6219f11 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
 
 import akka.actor.ActorRef
 import scala.collection.mutable
-import org.apache.spark.Utils
+import org.apache.spark.util.Utils
 
 private[spark] class WorkerInfo(
   val id: String,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 6435c7f..f4e574d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMaste
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.master.ExecutorInfo
 import org.apache.spark.ui.UIUtils
-import org.apache.spark.Utils
+import org.apache.spark.util.Utils
 
 private[spark] class ApplicationPage(parent: MasterWebUI) {
   val master = parent.masterActorRef

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 58d3863..d7a5722 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -27,12 +27,12 @@ import akka.util.duration._
 
 import net.liftweb.json.JsonAST.JValue
 
-import org.apache.spark.Utils
 import org.apache.spark.deploy.DeployWebUI
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
 import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
 
 private[spark] class IndexPage(parent: MasterWebUI) {
   val master = parent.masterActorRef

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 47b1e52..f4df729 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -23,10 +23,11 @@ import javax.servlet.http.HttpServletRequest
 
 import org.eclipse.jetty.server.{Handler, Server}
 
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.{Logging}
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.ui.JettyUtils
 import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
 
 /**
  * Web UI server for the standalone master.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 01ce4a6..e3dc30e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,9 +25,10 @@ import akka.actor.ActorRef
 import com.google.common.base.Charsets
 import com.google.common.io.Files
 
-import org.apache.spark.{Utils, Logging}
+import org.apache.spark.{Logging}
 import org.apache.spark.deploy.{ExecutorState, ApplicationDescription}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.Utils
 
 /**
  * Manages the execution of one executor process.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 86e8e75..09530be 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -27,13 +27,13 @@ import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
 import akka.util.duration._
 
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.{Logging}
 import org.apache.spark.deploy.ExecutorState
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
 
 
 private[spark] class Worker(

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 6d91223..0ae89a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -17,9 +17,7 @@
 
 package org.apache.spark.deploy.worker
 
-import org.apache.spark.util.IntParam
-import org.apache.spark.util.MemoryParam
-import org.apache.spark.Utils
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
 import java.lang.management.ManagementFactory
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 6192c23..d2d3617 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -27,11 +27,11 @@ import akka.util.duration._
 
 import net.liftweb.json.JsonAST.JValue
 
-import org.apache.spark.Utils
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
 import org.apache.spark.deploy.worker.ExecutorRunner
 import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
 
 
 private[spark] class IndexPage(parent: WorkerWebUI) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index bb8165a..95d6007 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -26,10 +26,11 @@ import javax.servlet.http.HttpServletRequest
 import org.eclipse.jetty.server.{Handler, Server}
 
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.{Utils, Logging}
+import org.apache.spark.{Logging}
 import org.apache.spark.ui.JettyUtils
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
 
 /**
  * Web UI server for the standalone worker.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5446a3f..d365804 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.HashMap
 
 import org.apache.spark.scheduler._
 import org.apache.spark._
+import org.apache.spark.util.Utils
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 410a94d..da62091 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -22,8 +22,9 @@ import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNa
 import org.apache.mesos.Protos.{TaskState => MesosTaskState, TaskStatus => MesosTaskStatus, _}
 import org.apache.spark.TaskState.TaskState
 import com.google.protobuf.ByteString
-import org.apache.spark.{Utils, Logging}
+import org.apache.spark.{Logging}
 import org.apache.spark.TaskState
+import org.apache.spark.util.Utils
 
 private[spark] class MesosExecutorBackend
   extends MesosExecutor

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index 65801f7..7839023 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -22,10 +22,10 @@ import java.nio.ByteBuffer
 import akka.actor.{ActorRef, Actor, Props, Terminated}
 import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
 
-import org.apache.spark.{Logging, Utils, SparkEnv}
+import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
 
 
 private[spark] class StandaloneExecutorBackend(

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 9e2233c..e15a839 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -34,6 +34,7 @@ import scala.collection.mutable.ArrayBuffer
 import akka.dispatch.{Await, Promise, ExecutionContext, Future}
 import akka.util.Duration
 import akka.util.duration._
+import org.apache.spark.util.Utils
 
 
 private[spark] class ConnectionManager(port: Int) extends Logging {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
index 0839c01..50dd9bc 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
@@ -19,7 +19,7 @@ package org.apache.spark.network
 
 import java.net.InetSocketAddress
 
-import org.apache.spark.Utils
+import org.apache.spark.util.Utils
 
 
 private[spark] case class ConnectionManagerId(host: String, port: Int) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 1126480..c0ec527 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -1,3 +1,5 @@
+import org.apache.spark.rdd.{SequenceFileRDDFunctions, DoubleRDDFunctions, PairRDDFunctions}
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
@@ -16,16 +18,17 @@
  */
 
 /**
- * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to Spark, while
- * [[org.apache.spark.RDD]] is the data type representing a distributed collection, and provides most
- * parallel operations. 
+ * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
+ * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,
+ * and provides most parallel operations.
  *
- * In addition, [[org.apache.spark.PairRDDFunctions]] contains operations available only on RDDs of key-value
- * pairs, such as `groupByKey` and `join`; [[org.apache.spark.DoubleRDDFunctions]] contains operations
- * available only on RDDs of Doubles; and [[org.apache.spark.SequenceFileRDDFunctions]] contains operations
- * available on RDDs that can be saved as SequenceFiles. These operations are automatically
- * available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit conversions when
- * you `import org.apache.spark.SparkContext._`.
+ * In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs
+ * of key-value pairs, such as `groupByKey` and `join`; [[org.apache.spark.rdd.DoubleRDDFunctions]]
+ * contains operations available only on RDDs of Doubles; and
+ * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can
+ * be saved as SequenceFiles. These operations are automatically available on any RDD of the right
+ * type (e.g. RDD[(Int, Int)] through implicit conversions when you
+ * `import org.apache.spark.SparkContext._`.
  */
 package object spark { 
   // For package docs only

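The rewritten package docs above note that PairRDDFunctions, DoubleRDDFunctions and SequenceFileRDDFunctions now live under org.apache.spark.rdd but are still picked up implicitly through `import org.apache.spark.SparkContext._`. A brief sketch of that conversion at work (the word-count logic and names are illustrative):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // brings the implicit RDD conversions into scope
    import org.apache.spark.rdd.RDD

    def wordCounts(sc: SparkContext, path: String): RDD[(String, Int)] = {
      val pairs = sc.textFile(path).flatMap(_.split(" ")).map(w => (w, 1))
      // reduceByKey comes from rdd.PairRDDFunctions via the implicit conversion above
      pairs.reduceByKey(_ + _)
    }
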
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
index c5d51be..d710694 100644
--- a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
+++ b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.partial
 
 import org.apache.spark._
 import org.apache.spark.scheduler.JobListener
+import org.apache.spark.rdd.RDD
 
 /**
  * A JobListener for an approximate single-result action, such as count() or non-parallel reduce().

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 4bb01ef..bca6956 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
 import org.apache.spark.storage.BlockManager
 
 private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index dcc35e8..0187256 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -23,7 +23,7 @@ import java.util.{HashMap => JHashMap}
 import scala.collection.JavaConversions
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import org.apache.spark.{Partition, Partitioner, SparkEnv, TaskContext}
 import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
new file mode 100644
index 0000000..a4bec41
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.partial.BoundedDouble
+import org.apache.spark.partial.MeanEvaluator
+import org.apache.spark.partial.PartialResult
+import org.apache.spark.partial.SumEvaluator
+import org.apache.spark.util.StatCounter
+import org.apache.spark.{TaskContext, Logging}
+
+/**
+ * Extra functions available on RDDs of Doubles through an implicit conversion.
+ * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
+ */
+class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
+  /** Add up the elements in this RDD. */
+  def sum(): Double = {
+    self.reduce(_ + _)
+  }
+
+  /**
+   * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
+   * of the RDD's elements in one operation.
+   */
+  def stats(): StatCounter = {
+    self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
+  }
+
+  /** Compute the mean of this RDD's elements. */
+  def mean(): Double = stats().mean
+
+  /** Compute the variance of this RDD's elements. */
+  def variance(): Double = stats().variance
+
+  /** Compute the standard deviation of this RDD's elements. */
+  def stdev(): Double = stats().stdev
+
+  /** 
+   * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
+   * estimating the standard deviation by dividing by N-1 instead of N).
+   */
+  def sampleStdev(): Double = stats().sampleStdev
+
+  /**
+   * Compute the sample variance of this RDD's elements (which corrects for bias in
+   * estimating the variance by dividing by N-1 instead of N).
+   */
+  def sampleVariance(): Double = stats().sampleVariance
+
+  /** (Experimental) Approximate operation to return the mean within a timeout. */
+  def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
+    val evaluator = new MeanEvaluator(self.partitions.size, confidence)
+    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
+  }
+
+  /** (Experimental) Approximate operation to return the sum within a timeout. */
+  def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
+    val evaluator = new SumEvaluator(self.partitions.size, confidence)
+    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
+  }
+}

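As the scaladoc in the new file states, these operations become available on any RDD[Double] through the same `import org.apache.spark.SparkContext._` implicits. A short usage sketch (the sample values are made up):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // enables rdd.DoubleRDDFunctions on RDD[Double]

    def summarize(sc: SparkContext): Unit = {
      val xs = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
      val stats = xs.stats()                 // count, mean and variance in one pass
      println("sum = " + xs.sum() + ", mean = " + stats.mean + ", stdev = " + stats.stdev)
    }
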
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index 24ce4ab..c8900d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
index 4df8ceb..5312dc0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{OneToOneDependency, RDD, Partition, TaskContext}
+import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
 
 private[spark] class FilteredRDD[T: ClassManifest](
     prev: RDD[T],

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
index 2bf7653..cbdf6d8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 
 
 private[spark]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
index e544720..82000ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{TaskContext, Partition, RDD}
+import org.apache.spark.{TaskContext, Partition}
 
 
 private[spark]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
index 2ce9419..829545d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 
 private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
   extends RDD[Array[T]](prev) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 08e6154..2cb6734 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -18,21 +18,15 @@
 package org.apache.spark.rdd
 
 import java.io.EOFException
-import java.util.NoSuchElementException
 
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.FileInputFormat
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.InputSplit
 import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapred.RecordReader
 import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.ReflectionUtils
 
-import org.apache.spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
 import org.apache.spark.util.NextIterator
 import org.apache.hadoop.conf.{Configuration, Configurable}
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 3db460b..aca0146 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
 
 import java.sql.{Connection, ResultSet}
 
-import org.apache.spark.{Logging, Partition, RDD, SparkContext, TaskContext}
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
 import org.apache.spark.util.NextIterator
 
 private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 13009d3..203179c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 
 
 private[spark]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
index 1683050..3ed8339 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
index 26d4806..e8be1c4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 
 private[spark]
 class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
index a405e9a..d33c1af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.rdd
 
 
-import org.apache.spark.{TaskContext, Partition, RDD}
+import org.apache.spark.{TaskContext, Partition}
 
 private[spark]
 class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 114b504..7b3a89f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
 
-import org.apache.spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.{Dependency, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
 
 
 private[spark]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 4c3df0e..697be8b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{RangePartitioner, Logging, RDD}
+import org.apache.spark.{RangePartitioner, Logging}
 
 /**
  * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
- * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
- * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
+ * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
+ * use these functions. They will work with any key type that has a `scala.math.Ordered`
+ * implementation.
  */
 class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
                           V: ClassManifest,

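The reworded scaladoc above describes OrderedRDDFunctions, which the same SparkContext._ import makes available on pair RDDs whose key type has a scala.math.Ordered implementation. A minimal sortByKey sketch (the data is illustrative; sortByKey itself is defined in this class but is not shown in the excerpt above):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // provides rdd.OrderedRDDFunctions for sortable keys

    def sortedPairs(sc: SparkContext) = {
      val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3)))
      pairs.sortByKey()                      // ascending sort on the String key
    }
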
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
new file mode 100644
index 0000000..a47c512
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -0,0 +1,702 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.nio.ByteBuffer
+import java.util.Date
+import java.text.SimpleDateFormat
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.{mutable, Map}
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.mapred.FileOutputFormat
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
+import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.Aggregator
+import org.apache.spark.Partitioner
+import org.apache.spark.Partitioner.defaultPartitioner
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
+ */
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
+  extends Logging
+  with SparkHadoopMapReduceUtil
+  with Serializable {
+
+  /**
+   * Generic function to combine the elements for each key using a custom set of aggregation
+   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
+   * Note that V and C can be different -- for example, one might group an RDD of type
+   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
+   *
+   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
+   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
+   * - `mergeCombiners`, to combine two C's into a single one.
+   *
+   * In addition, users can control the partitioning of the output RDD, and whether to perform
+   * map-side aggregation (if a mapper can produce multiple items with the same key).
+   */
+  def combineByKey[C](createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C,
+      partitioner: Partitioner,
+      mapSideCombine: Boolean = true,
+      serializerClass: String = null): RDD[(K, C)] = {
+    if (getKeyClass().isArray) {
+      if (mapSideCombine) {
+        throw new SparkException("Cannot use map-side combining with array keys.")
+      }
+      if (partitioner.isInstanceOf[HashPartitioner]) {
+        throw new SparkException("Default partitioner cannot partition array keys.")
+      }
+    }
+    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+    if (self.partitioner == Some(partitioner)) {
+      self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+    } else if (mapSideCombine) {
+      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
+        .setSerializer(serializerClass)
+      partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
+    } else {
+      // Don't apply map-side combiner.
+      // A sanity check to make sure mergeCombiners is not defined.
+      assert(mergeCombiners == null)
+      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
+      values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+    }
+  }
+
+  /**
+   * Simplified version of combineByKey that hash-partitions the output RDD.
+   */
+  def combineByKey[C](createCombiner: V => C,
+      mergeValue: (C, V) => C,
+      mergeCombiners: (C, C) => C,
+      numPartitions: Int): RDD[(K, C)] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Merge the values for each key using an associative function and a neutral "zero value" which may
+   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+   * list concatenation, 0 for addition, or 1 for multiplication.).
+   */
+  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
+    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
+    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+    val zeroArray = new Array[Byte](zeroBuffer.limit)
+    zeroBuffer.get(zeroArray)
+
+    // When deserializing, use a lazy val to create just one instance of the serializer per task
+    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+
+    combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
+  }
+
+  /**
+   * Merge the values for each key using an associative function and a neutral "zero value" which may
+   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+   * list concatenation, 0 for addition, or 1 for multiplication.).
+   */
+  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
+    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
+  }
+
+  /**
+   * Merge the values for each key using an associative function and a neutral "zero value" which may
+   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
+   * list concatenation, 0 for addition, or 1 for multiplication.).
+   */
+  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
+    foldByKey(zeroValue, defaultPartitioner(self))(func)
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
+   * "combiner" in MapReduce.
+   */
+  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
+    combineByKey[V]((v: V) => v, func, func, partitioner)
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function, but return the results
+   * immediately to the master as a Map. This will also perform the merging locally on each mapper
+   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+   */
+  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
+
+    if (getKeyClass().isArray) {
+      throw new SparkException("reduceByKeyLocally() does not support array keys")
+    }
+
+    def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+      val map = new JHashMap[K, V]
+      iter.foreach { case (k, v) =>
+        val old = map.get(k)
+        map.put(k, if (old == null) v else func(old, v))
+      }
+      Iterator(map)
+    }
+
+    def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+      m2.foreach { case (k, v) =>
+        val old = m1.get(k)
+        m1.put(k, if (old == null) v else func(old, v))
+      }
+      m1
+    }
+
+    self.mapPartitions(reducePartition).reduce(mergeMaps)
+  }
+
+  /** Alias for reduceByKeyLocally */
+  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)
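
A sketch of the local variant, which returns a Map on the driver instead of an RDD
(same illustrative assumptions as above):

    val pairs = sc.parallelize(Seq(("x", 1), ("y", 2), ("x", 3)))
    val totals = pairs.reduceByKeyLocally(_ + _)   // Map("x" -> 4, "y" -> 2) on the driver
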
+
+  /** Count the number of elements for each key, and return the result to the master as a Map. */
+  def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
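
countByKey, by contrast, only counts how many times each key occurs (continuing the
`pairs` sketch above):

    pairs.countByKey()   // Map("x" -> 2, "y" -> 1), with Long counts
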
+
+  /**
+   * (Experimental) Approximate version of countByKey that can return a partial result if it does
+   * not finish within a timeout.
+   */
+  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
+      : PartialResult[Map[K, BoundedDouble]] = {
+    self.map(_._1).countByValueApprox(timeout, confidence)
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
+   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
+   */
+  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
+    reduceByKey(new HashPartitioner(numPartitions), func)
+  }
+
+  /**
+   * Group the values for each key in the RDD into a single sequence. Allows controlling the
+   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+   */
+  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+    // groupByKey shouldn't use map side combine because map side combine does not
+    // reduce the amount of data shuffled and requires all map side data be inserted
+    // into a hash table, leading to more objects in the old gen.
+    def createCombiner(v: V) = ArrayBuffer(v)
+    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+    val bufs = combineByKey[ArrayBuffer[V]](
+      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
+    bufs.asInstanceOf[RDD[(K, Seq[V])]]
+  }
+
+  /**
+   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+   * resulting RDD into `numPartitions` partitions.
+   */
+  def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
+    groupByKey(new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Return a copy of the RDD partitioned using the specified partitioner.
+   */
+  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
+    if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
+      throw new SparkException("Default partitioner cannot partition array keys.")
+    }
+    new ShuffledRDD[K, V, (K, V)](self, partitioner)
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
+   */
+  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+    }
+  }
+
+  /**
+   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
+   * partition the output RDD.
+   */
+  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      if (ws.isEmpty) {
+        vs.iterator.map(v => (v, None))
+      } else {
+        for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+      }
+    }
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
+   * partition the output RDD.
+   */
+  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+      : RDD[(K, (Option[V], W))] = {
+    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+      if (vs.isEmpty) {
+        ws.iterator.map(w => (None, w))
+      } else {
+        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+      }
+    }
+  }
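
To make the join semantics above concrete, a small illustrative sketch (the data and
names are made up; `sc` and the pair-RDD implicits are assumed as before):

    val ages   = sc.parallelize(Seq(("alice", 30), ("bob", 25)))
    val cities = sc.parallelize(Seq(("alice", "NYC"), ("carol", "SF")))
    ages.join(cities).collect()            // ("alice", (30, "NYC"))
    ages.leftOuterJoin(cities).collect()   // ("alice", (30, Some("NYC"))), ("bob", (25, None))
    ages.rightOuterJoin(cities).collect()  // ("alice", (Some(30), "NYC")), ("carol", (None, "SF"))
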
+
+  /**
+   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
+   * existing partitioner/parallelism level.
+   */
+  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
+      : RDD[(K, C)] = {
+    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
+  }
+
+  /**
+   * Merge the values for each key using an associative reduce function. This will also perform
+   * the merging locally on each mapper before sending results to a reducer, similarly to a
+   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
+   * parallelism level.
+   */
+  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
+    reduceByKey(defaultPartitioner(self), func)
+  }
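
The classic word-count use of this default overload, under the same illustrative
assumptions:

    val lines = sc.parallelize(Seq("to be or not to be"))
    lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).collect()
    // ("to", 2), ("be", 2), ("or", 1), ("not", 1), in some order
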
+
+  /**
+   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+   * resulting RDD with the existing partitioner/parallelism level.
+   */
+  def groupByKey(): RDD[(K, Seq[V])] = {
+    groupByKey(defaultPartitioner(self))
+  }
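
As the map-side-combine note above explains, groupByKey shuffles every value, so
reduceByKey or foldByKey is usually preferable when only an aggregate is needed. A
sketch of its output shape:

    val kv = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    kv.groupByKey().collect()   // ("a", Seq(1, 2)), ("b", Seq(3))
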
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Performs a hash join across the cluster.
+   */
+  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+    join(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+   * (k, v2) is in `other`. Performs a hash join across the cluster.
+   */
+  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
+    join(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+   * using the existing partitioner/parallelism level.
+   */
+  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
+    leftOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+   * into `numPartitions` partitions.
+   */
+  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
+    leftOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+   * RDD using the existing partitioner/parallelism level.
+   */
+  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
+    rightOuterJoin(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+   * RDD into the given number of partitions.
+   */
+  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
+    rightOuterJoin(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * Return the key-value pairs in this RDD to the master as a Map.
+   */
+  def collectAsMap(): Map[K, V] = {
+    val data = self.toArray()
+    val map = new mutable.HashMap[K, V]
+    map.sizeHint(data.length)
+    data.foreach { case (k, v) => map.put(k, v) }
+    map
+  }
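
Illustration (note that, per the code above, if a key occurs more than once only one of
its values ends up in the returned map):

    sc.parallelize(Seq((1, "a"), (2, "b"))).collectAsMap()   // Map(1 -> "a", 2 -> "b")
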
+
+  /**
+   * Pass each value in the key-value pair RDD through a map function without changing the keys;
+   * this also retains the original RDD's partitioning.
+   */
+  def mapValues[U](f: V => U): RDD[(K, U)] = {
+    val cleanF = self.context.clean(f)
+    new MappedValuesRDD(self, cleanF)
+  }
+
+  /**
+   * Pass each value in the key-value pair RDD through a flatMap function without changing the
+   * keys; this also retains the original RDD's partitioning.
+   */
+  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
+    val cleanF = self.context.clean(f)
+    new FlatMappedValuesRDD(self, cleanF)
+  }
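
Because both operations retain the parent's partitioning, a subsequent join or
reduceByKey on the same keys can avoid an extra shuffle. A small sketch under the usual
illustrative assumptions:

    val kv = sc.parallelize(Seq(("a", 2), ("b", 3)))
    kv.mapValues(_ * 10).collect()           // ("a", 20), ("b", 30)
    kv.flatMapValues(v => 1 to v).collect()  // ("a", 1), ("a", 2), ("b", 1), ("b", 2), ("b", 3)
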
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+      throw new SparkException("Default partitioner cannot partition array keys.")
+    }
+    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
+    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+    prfs.mapValues { case Seq(vs, ws) =>
+      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+    }
+  }
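
A sketch of cogroup's output shape, reusing the illustrative `ages` and `cities` RDDs
from the join sketch above:

    ages.cogroup(cities).collect()
    // ("alice", (Seq(30), Seq("NYC"))), ("bob", (Seq(25), Seq())), ("carol", (Seq(), Seq("SF")))
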
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
+      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
+      throw new SparkException("Default partitioner cannot partition array keys.")
+    }
+    val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
+    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
+    prfs.mapValues { case Seq(vs, w1s, w2s) =>
+      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+    }
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner(self, other))
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+  }
+
+  /**
+   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+   * list of values for that key in `this` as well as `other`.
+   */
+  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, new HashPartitioner(numPartitions))
+  }
+
+  /**
+   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+   * tuple with the list of values for that key in `this`, `other1` and `other2`.
+   */
+  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
+      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    cogroup(other1, other2, new HashPartitioner(numPartitions))
+  }
+
+  /** Alias for cogroup. */
+  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+    cogroup(other, defaultPartitioner(self, other))
+  }
+
+  /** Alias for cogroup. */
+  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
+  }
+
+  /**
+   * Return an RDD with the pairs from `this` whose keys are not in `other`.
+   *
+   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+   * RDD will be no larger than `this`.
+   */
+  def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
+    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
+
+  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
+    subtractByKey(other, new HashPartitioner(numPartitions))
+
+  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
+    new SubtractedRDD[K, V, W](self, other, p)
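
Sketch, again with the illustrative `ages` and `cities` RDDs from the join sketch:

    ages.subtractByKey(cities).collect()   // ("bob", 25): the only key of `ages` absent from `cities`
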
+
+  /**
+   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
+   * RDD has a known partitioner by only searching the partition that the key maps to.
+   */
+  def lookup(key: K): Seq[V] = {
+    self.partitioner match {
+      case Some(p) =>
+        val index = p.getPartition(key)
+        def process(it: Iterator[(K, V)]): Seq[V] = {
+          val buf = new ArrayBuffer[V]
+          for ((k, v) <- it if k == key) {
+            buf += v
+          }
+          buf
+        }
+        val res = self.context.runJob(self, process _, Array(index), false)
+        res(0)
+      case None =>
+        self.filter(_._1 == key).map(_._2).collect()
+    }
+  }
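
Sketch, under the same illustrative assumptions as before:

    val entries = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    entries.lookup("a")   // Seq(1, 2)
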
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD.
+   */
+  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
+    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress the result with the
+   * supplied codec.
+   */
+  def saveAsHadoopFile[F <: OutputFormat[K, V]](
+      path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
+    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
+  }
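
For example (illustrative only; the output directory is a placeholder and the choice of
the old-API TextOutputFormat is an assumption):

    import org.apache.hadoop.mapred.TextOutputFormat
    val counts = sc.parallelize(Seq(("a", 1), ("b", 2)))
    counts.saveAsHadoopFile[TextOutputFormat[String, Int]]("out-dir")
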
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+   */
+  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
+    saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+   */
+  def saveAsNewAPIHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+      conf: Configuration = self.context.hadoopConfiguration) {
+    val job = new NewAPIHadoopJob(conf)
+    job.setOutputKeyClass(keyClass)
+    job.setOutputValueClass(valueClass)
+    val wrappedConf = new SerializableWritable(job.getConfiguration)
+    NewFileOutputFormat.setOutputPath(job, new Path(path))
+    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+    val jobtrackerID = formatter.format(new Date())
+    val stageId = self.id
+    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+      /* "reduce task" <split #> <attempt # = spark task #> */
+      val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber)
+      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+      val format = outputFormatClass.newInstance
+      val committer = format.getOutputCommitter(hadoopContext)
+      committer.setupTask(hadoopContext)
+      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+      while (iter.hasNext) {
+        val (k, v) = iter.next()
+        writer.write(k, v)
+      }
+      writer.close(hadoopContext)
+      committer.commitTask(hadoopContext)
+      return 1
+    }
+    val jobFormat = outputFormatClass.newInstance
+    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
+     * however we're only going to use this local OutputCommitter for
+     * setupJob/commitJob, so we just use a dummy "map" task.
+     */
+    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0)
+    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+    jobCommitter.setupJob(jobTaskContext)
+    val count = self.context.runJob(self, writeShard _).sum
+    jobCommitter.commitJob(jobTaskContext)
+    jobCommitter.cleanupJob(jobTaskContext)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
+   */
+  def saveAsHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      codec: Class[_ <: CompressionCodec]) {
+    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
+      new JobConf(self.context.hadoopConfiguration), Some(codec))
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+   * supporting the key and value types K and V in this RDD.
+   */
+  def saveAsHadoopFile(
+      path: String,
+      keyClass: Class[_],
+      valueClass: Class[_],
+      outputFormatClass: Class[_ <: OutputFormat[_, _]],
+      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
+      codec: Option[Class[_ <: CompressionCodec]] = None) {
+    conf.setOutputKeyClass(keyClass)
+    conf.setOutputValueClass(valueClass)
+    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
+    conf.set("mapred.output.format.class", outputFormatClass.getName)
+    for (c <- codec) {
+      conf.setCompressMapOutput(true)
+      conf.set("mapred.output.compress", "true")
+      conf.setMapOutputCompressorClass(c)
+      conf.set("mapred.output.compression.codec", c.getCanonicalName)
+      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+    }
+    conf.setOutputCommitter(classOf[FileOutputCommitter])
+    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
+    saveAsHadoopDataset(conf)
+  }
+
+  /**
+   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
+   * that storage system. The JobConf should set an OutputFormat and any output paths required
+   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
+   * MapReduce job.
+   */
+  def saveAsHadoopDataset(conf: JobConf) {
+    val outputFormatClass = conf.getOutputFormat
+    val keyClass = conf.getOutputKeyClass
+    val valueClass = conf.getOutputValueClass
+    if (outputFormatClass == null) {
+      throw new SparkException("Output format class not set")
+    }
+    if (keyClass == null) {
+      throw new SparkException("Output key class not set")
+    }
+    if (valueClass == null) {
+      throw new SparkException("Output value class not set")
+    }
+
+    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
+
+    val writer = new SparkHadoopWriter(conf)
+    writer.preSetup()
+
+    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+      // around by taking a mod. We expect that no task will be attempted 2 billion times.
+      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+
+      writer.setup(context.stageId, context.splitId, attemptNumber)
+      writer.open()
+
+      var count = 0
+      while(iter.hasNext) {
+        val record = iter.next()
+        count += 1
+        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+      }
+
+      writer.close()
+      writer.commit()
+    }
+
+    self.context.runJob(self, writeToFile _)
+    writer.commitJob()
+    writer.cleanup()
+  }
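
A sketch of driving the same save path through a JobConf directly; the output directory
is a placeholder, and the "mapred.output.format.class" property mirrors the workaround
used in saveAsHadoopFile above:

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.{FileOutputFormat, JobConf, TextOutputFormat}
    val conf = new JobConf(sc.hadoopConfiguration)
    conf.setOutputKeyClass(classOf[String])
    conf.setOutputValueClass(classOf[java.lang.Integer])
    conf.set("mapred.output.format.class",
      classOf[TextOutputFormat[String, java.lang.Integer]].getName)
    FileOutputFormat.setOutputPath(conf, new Path("out-dir"))
    sc.parallelize(Seq(("a", 1), ("b", 2))).saveAsHadoopDataset(conf)
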
+
+  /**
+   * Return an RDD with the keys of each tuple.
+   */
+  def keys: RDD[K] = self.map(_._1)
+
+  /**
+   * Return an RDD with the values of each tuple.
+   */
+  def values: RDD[V] = self.map(_._2)
+
+  private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
+
+  private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
+}
+
+private[spark] object Manifests {
+  val seqSeqManifest = classManifest[Seq[Seq[_]]]
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 8db3611..6dbd430 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -23,6 +23,8 @@ import scala.collection.Map
 import org.apache.spark._
 import java.io._
 import scala.Serializable
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils
 
 private[spark] class ParallelCollectionPartition[T: ClassManifest](
     var rddId: Long,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index 8e79a5c..165cd41 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{NarrowDependency, RDD, SparkEnv, Partition, TaskContext}
+import org.apache.spark.{NarrowDependency, SparkEnv, Partition, TaskContext}
 
 
 class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition) extends Partition {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 98498d5..d5304ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 
-import org.apache.spark.{RDD, SparkEnv, Partition, TaskContext}
+import org.apache.spark.{SparkEnv, Partition, TaskContext}
 import org.apache.spark.broadcast.Broadcast