You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:47 UTC
[63/69] [abbrv] Move some classes to more appropriate packages:
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
index cb25ff7..5fd1fab 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.api.java
-import org.apache.spark.RDD
+import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.util.StatCounter
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 09da35a..a6518ab 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -33,12 +33,12 @@ import org.apache.hadoop.conf.Configuration
import org.apache.spark.HashPartitioner
import org.apache.spark.Partitioner
import org.apache.spark.Partitioner._
-import org.apache.spark.RDD
import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.api.java.function.{Function2 => JFunction2}
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.PartialResult
+import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.OrderedRDDFunctions
import org.apache.spark.storage.StorageLevel
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
index 68cfcf5..eec58ab 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.api.java
import org.apache.spark._
+import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.storage.StorageLevel
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 1ad8514..7e6e691 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -21,13 +21,15 @@ import java.util.{List => JList, Comparator}
import scala.Tuple2
import scala.collection.JavaConversions._
+import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.spark.{SparkContext, Partition, RDD, TaskContext}
+
+import org.apache.spark.{SparkContext, Partition, TaskContext}
+import org.apache.spark.rdd.RDD
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
import org.apache.spark.partial.{PartialResult, BoundedDouble}
import org.apache.spark.storage.StorageLevel
-import com.google.common.base.Optional
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 618a7b3..8869e07 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -26,13 +26,13 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import com.google.common.base.Optional
-import org.apache.spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, RDD, SparkContext}
+import org.apache.spark.{Accumulable, AccumulableParam, Accumulator, AccumulatorParam, SparkContext}
import org.apache.spark.SparkContext.IntAccumulatorParam
import org.apache.spark.SparkContext.DoubleAccumulatorParam
import org.apache.spark.broadcast.Broadcast
-
-import com.google.common.base.Optional
+import org.apache.spark.rdd.RDD
/**
* A Java-friendly version of [[org.apache.spark.SparkContext]] that returns [[org.apache.spark.api.java.JavaRDD]]s and
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
index eea63d5..b090c6e 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
@@ -18,8 +18,8 @@
package org.apache.spark.api.python
import org.apache.spark.Partitioner
-import org.apache.spark.Utils
import java.util.Arrays
+import org.apache.spark.util.Utils
/**
* A [[org.apache.spark.Partitioner]] that performs handling of byte arrays, for use by the Python API.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 621f0fe..ccd3833 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -26,7 +26,9 @@ import scala.collection.JavaConversions._
import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark._
+import org.apache.spark.rdd.RDD
import org.apache.spark.rdd.PipedRDD
+import org.apache.spark.util.Utils
private[spark] class PythonRDD[T: ClassManifest](
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
index 99e8623..93e7815 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BitTorrentBroadcast.scala
@@ -27,6 +27,7 @@ import scala.math
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 7a52ff0..9db26ae 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -23,10 +23,10 @@ import java.net.URL
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-import org.apache.spark.{HttpServer, Logging, SparkEnv, Utils}
+import org.apache.spark.{HttpServer, Logging, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashSet}
+import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashSet}
private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
index 10b910d..21ec946 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/MultiTracker.scala
@@ -24,6 +24,7 @@ import java.util.Random
import scala.collection.mutable.Map
import org.apache.spark._
+import org.apache.spark.util.Utils
private object MultiTracker
extends Logging {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
index b5a4ccc..80c97ca 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TreeBroadcast.scala
@@ -26,6 +26,7 @@ import scala.math
import org.apache.spark._
import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.Utils
private[spark] class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean, id: Long)
extends Broadcast[T](id) with Logging with Serializable {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 4dc6ada..c31619d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -19,10 +19,10 @@ package org.apache.spark.deploy
import scala.collection.immutable.List
-import org.apache.spark.Utils
import org.apache.spark.deploy.ExecutorState.ExecutorState
import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
import org.apache.spark.deploy.worker.ExecutorRunner
+import org.apache.spark.util.Utils
private[deploy] sealed trait DeployMessage extends Serializable
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index af5a411..78e3747 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -21,8 +21,8 @@ import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.AkkaUtils
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.{Logging}
import scala.collection.mutable.ArrayBuffer
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 0322029..d5e9a0e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -17,8 +17,8 @@
package org.apache.spark.deploy.client
-import org.apache.spark.util.AkkaUtils
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.{Logging}
import org.apache.spark.deploy.{Command, ApplicationDescription}
private[spark] object TestClient {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 869b2b2..7cf0a77 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -27,12 +27,12 @@ import akka.actor.Terminated
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
import akka.util.duration._
-import org.apache.spark.{Logging, SparkException, Utils}
+import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index c86cca2..9d89b45 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -17,8 +17,7 @@
package org.apache.spark.deploy.master
-import org.apache.spark.util.IntParam
-import org.apache.spark.Utils
+import org.apache.spark.util.{Utils, IntParam}
/**
* Command-line parser for the master.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 285e07a..6219f11 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.master
import akka.actor.ActorRef
import scala.collection.mutable
-import org.apache.spark.Utils
+import org.apache.spark.util.Utils
private[spark] class WorkerInfo(
val id: String,
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 6435c7f..f4e574d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -31,7 +31,7 @@ import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMaste
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
-import org.apache.spark.Utils
+import org.apache.spark.util.Utils
private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 58d3863..d7a5722 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -27,12 +27,12 @@ import akka.util.duration._
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.Utils
import org.apache.spark.deploy.DeployWebUI
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 47b1e52..f4df729 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -23,10 +23,11 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.{Logging}
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.JettyUtils
import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
/**
* Web UI server for the standalone master.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 01ce4a6..e3dc30e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -25,9 +25,10 @@ import akka.actor.ActorRef
import com.google.common.base.Charsets
import com.google.common.io.Files
-import org.apache.spark.{Utils, Logging}
+import org.apache.spark.{Logging}
import org.apache.spark.deploy.{ExecutorState, ApplicationDescription}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.Utils
/**
* Manages the execution of one executor process.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 86e8e75..09530be 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -27,13 +27,13 @@ import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import akka.util.duration._
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.{Logging}
import org.apache.spark.deploy.ExecutorState
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
private[spark] class Worker(
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 6d91223..0ae89a8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -17,9 +17,7 @@
package org.apache.spark.deploy.worker
-import org.apache.spark.util.IntParam
-import org.apache.spark.util.MemoryParam
-import org.apache.spark.Utils
+import org.apache.spark.util.{Utils, IntParam, MemoryParam}
import java.lang.management.ManagementFactory
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index 6192c23..d2d3617 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -27,11 +27,11 @@ import akka.util.duration._
import net.liftweb.json.JsonAST.JValue
-import org.apache.spark.Utils
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
private[spark] class IndexPage(parent: WorkerWebUI) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index bb8165a..95d6007 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -26,10 +26,11 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.{Utils, Logging}
+import org.apache.spark.{Logging}
import org.apache.spark.ui.JettyUtils
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
/**
* Web UI server for the standalone worker.
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 5446a3f..d365804 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.HashMap
import org.apache.spark.scheduler._
import org.apache.spark._
+import org.apache.spark.util.Utils
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
index 410a94d..da62091 100644
--- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala
@@ -22,8 +22,9 @@ import org.apache.mesos.{Executor => MesosExecutor, MesosExecutorDriver, MesosNa
import org.apache.mesos.Protos.{TaskState => MesosTaskState, TaskStatus => MesosTaskStatus, _}
import org.apache.spark.TaskState.TaskState
import com.google.protobuf.ByteString
-import org.apache.spark.{Utils, Logging}
+import org.apache.spark.{Logging}
import org.apache.spark.TaskState
+import org.apache.spark.util.Utils
private[spark] class MesosExecutorBackend
extends MesosExecutor
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
index 65801f7..7839023 100644
--- a/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/StandaloneExecutorBackend.scala
@@ -22,10 +22,10 @@ import java.nio.ByteBuffer
import akka.actor.{ActorRef, Actor, Props, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import org.apache.spark.{Logging, Utils, SparkEnv}
+import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
private[spark] class StandaloneExecutorBackend(
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 9e2233c..e15a839 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -34,6 +34,7 @@ import scala.collection.mutable.ArrayBuffer
import akka.dispatch.{Await, Promise, ExecutionContext, Future}
import akka.util.Duration
import akka.util.duration._
+import org.apache.spark.util.Utils
private[spark] class ConnectionManager(port: Int) extends Logging {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
index 0839c01..50dd9bc 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManagerId.scala
@@ -19,7 +19,7 @@ package org.apache.spark.network
import java.net.InetSocketAddress
-import org.apache.spark.Utils
+import org.apache.spark.util.Utils
private[spark] case class ConnectionManagerId(host: String, port: Int) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/package.scala b/core/src/main/scala/org/apache/spark/package.scala
index 1126480..c0ec527 100644
--- a/core/src/main/scala/org/apache/spark/package.scala
+++ b/core/src/main/scala/org/apache/spark/package.scala
@@ -1,3 +1,5 @@
+import org.apache.spark.rdd.{SequenceFileRDDFunctions, DoubleRDDFunctions, PairRDDFunctions}
+
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -16,16 +18,17 @@
*/
/**
- * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to Spark, while
- * [[org.apache.spark.RDD]] is the data type representing a distributed collection, and provides most
- * parallel operations.
+ * Core Spark functionality. [[org.apache.spark.SparkContext]] serves as the main entry point to
+ * Spark, while [[org.apache.spark.rdd.RDD]] is the data type representing a distributed collection,
+ * and provides most parallel operations.
*
- * In addition, [[org.apache.spark.PairRDDFunctions]] contains operations available only on RDDs of key-value
- * pairs, such as `groupByKey` and `join`; [[org.apache.spark.DoubleRDDFunctions]] contains operations
- * available only on RDDs of Doubles; and [[org.apache.spark.SequenceFileRDDFunctions]] contains operations
- * available on RDDs that can be saved as SequenceFiles. These operations are automatically
- * available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit conversions when
- * you `import org.apache.spark.SparkContext._`.
+ * In addition, [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs
+ * of key-value pairs, such as `groupByKey` and `join`; [[org.apache.spark.rdd.DoubleRDDFunctions]]
+ * contains operations available only on RDDs of Doubles; and
+ * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that can
+ * be saved as SequenceFiles. These operations are automatically available on any RDD of the right
+ * type (e.g. RDD[(Int, Int)]) through implicit conversions when you
+ * `import org.apache.spark.SparkContext._`.
*/
package object spark {
// For package docs only
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
index c5d51be..d710694 100644
--- a/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
+++ b/core/src/main/scala/org/apache/spark/partial/ApproximateActionListener.scala
@@ -19,6 +19,7 @@ package org.apache.spark.partial
import org.apache.spark._
import org.apache.spark.scheduler.JobListener
+import org.apache.spark.rdd.RDD
/**
* A JobListener for an approximate single-result action, such as count() or non-parallel reduce().
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
index 4bb01ef..bca6956 100644
--- a/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
import org.apache.spark.storage.BlockManager
private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index dcc35e8..0187256 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -23,7 +23,7 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{Partition, Partitioner, RDD, SparkEnv, TaskContext}
+import org.apache.spark.{Partition, Partitioner, SparkEnv, TaskContext}
import org.apache.spark.{Dependency, OneToOneDependency, ShuffleDependency}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
new file mode 100644
index 0000000..a4bec41
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.spark.partial.BoundedDouble
+import org.apache.spark.partial.MeanEvaluator
+import org.apache.spark.partial.PartialResult
+import org.apache.spark.partial.SumEvaluator
+import org.apache.spark.util.StatCounter
+import org.apache.spark.{TaskContext, Logging}
+
+/**
+ * Extra functions available on RDDs of Doubles through an implicit conversion.
+ * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
+ */
+class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
+  /**
+   * Add up the elements in this RDD.
+   *
+   * The total is folded from 0.0 rather than reduced, so that an RDD without any elements sums
+   * to 0.0 instead of failing the way `reduce` does when it has nothing to combine.
+   */
+  def sum(): Double = {
+    self.fold(0.0)(_ + _)
+  }
+
+  /**
+   * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and
+   * count of the RDD's elements in one operation. One StatCounter is built per partition and the
+   * per-partition counters are then merged pairwise.
+   */
+  def stats(): StatCounter = {
+    self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
+  }
+
+  /** Compute the mean of this RDD's elements. Runs a full `stats()` pass. */
+  def mean(): Double = stats().mean
+
+  /** Compute the variance of this RDD's elements. Runs a full `stats()` pass. */
+  def variance(): Double = stats().variance
+
+  /** Compute the standard deviation of this RDD's elements. Runs a full `stats()` pass. */
+  def stdev(): Double = stats().stdev
+
+  /**
+   * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
+   * estimating the standard deviation by dividing by N-1 instead of N).
+   */
+  def sampleStdev(): Double = stats().sampleStdev
+
+  /**
+   * Compute the sample variance of this RDD's elements (which corrects for bias in
+   * estimating the variance by dividing by N-1 instead of N).
+   */
+  def sampleVariance(): Double = stats().sampleVariance
+
+  /**
+   * (Experimental) Approximate operation to return the mean within a timeout.
+   *
+   * @param timeout forwarded to `runApproximateJob` (presumably milliseconds -- TODO confirm)
+   * @param confidence confidence level handed to the MeanEvaluator; defaults to 0.95
+   */
+  def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+    // Each task condenses its partition into a single StatCounter for the evaluator.
+    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
+    val evaluator = new MeanEvaluator(self.partitions.size, confidence)
+    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
+  }
+
+  /**
+   * (Experimental) Approximate operation to return the sum within a timeout.
+   *
+   * @param timeout forwarded to `runApproximateJob` (presumably milliseconds -- TODO confirm)
+   * @param confidence confidence level handed to the SumEvaluator; defaults to 0.95
+   */
+  def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+    // Each task condenses its partition into a single StatCounter for the evaluator.
+    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
+    val evaluator = new SumEvaluator(self.partitions.size, confidence)
+    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
index 24ce4ab..c8900d1 100644
--- a/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
+import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
index 4df8ceb..5312dc0 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{OneToOneDependency, RDD, Partition, TaskContext}
+import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
private[spark] class FilteredRDD[T: ClassManifest](
prev: RDD[T],
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
index 2bf7653..cbdf6d8 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
private[spark]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
index e544720..82000ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/FlatMappedValuesRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{TaskContext, Partition, RDD}
+import org.apache.spark.{TaskContext, Partition}
private[spark]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
index 2ce9419..829545d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
extends RDD[Array[T]](prev) {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 08e6154..2cb6734 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -18,21 +18,15 @@
package org.apache.spark.rdd
import java.io.EOFException
-import java.util.NoSuchElementException
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.FileInputFormat
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.InputSplit
import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapred.RecordReader
import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
-import org.apache.spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, SparkEnv, TaskContext}
+import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, SparkEnv, TaskContext}
import org.apache.spark.util.NextIterator
import org.apache.hadoop.conf.{Configuration, Configurable}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
index 3db460b..aca0146 100644
--- a/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
@@ -19,7 +19,7 @@ package org.apache.spark.rdd
import java.sql.{Connection, ResultSet}
-import org.apache.spark.{Logging, Partition, RDD, SparkContext, TaskContext}
+import org.apache.spark.{Logging, Partition, SparkContext, TaskContext}
import org.apache.spark.util.NextIterator
private[spark] class JdbcPartition(idx: Int, val lower: Long, val upper: Long) extends Partition {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
index 13009d3..203179c 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
private[spark]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
index 1683050..3ed8339 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithIndexRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
/**
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
index 26d4806..e8be1c4 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
private[spark]
class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
index a405e9a..d33c1af 100644
--- a/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/MappedValuesRDD.scala
@@ -18,7 +18,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{TaskContext, Partition, RDD}
+import org.apache.spark.{TaskContext, Partition}
private[spark]
class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
index 114b504..7b3a89f 100644
--- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
-import org.apache.spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.{Dependency, Logging, Partition, SerializableWritable, SparkContext, TaskContext}
private[spark]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
index 4c3df0e..697be8b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/OrderedRDDFunctions.scala
@@ -17,12 +17,13 @@
package org.apache.spark.rdd
-import org.apache.spark.{RangePartitioner, Logging, RDD}
+import org.apache.spark.{RangePartitioner, Logging}
/**
* Extra functions available on RDDs of (key, value) pairs where the key is sortable through
- * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
- * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
+ * an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
+ * use these functions. They will work with any key type that has a `scala.math.Ordered`
+ * implementation.
*/
class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
V: ClassManifest,
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
new file mode 100644
index 0000000..a47c512
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -0,0 +1,702 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.nio.ByteBuffer
+import java.util.Date
+import java.text.SimpleDateFormat
+import java.util.{HashMap => JHashMap}
+
+import scala.collection.{mutable, Map}
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.SequenceFile.CompressionType
+import org.apache.hadoop.mapred.FileOutputFormat
+import org.apache.hadoop.mapred.OutputFormat
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
+import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob}
+import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter}
+
+import org.apache.spark._
+import org.apache.spark.SparkContext._
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.Aggregator
+import org.apache.spark.Partitioner
+import org.apache.spark.Partitioner.defaultPartitioner
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
+ */
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
+ extends Logging
+ with SparkHadoopMapReduceUtil
+ with Serializable {
+
+ /**
+  * Generic function to combine the elements for each key using a custom set of aggregation
+  * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C.
+  * Note that V and C can be different -- for example, one might group an RDD of type
+  * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
+  *
+  * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
+  * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
+  * - `mergeCombiners`, to combine two C's into a single one.
+  *
+  * In addition, users can control the partitioning of the output RDD, and whether to perform
+  * map-side aggregation (if a mapper can produce multiple items with the same key).
+  *
+  * Array keys are rejected with a SparkException when map-side combining is requested or when
+  * the partitioner is a HashPartitioner. When `mapSideCombine` is false and a shuffle is needed,
+  * `mergeCombiners` is asserted to be null.
+  *
+  * @param serializerClass handed unchanged to `setSerializer` on the shuffled RDD; defaults to
+  *                        null (NOTE(review): null presumably selects the default serializer --
+  *                        confirm against ShuffledRDD)
+  */
+ def combineByKey[C](createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C,
+ partitioner: Partitioner,
+ mapSideCombine: Boolean = true,
+ serializerClass: String = null): RDD[(K, C)] = {
+ // Validate array keys up front, before any RDD is constructed.
+ if (getKeyClass().isArray) {
+ if (mapSideCombine) {
+ throw new SparkException("Cannot use map-side combining with array keys.")
+ }
+ if (partitioner.isInstanceOf[HashPartitioner]) {
+ throw new SparkException("Default partitioner cannot partition array keys.")
+ }
+ }
+ val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
+ if (self.partitioner == Some(partitioner)) {
+ // Already partitioned by the requested partitioner: combine within each partition, no shuffle.
+ self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+ } else if (mapSideCombine) {
+ // Combine within each input partition, shuffle the partial combiners, then merge them.
+ val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+ val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
+ .setSerializer(serializerClass)
+ partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
+ } else {
+ // Don't apply map-side combiner.
+ // A sanity check to make sure mergeCombiners is not defined.
+ assert(mergeCombiners == null)
+ // Shuffle the raw (key, value) pairs and combine only after the shuffle.
+ val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
+ values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+ }
+ }
+
+ /**
+  * Convenience overload of combineByKey: the output RDD is partitioned by a HashPartitioner
+  * built from `numPartitions`.
+  */
+ def combineByKey[C](createCombiner: V => C,
+     mergeValue: (C, V) => C,
+     mergeCombiners: (C, C) => C,
+     numPartitions: Int): RDD[(K, C)] = {
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   combineByKey(createCombiner, mergeValue, mergeCombiners, hashPartitioner)
+ }
+
+ /**
+  * Merge the values for each key using an associative function and a neutral "zero value"
+  * which may be added to the result an arbitrary number of times, and must not change the
+  * result (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
+  *
+  * The zero value is serialized once with the closure serializer; a fresh copy is deserialized
+  * each time a combiner is created, so `func` never receives a shared zero instance. The
+  * output is partitioned by `partitioner`.
+  */
+ def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
+ // Serialize the zero value to a byte array so that we can get a new clone of it on each key
+ val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
+ val zeroArray = new Array[Byte](zeroBuffer.limit)
+ zeroBuffer.get(zeroArray)
+
+ // When deserializing, use a lazy val to create just one instance of the serializer per task
+ lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
+ // Wrap the shared byte array in a new ByteBuffer on every call and deserialize a fresh zero.
+ def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
+
+ // createCombiner folds the first value of a key into a fresh zero; func merges everything else.
+ combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
+ }
+
+ /**
+  * Fold the values of each key with an associative function, starting from a neutral
+  * "zero value" that may be folded in any number of times without changing the result
+  * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication). The output is
+  * partitioned by a HashPartitioner built from `numPartitions`.
+  */
+ def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   foldByKey(zeroValue, hashPartitioner)(func)
+ }
+
+ /**
+  * Fold the values of each key with an associative function, starting from a neutral
+  * "zero value" that may be folded in any number of times without changing the result
+  * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication). The output is
+  * partitioned by whatever `defaultPartitioner(self)` selects.
+  */
+ def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
+   val chosenPartitioner = defaultPartitioner(self)
+   foldByKey(zeroValue, chosenPartitioner)(func)
+ }
+
+ /**
+  * Reduce the values of each key with an associative function. Delegates to combineByKey,
+  * using the first value of a key as its initial combiner and `func` both to merge values and
+  * to merge combiners, so merging also happens locally on each mapper before results are sent
+  * to a reducer (similarly to a "combiner" in MapReduce). Output is partitioned by
+  * `partitioner`.
+  */
+ def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
+   val firstValueAsCombiner = (v: V) => v
+   combineByKey[V](firstValueAsCombiner, func, func, partitioner)
+ }
+
+ /**
+  * Merge the values for each key using an associative reduce function, but return the results
+  * immediately to the master as a Map. This will also perform the merging locally on each mapper
+  * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+  *
+  * Throws a SparkException when the key class is an array type.
+  */
+ def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
+
+ if (getKeyClass().isArray) {
+ throw new SparkException("reduceByKeyLocally() does not support array keys")
+ }
+
+ // Reduce one partition into a single hash map, yielded as a one-element iterator.
+ def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+ val map = new JHashMap[K, V]
+ iter.foreach { case (k, v) =>
+ // A null lookup is treated as "key not seen yet".
+ // NOTE(review): a key whose stored value is null is indistinguishable from an absent key.
+ val old = map.get(k)
+ map.put(k, if (old == null) v else func(old, v))
+ }
+ Iterator(map)
+ }
+
+ // Merge m2 into m1 in place and return m1.
+ // Iterating the Java map as (k, v) pairs presumably relies on the JavaConversions import.
+ def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
+ m2.foreach { case (k, v) =>
+ val old = m1.get(k)
+ m1.put(k, if (old == null) v else func(old, v))
+ }
+ m1
+ }
+
+ // One map per partition, then merge all of them into the final result.
+ self.mapPartitions(reducePartition).reduce(mergeMaps)
+ }
+
+ /** Same operation as reduceByKeyLocally, under a different name. */
+ def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
+   reduceByKeyLocally(func)
+ }
+
+ /**
+  * Count how many elements carry each key and return the counts as a Map. Implemented by
+  * projecting every pair to its key and calling `countByValue()` on the keys.
+  */
+ def countByKey(): Map[K, Long] = {
+   val keys = self.map(pair => pair._1)
+   keys.countByValue()
+ }
+
+ /**
+  * (Experimental) Approximate variant of countByKey that may return a partial result when it
+  * does not finish within `timeout`. Implemented by projecting every pair to its key and
+  * calling `countByValueApprox` on the keys with the same `timeout` and `confidence`.
+  */
+ def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
+     : PartialResult[Map[K, BoundedDouble]] = {
+   val keys = self.map(pair => pair._1)
+   keys.countByValueApprox(timeout, confidence)
+ }
+
+ /**
+  * Reduce the values of each key with an associative function, merging locally on each mapper
+  * before results are sent to a reducer (similarly to a "combiner" in MapReduce). Output is
+  * partitioned by a HashPartitioner built from `numPartitions`.
+  */
+ def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   reduceByKey(hashPartitioner, func)
+ }
+
+ /**
+  * Group the values for each key in the RDD into a single sequence. Allows controlling the
+  * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+  *
+  * Values are accumulated into one ArrayBuffer per key via combineByKey, with map-side
+  * combining switched off.
+  */
+ def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+ // groupByKey shouldn't use map side combine because map side combine does not
+ // reduce the amount of data shuffled and requires all map side data be inserted
+ // into a hash table, leading to more objects in the old gen.
+ def createCombiner(v: V) = ArrayBuffer(v)
+ def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
+ // mergeCombiners is passed as null: combineByKey asserts exactly that when
+ // mapSideCombine is false and a shuffle is required.
+ val bufs = combineByKey[ArrayBuffer[V]](
+ createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
+ // Unchecked cast that widens the value type from ArrayBuffer[V] to Seq[V].
+ // NOTE(review): presumably needed because RDD is not covariant -- confirm.
+ bufs.asInstanceOf[RDD[(K, Seq[V])]]
+ }
+
+ /**
+  * Group the values for each key in the RDD into a single sequence. The resulting RDD is
+  * partitioned by a HashPartitioner built from `numPartitions`.
+  */
+ def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   groupByKey(hashPartitioner)
+ }
+
+ /**
+  * Return a copy of the RDD partitioned using the specified partitioner. Throws a
+  * SparkException when the key class is an array type and the partitioner is a HashPartitioner.
+  */
+ def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
+   val hashingArrayKeys = getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]
+   if (hashingArrayKeys) {
+     throw new SparkException("Default partitioner cannot partition array keys.")
+   }
+   new ShuffledRDD[K, V, (K, V)](self, partitioner)
+ }
+
+ /**
+  * Inner join of `this` and `other` on their keys. For every key present on both sides, each
+  * combination of a value v1 from `this` and a value v2 from `other` is emitted as
+  * (k, (v1, v2)). Built on cogroup, with the output partitioned by `partitioner`.
+  */
+ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
+   val cogrouped = this.cogroup(other, partitioner)
+   cogrouped.flatMapValues { case (lefts, rights) =>
+     // Cross product of the two value lists for this key.
+     lefts.iterator.flatMap(l => rights.iterator.map(r => (l, r)))
+   }
+ }
+
+ /**
+  * Left outer join of `this` and `other` on their keys. Every element (k, v) of `this` yields
+  * (k, (v, Some(w))) for each w that `other` holds under k, or a single (k, (v, None)) when
+  * `other` has no value for k. Built on cogroup, with the output partitioned by `partitioner`.
+  */
+ def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
+   val cogrouped = this.cogroup(other, partitioner)
+   cogrouped.flatMapValues { case (lefts, rights) =>
+     if (rights.nonEmpty) {
+       lefts.iterator.flatMap(l => rights.iterator.map(r => (l, Some(r))))
+     } else {
+       // Nothing on the right for this key: keep every left value, paired with None.
+       lefts.iterator.map(l => (l, None))
+     }
+   }
+ }
+
+ /**
+  * Right outer join of `this` and `other` on their keys. Every element (k, w) of `other`
+  * yields (k, (Some(v), w)) for each v that `this` holds under k, or a single (k, (None, w))
+  * when `this` has no value for k. Built on cogroup, with the output partitioned by
+  * `partitioner`.
+  */
+ def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
+     : RDD[(K, (Option[V], W))] = {
+   val cogrouped = this.cogroup(other, partitioner)
+   cogrouped.flatMapValues { case (lefts, rights) =>
+     if (lefts.nonEmpty) {
+       lefts.iterator.flatMap(l => rights.iterator.map(r => (Some(l), r)))
+     } else {
+       // Nothing on the left for this key: keep every right value, paired with None.
+       rights.iterator.map(r => (None, r))
+     }
+   }
+ }
+
+ /**
+  * Convenience overload of combineByKey: the output RDD is partitioned by whatever
+  * `defaultPartitioner(self)` selects.
+  */
+ def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
+     : RDD[(K, C)] = {
+   val chosenPartitioner = defaultPartitioner(self)
+   combineByKey(createCombiner, mergeValue, mergeCombiners, chosenPartitioner)
+ }
+
+ /**
+  * Reduce the values of each key with an associative function, merging locally on each mapper
+  * before results are sent to a reducer (similarly to a "combiner" in MapReduce). Output is
+  * partitioned by whatever `defaultPartitioner(self)` selects.
+  */
+ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
+   val chosenPartitioner = defaultPartitioner(self)
+   reduceByKey(chosenPartitioner, func)
+ }
+
+ /**
+  * Group the values for each key in the RDD into a single sequence. The resulting RDD is
+  * partitioned by whatever `defaultPartitioner(self)` selects.
+  */
+ def groupByKey(): RDD[(K, Seq[V])] = {
+   val chosenPartitioner = defaultPartitioner(self)
+   groupByKey(chosenPartitioner)
+ }
+
+ /**
+  * Inner join of `this` and `other` on their keys: each pair of matching elements is returned
+  * as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and (k, v2) is in `other`. The output
+  * is partitioned by whatever `defaultPartitioner(self, other)` selects.
+  */
+ def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
+   val chosenPartitioner = defaultPartitioner(self, other)
+   join(other, chosenPartitioner)
+ }
+
+ /**
+  * Inner join of `this` and `other` on their keys: each pair of matching elements is returned
+  * as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and (k, v2) is in `other`. The output
+  * is partitioned by a HashPartitioner built from `numPartitions`.
+  */
+ def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   join(other, hashPartitioner)
+ }
+
+ /**
+  * Left outer join of `this` and `other`. Every element (k, v) of `this` yields
+  * (k, (v, Some(w))) for each w that `other` holds under k, or (k, (v, None)) when `other`
+  * has no value for k. The output is partitioned by whatever
+  * `defaultPartitioner(self, other)` selects.
+  */
+ def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
+   val chosenPartitioner = defaultPartitioner(self, other)
+   leftOuterJoin(other, chosenPartitioner)
+ }
+
+ /**
+  * Left outer join of `this` and `other`. Every element (k, v) of `this` yields
+  * (k, (v, Some(w))) for each w that `other` holds under k, or (k, (v, None)) when `other`
+  * has no value for k. The output is partitioned by a HashPartitioner built from
+  * `numPartitions`.
+  */
+ def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   leftOuterJoin(other, hashPartitioner)
+ }
+
+ /**
+  * Right outer join of `this` and `other`. Every element (k, w) of `other` yields
+  * (k, (Some(v), w)) for each v that `this` holds under k, or (k, (None, w)) when `this`
+  * has no value for k. The output is partitioned by whatever
+  * `defaultPartitioner(self, other)` selects.
+  */
+ def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
+   val chosenPartitioner = defaultPartitioner(self, other)
+   rightOuterJoin(other, chosenPartitioner)
+ }
+
+ /**
+ * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+ * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+ * RDD into the given number of partitions.
+ */
+ def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
+   // Build a hash partitioner of the requested size and defer to the partitioner-based overload.
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   rightOuterJoin(other, hashPartitioner)
+ }
+
+ /**
+ * Return the key-value pairs in this RDD to the master as a Map.
+ */
+ def collectAsMap(): Map[K, V] = {
+   // Fetch every pair of this RDD as an array, then load the pairs into a pre-sized hash map.
+   val pairs = self.toArray()
+   val result = new mutable.HashMap[K, V]
+   result.sizeHint(pairs.length)
+   // Pairs are inserted in array order, so a later pair overwrites an earlier one with the same key.
+   pairs.foreach { case (key, value) => result(key) = value }
+   result
+ }
+
+ /**
+ * Pass each value in the key-value pair RDD through a map function without changing the keys;
+ * this also retains the original RDD's partitioning.
+ */
+ def mapValues[U](f: V => U): RDD[(K, U)] = {
+   // Run the user function through the SparkContext's closure cleaner before wrapping this RDD.
+   val cleanedFn = self.context.clean(f)
+   new MappedValuesRDD(self, cleanedFn)
+ }
+
+ /**
+ * Pass each value in the key-value pair RDD through a flatMap function without changing the
+ * keys; this also retains the original RDD's partitioning.
+ */
+ def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
+   // Run the user function through the SparkContext's closure cleaner before wrapping this RDD.
+   val cleanedFn = self.context.clean(f)
+   new FlatMappedValuesRDD(self, cleanedFn)
+ }
+
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
+ def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+   // Fail fast: a HashPartitioner is not accepted when the key class is an array type.
+   partitioner match {
+     case _: HashPartitioner if getKeyClass().isArray =>
+       throw new SparkException("Default partitioner cannot partition array keys.")
+     case _ =>
+   }
+   val grouped = new CoGroupedRDD[K](Seq(self, other), partitioner)
+   val groupedFns =
+     new PairRDDFunctions[K, Seq[Seq[_]]](grouped)(classManifest[K], Manifests.seqSeqManifest)
+   // Each value holds one sequence per input RDD, in the order the RDDs were passed above.
+   groupedFns.mapValues { case Seq(mine, theirs) =>
+     (mine.asInstanceOf[Seq[V]], theirs.asInstanceOf[Seq[W]])
+   }
+ }
+
+ /**
+ * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+ * tuple with the list of values for that key in `this`, `other1` and `other2`.
+ */
+ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
+     : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+   // Fail fast: a HashPartitioner is not accepted when the key class is an array type.
+   partitioner match {
+     case _: HashPartitioner if getKeyClass().isArray =>
+       throw new SparkException("Default partitioner cannot partition array keys.")
+     case _ =>
+   }
+   val grouped = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
+   val groupedFns =
+     new PairRDDFunctions[K, Seq[Seq[_]]](grouped)(classManifest[K], Manifests.seqSeqManifest)
+   // Each value holds one sequence per input RDD, in the order the RDDs were passed above.
+   groupedFns.mapValues { case Seq(mine, first, second) =>
+     (mine.asInstanceOf[Seq[V]], first.asInstanceOf[Seq[W1]], second.asInstanceOf[Seq[W2]])
+   }
+ }
+
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
+ def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+   // Let defaultPartitioner choose the partitioner, then defer to the partitioner-based overload.
+   val chosenPartitioner = defaultPartitioner(self, other)
+   cogroup(other, chosenPartitioner)
+ }
+
+ /**
+ * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+ * tuple with the list of values for that key in `this`, `other1` and `other2`.
+ */
+ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+     : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+   // Let defaultPartitioner choose the partitioner, then defer to the partitioner-based overload.
+   val chosenPartitioner = defaultPartitioner(self, other1, other2)
+   cogroup(other1, other2, chosenPartitioner)
+ }
+
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
+ def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+   // Build a hash partitioner of the requested size and defer to the partitioner-based overload.
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   cogroup(other, hashPartitioner)
+ }
+
+ /**
+ * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+ * tuple with the list of values for that key in `this`, `other1` and `other2`.
+ */
+ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
+     : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+   // Build a hash partitioner of the requested size and defer to the partitioner-based overload.
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   cogroup(other1, other2, hashPartitioner)
+ }
+
+ /** Alias for cogroup. */
+ def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+   // Same call that the single-argument cogroup makes: cogroup with the default partitioner.
+   val chosenPartitioner = defaultPartitioner(self, other)
+   cogroup(other, chosenPartitioner)
+ }
+
+ /** Alias for cogroup. */
+ def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
+     : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+   // Same call that the two-argument cogroup makes: cogroup with the default partitioner.
+   val chosenPartitioner = defaultPartitioner(self, other1, other2)
+   cogroup(other1, other2, chosenPartitioner)
+ }
+
+ /**
+ * Return an RDD with the pairs from `this` whose keys are not in `other`.
+ *
+ * Uses this RDD's partitioner (or, if it has none, a hash partitioner with this RDD's number of
+ * partitions), because even if `other` is huge, the result can be no larger than this RDD.
+ */
+ def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] = {
+   // Prefer this RDD's own partitioner; otherwise hash-partition into as many partitions as
+   // this RDD currently has.
+   val chosenPartitioner = self.partitioner match {
+     case Some(existing) => existing
+     case None => new HashPartitioner(self.partitions.size)
+   }
+   subtractByKey(other, chosenPartitioner)
+ }
+
+ /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+ def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] = {
+   // Build a hash partitioner of the requested size and defer to the partitioner-based overload.
+   val hashPartitioner = new HashPartitioner(numPartitions)
+   subtractByKey(other, hashPartitioner)
+ }
+
+ /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
+ def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = {
+   // The subtraction itself is carried out by SubtractedRDD, using the supplied partitioner.
+   new SubtractedRDD[K, V, W](self, other, p)
+ }
+
+ /**
+ * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
+ * RDD has a known partitioner by only searching the partition that the key maps to.
+ */
+ def lookup(key: K): Seq[V] = {
+   self.partitioner match {
+     case None =>
+       // No partitioner is known, so filter the whole RDD for the key and collect its values.
+       self.filter(pair => pair._1 == key).map(pair => pair._2).collect()
+     case Some(p) =>
+       // Only the partition that the partitioner assigns to `key` is searched.
+       val targetPartition = p.getPartition(key)
+       // Gathers the values of all pairs in one partition whose key equals `key`.
+       def valuesForKey(pairs: Iterator[(K, V)]): Seq[V] = {
+         val matches = new ArrayBuffer[V]
+         for ((k, v) <- pairs) {
+           if (k == key) matches += v
+         }
+         matches
+       }
+       val perPartition = self.context.runJob(self, valuesForKey _, Array(targetPartition), false)
+       perPartition(0)
+   }
+ }
+
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+ * supporting the key and value types K and V in this RDD.
+ */
+ def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
+   // The output format class is recovered from the implicit manifest for F.
+   val outputFormatClass = fm.erasure.asInstanceOf[Class[F]]
+   saveAsHadoopFile(path, getKeyClass, getValueClass, outputFormatClass)
+ }
+
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+ * supporting the key and value types K and V in this RDD. Compress the result with the
+ * supplied codec.
+ */
+ def saveAsHadoopFile[F <: OutputFormat[K, V]](
+     path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
+   // The output format class is recovered from the implicit manifest for F.
+   val outputFormatClass = fm.erasure.asInstanceOf[Class[F]]
+   saveAsHadoopFile(path, getKeyClass, getValueClass, outputFormatClass, codec)
+ }
+
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+ * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+ */
+ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
+   // The output format class is recovered from the implicit manifest for F.
+   val outputFormatClass = fm.erasure.asInstanceOf[Class[F]]
+   saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, outputFormatClass)
+ }
+
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+ * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+ */
+ def saveAsNewAPIHadoopFile(
+     path: String,
+     keyClass: Class[_],
+     valueClass: Class[_],
+     outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+     conf: Configuration = self.context.hadoopConfiguration) {
+   // Configure a new-API Hadoop job with the output key/value classes and the output path.
+   val job = new NewAPIHadoopJob(conf)
+   job.setOutputKeyClass(keyClass)
+   job.setOutputValueClass(valueClass)
+   // The job's configuration is wrapped so that the per-task closure below can read it.
+   val wrappedConf = new SerializableWritable(job.getConfiguration)
+   NewFileOutputFormat.setOutputPath(job, new Path(path))
+   // The current time (minute resolution) serves as the job tracker ID in the attempt IDs.
+   val formatter = new SimpleDateFormat("yyyyMMddHHmm")
+   val jobtrackerID = formatter.format(new Date())
+   val stageId = self.id
+   // Writes one partition through its own output format instance and committer; returns 1.
+   def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
+     // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+     // around by taking a mod. We expect that no task will be attempted 2 billion times.
+     val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+     /* "reduce task" <split #> <attempt # = spark task #> */
+     val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber)
+     val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
+     val format = outputFormatClass.newInstance
+     val committer = format.getOutputCommitter(hadoopContext)
+     committer.setupTask(hadoopContext)
+     val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
+     try {
+       while (iter.hasNext) {
+         val (k, v) = iter.next()
+         writer.write(k, v)
+       }
+     } finally {
+       // Always release the record writer, even if a write fails part-way through.
+       writer.close(hadoopContext)
+     }
+     // Reached only when every record was written and the writer closed without error.
+     committer.commitTask(hadoopContext)
+     1
+   }
+   val jobFormat = outputFormatClass.newInstance
+   /* apparently we need a TaskAttemptID to construct an OutputCommitter;
+    * however we're only going to use this local OutputCommitter for
+    * setupJob/commitJob, so we just use a dummy "map" task.
+    */
+   val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0)
+   val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
+   val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
+   jobCommitter.setupJob(jobTaskContext)
+   // Run writeShard on every partition; the per-partition results are not needed afterwards.
+   self.context.runJob(self, writeShard _)
+   jobCommitter.commitJob(jobTaskContext)
+   jobCommitter.cleanupJob(jobTaskContext)
+ }
+
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+ * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
+ */
+ def saveAsHadoopFile(
+     path: String,
+     keyClass: Class[_],
+     valueClass: Class[_],
+     outputFormatClass: Class[_ <: OutputFormat[_, _]],
+     codec: Class[_ <: CompressionCodec]) {
+   // Start from a fresh JobConf based on the context's Hadoop configuration and pass the codec
+   // on as a defined Option.
+   val jobConf = new JobConf(self.context.hadoopConfiguration)
+   saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, jobConf, Some(codec))
+ }
+
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+ * supporting the key and value types K and V in this RDD.
+ */
+ def saveAsHadoopFile(
+     path: String,
+     keyClass: Class[_],
+     valueClass: Class[_],
+     outputFormatClass: Class[_ <: OutputFormat[_, _]],
+     conf: JobConf = new JobConf(self.context.hadoopConfiguration),
+     codec: Option[Class[_ <: CompressionCodec]] = None) {
+   // Note: all settings below are written directly into the supplied `conf`.
+   conf.setOutputKeyClass(keyClass)
+   conf.setOutputValueClass(valueClass)
+   // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
+   conf.set("mapred.output.format.class", outputFormatClass.getName)
+   // Compression settings are applied only when a codec was supplied.
+   for (c <- codec) {
+     conf.setCompressMapOutput(true)
+     conf.set("mapred.output.compress", "true")
+     conf.setMapOutputCompressorClass(c)
+     // Register the codec by its binary class name, as done for the output format above: the
+     // canonical name of a nested class cannot be loaded by name, and it is null for
+     // anonymous or local classes.
+     conf.set("mapred.output.compression.codec", c.getName)
+     conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
+   }
+   conf.setOutputCommitter(classOf[FileOutputCommitter])
+   FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
+   saveAsHadoopDataset(conf)
+ }
+
+ /**
+ * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
+ * that storage system. The JobConf should set an OutputFormat and any output paths required
+ * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
+ * MapReduce job.
+ */
+ def saveAsHadoopDataset(conf: JobConf) {
+   val outputFormatClass = conf.getOutputFormat
+   val keyClass = conf.getOutputKeyClass
+   val valueClass = conf.getOutputValueClass
+   // Refuse to start when the JobConf lacks any of the three required output settings.
+   if (outputFormatClass == null) {
+     throw new SparkException("Output format class not set")
+   }
+   if (keyClass == null) {
+     throw new SparkException("Output key class not set")
+   }
+   if (valueClass == null) {
+     throw new SparkException("Output value class not set")
+   }
+
+   logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
+
+   val writer = new SparkHadoopWriter(conf)
+   writer.preSetup()
+
+   // Writes every record of one partition through the shared writer, then commits that task.
+   def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
+     // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+     // around by taking a mod. We expect that no task will be attempted 2 billion times.
+     val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+
+     writer.setup(context.stageId, context.splitId, attemptNumber)
+     writer.open()
+
+     try {
+       while (iter.hasNext) {
+         val record = iter.next()
+         writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
+       }
+     } finally {
+       // Always release the writer, even if a write fails part-way through.
+       writer.close()
+     }
+     // Reached only when every record was written and the writer closed without error.
+     writer.commit()
+   }
+
+   self.context.runJob(self, writeToFile _)
+   writer.commitJob()
+   writer.cleanup()
+ }
+
+ /**
+ * Return an RDD with the keys of each tuple.
+ */
+ def keys: RDD[K] = self.map(pair => pair._1)
+
+ /**
+ * Return an RDD with the values of each tuple.
+ */
+ def values: RDD[V] = self.map(pair => pair._2)
+
+ // Runtime (erased) class of the key type K, taken from its ClassManifest.
+ private[spark] def getKeyClass() = classManifest[K].erasure
+
+ // Runtime (erased) class of the value type V, taken from its ClassManifest.
+ private[spark] def getValueClass() = classManifest[V].erasure
+}
+
+private[spark] object Manifests {
+  // Single shared ClassManifest instance for Seq[Seq[_]].
+  val seqSeqManifest: ClassManifest[Seq[Seq[_]]] = implicitly[ClassManifest[Seq[Seq[_]]]]
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
index 8db3611..6dbd430 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -23,6 +23,8 @@ import scala.collection.Map
import org.apache.spark._
import java.io._
import scala.Serializable
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.util.Utils
private[spark] class ParallelCollectionPartition[T: ClassManifest](
var rddId: Long,
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
index 8e79a5c..165cd41 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PartitionPruningRDD.scala
@@ -17,7 +17,7 @@
package org.apache.spark.rdd
-import org.apache.spark.{NarrowDependency, RDD, SparkEnv, Partition, TaskContext}
+import org.apache.spark.{NarrowDependency, SparkEnv, Partition, TaskContext}
class PartitionPruningRDDPartition(idx: Int, val parentSplit: Partition) extends Partition {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 98498d5..d5304ab 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
-import org.apache.spark.{RDD, SparkEnv, Partition, TaskContext}
+import org.apache.spark.{SparkEnv, Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast