You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:49 UTC

[65/69] [abbrv] git commit: Move some classes to more appropriate packages:

Move some classes to more appropriate packages:

* RDD, *RDDFunctions -> org.apache.spark.rdd
* Utils, ClosureCleaner, SizeEstimator -> org.apache.spark.util
* JavaSerializer, KryoSerializer -> org.apache.spark.serializer


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0a8cc309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0a8cc309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0a8cc309

Branch: refs/heads/branch-0.8
Commit: 0a8cc309211c62f8824d76618705c817edcf2424
Parents: 5b4dea2
Author: Matei Zaharia <ma...@eecs.berkeley.edu>
Authored: Sun Sep 1 00:32:28 2013 -0700
Committer: Matei Zaharia <ma...@eecs.berkeley.edu>
Committed: Sun Sep 1 14:13:16 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/bagel/Bagel.scala    |   2 +-
 .../scala/org/apache/spark/Accumulators.scala   |   1 +
 .../scala/org/apache/spark/CacheManager.scala   |   1 +
 .../scala/org/apache/spark/ClosureCleaner.scala | 231 -----
 .../scala/org/apache/spark/Dependency.scala     |   2 +
 .../org/apache/spark/DoubleRDDFunctions.scala   |  78 --
 .../scala/org/apache/spark/HttpFileServer.scala |   1 +
 .../scala/org/apache/spark/HttpServer.scala     |   1 +
 .../scala/org/apache/spark/JavaSerializer.scala |  83 --
 .../scala/org/apache/spark/KryoSerializer.scala | 156 ---
 .../org/apache/spark/MapOutputTracker.scala     |   2 +-
 .../org/apache/spark/PairRDDFunctions.scala     | 703 --------------
 .../scala/org/apache/spark/Partitioner.scala    |   3 +
 core/src/main/scala/org/apache/spark/RDD.scala  | 957 -------------------
 .../org/apache/spark/RDDCheckpointData.scala    | 130 ---
 .../apache/spark/SequenceFileRDDFunctions.scala | 107 ---
 .../scala/org/apache/spark/SizeEstimator.scala  | 283 ------
 .../scala/org/apache/spark/SparkContext.scala   |   8 +-
 .../main/scala/org/apache/spark/SparkEnv.scala  |   6 +-
 .../src/main/scala/org/apache/spark/Utils.scala | 780 ---------------
 .../apache/spark/api/java/JavaDoubleRDD.scala   |   2 +-
 .../org/apache/spark/api/java/JavaPairRDD.scala |   2 +-
 .../org/apache/spark/api/java/JavaRDD.scala     |   1 +
 .../org/apache/spark/api/java/JavaRDDLike.scala |   6 +-
 .../spark/api/java/JavaSparkContext.scala       |   6 +-
 .../spark/api/python/PythonPartitioner.scala    |   2 +-
 .../org/apache/spark/api/python/PythonRDD.scala |   2 +
 .../spark/broadcast/BitTorrentBroadcast.scala   |   1 +
 .../apache/spark/broadcast/HttpBroadcast.scala  |   4 +-
 .../apache/spark/broadcast/MultiTracker.scala   |   1 +
 .../apache/spark/broadcast/TreeBroadcast.scala  |   1 +
 .../org/apache/spark/deploy/DeployMessage.scala |   2 +-
 .../apache/spark/deploy/LocalSparkCluster.scala |   4 +-
 .../apache/spark/deploy/client/TestClient.scala |   4 +-
 .../org/apache/spark/deploy/master/Master.scala |   4 +-
 .../spark/deploy/master/MasterArguments.scala   |   3 +-
 .../apache/spark/deploy/master/WorkerInfo.scala |   2 +-
 .../deploy/master/ui/ApplicationPage.scala      |   2 +-
 .../spark/deploy/master/ui/IndexPage.scala      |   2 +-
 .../spark/deploy/master/ui/MasterWebUI.scala    |   3 +-
 .../spark/deploy/worker/ExecutorRunner.scala    |   3 +-
 .../org/apache/spark/deploy/worker/Worker.scala |   4 +-
 .../spark/deploy/worker/WorkerArguments.scala   |   4 +-
 .../spark/deploy/worker/ui/IndexPage.scala      |   2 +-
 .../spark/deploy/worker/ui/WorkerWebUI.scala    |   3 +-
 .../org/apache/spark/executor/Executor.scala    |   1 +
 .../spark/executor/MesosExecutorBackend.scala   |   3 +-
 .../executor/StandaloneExecutorBackend.scala    |   4 +-
 .../spark/network/ConnectionManager.scala       |   1 +
 .../spark/network/ConnectionManagerId.scala     |   2 +-
 .../main/scala/org/apache/spark/package.scala   |  21 +-
 .../partial/ApproximateActionListener.scala     |   1 +
 .../scala/org/apache/spark/rdd/BlockRDD.scala   |   2 +-
 .../org/apache/spark/rdd/CoGroupedRDD.scala     |   2 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala   |  79 ++
 .../scala/org/apache/spark/rdd/EmptyRDD.scala   |   2 +-
 .../org/apache/spark/rdd/FilteredRDD.scala      |   2 +-
 .../org/apache/spark/rdd/FlatMappedRDD.scala    |   2 +-
 .../apache/spark/rdd/FlatMappedValuesRDD.scala  |   2 +-
 .../scala/org/apache/spark/rdd/GlommedRDD.scala |   2 +-
 .../scala/org/apache/spark/rdd/HadoopRDD.scala  |   8 +-
 .../scala/org/apache/spark/rdd/JdbcRDD.scala    |   2 +-
 .../org/apache/spark/rdd/MapPartitionsRDD.scala |   2 +-
 .../spark/rdd/MapPartitionsWithIndexRDD.scala   |   2 +-
 .../scala/org/apache/spark/rdd/MappedRDD.scala  |   2 +-
 .../org/apache/spark/rdd/MappedValuesRDD.scala  |   2 +-
 .../org/apache/spark/rdd/NewHadoopRDD.scala     |   2 +-
 .../apache/spark/rdd/OrderedRDDFunctions.scala  |   7 +-
 .../org/apache/spark/rdd/PairRDDFunctions.scala | 702 ++++++++++++++
 .../spark/rdd/ParallelCollectionRDD.scala       |   2 +
 .../apache/spark/rdd/PartitionPruningRDD.scala  |   2 +-
 .../scala/org/apache/spark/rdd/PipedRDD.scala   |   2 +-
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 942 ++++++++++++++++++
 .../apache/spark/rdd/RDDCheckpointData.scala    | 131 +++
 .../scala/org/apache/spark/rdd/SampledRDD.scala |   2 +-
 .../spark/rdd/SequenceFileRDDFunctions.scala    |  89 ++
 .../org/apache/spark/rdd/ShuffledRDD.scala      |   2 +-
 .../org/apache/spark/rdd/SubtractedRDD.scala    |   1 -
 .../scala/org/apache/spark/rdd/UnionRDD.scala   |   2 +-
 .../apache/spark/rdd/ZippedPartitionsRDD.scala  |   2 +-
 .../scala/org/apache/spark/rdd/ZippedRDD.scala  |   2 +-
 .../apache/spark/scheduler/DAGScheduler.scala   |   1 +
 .../spark/scheduler/DAGSchedulerEvent.scala     |   1 +
 .../org/apache/spark/scheduler/JobLogger.scala  |   1 +
 .../org/apache/spark/scheduler/ResultTask.scala |   7 +-
 .../apache/spark/scheduler/ShuffleMapTask.scala |   2 +
 .../apache/spark/scheduler/SparkListener.scala  |   4 +-
 .../org/apache/spark/scheduler/Stage.scala      |   3 +-
 .../org/apache/spark/scheduler/TaskResult.scala |   3 +-
 .../cluster/ClusterTaskSetManager.scala         |   2 +-
 .../scheduler/cluster/SchedulerBackend.scala    |   2 +-
 .../cluster/SparkDeploySchedulerBackend.scala   |   3 +-
 .../cluster/StandaloneClusterMessage.scala      |   3 +-
 .../cluster/StandaloneSchedulerBackend.scala    |   3 +-
 .../spark/scheduler/cluster/TaskInfo.scala      |   2 +-
 .../spark/scheduler/local/LocalScheduler.scala  |   1 +
 .../mesos/CoarseMesosSchedulerBackend.scala     |   2 +-
 .../scheduler/mesos/MesosSchedulerBackend.scala |   3 +-
 .../spark/serializer/JavaSerializer.scala       |  82 ++
 .../spark/serializer/KryoSerializer.scala       | 159 +++
 .../spark/storage/BlockFetcherIterator.scala    |   2 +-
 .../org/apache/spark/storage/BlockManager.scala |   4 +-
 .../apache/spark/storage/BlockManagerId.scala   |   2 +-
 .../spark/storage/BlockManagerMasterActor.scala |   3 +-
 .../spark/storage/BlockManagerWorker.scala      |   3 +-
 .../org/apache/spark/storage/DiskStore.scala    |   2 +-
 .../org/apache/spark/storage/MemoryStore.scala  |   2 +-
 .../org/apache/spark/storage/StorageUtils.scala |   3 +-
 .../apache/spark/storage/ThreadingTest.scala    |   2 +-
 .../scala/org/apache/spark/ui/SparkUI.scala     |   3 +-
 .../org/apache/spark/ui/exec/ExecutorsUI.scala  |   3 +-
 .../spark/ui/jobs/JobProgressListener.scala     |   2 +-
 .../apache/spark/ui/jobs/JobProgressUI.scala    |   3 +-
 .../org/apache/spark/ui/jobs/StagePage.scala    |   4 +-
 .../org/apache/spark/ui/jobs/StageTable.scala   |   2 +-
 .../org/apache/spark/ui/storage/IndexPage.scala |   2 +-
 .../org/apache/spark/ui/storage/RDDPage.scala   |   2 +-
 .../org/apache/spark/util/ClosureCleaner.scala  | 232 +++++
 .../org/apache/spark/util/MemoryParam.scala     |   2 -
 .../org/apache/spark/util/SizeEstimator.scala   | 284 ++++++
 .../scala/org/apache/spark/util/Utils.scala     | 781 +++++++++++++++
 .../org/apache/spark/CheckpointSuite.scala      |   1 +
 .../org/apache/spark/ClosureCleanerSuite.scala  | 146 ---
 .../scala/org/apache/spark/DriverSuite.scala    |   1 +
 .../scala/org/apache/spark/FailureSuite.scala   |   1 +
 .../org/apache/spark/KryoSerializerSuite.scala  | 208 ----
 .../apache/spark/PairRDDFunctionsSuite.scala    | 299 ------
 .../apache/spark/PartitionPruningRDDSuite.scala |   2 +-
 .../org/apache/spark/PartitioningSuite.scala    |   9 +-
 .../test/scala/org/apache/spark/RDDSuite.scala  | 389 --------
 .../scala/org/apache/spark/ShuffleSuite.scala   |   3 +-
 .../org/apache/spark/SizeEstimatorSuite.scala   | 164 ----
 .../scala/org/apache/spark/SortingSuite.scala   | 123 ---
 .../scala/org/apache/spark/UtilsSuite.scala     | 139 ---
 .../spark/rdd/PairRDDFunctionsSuite.scala       | 300 ++++++
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 391 ++++++++
 .../org/apache/spark/rdd/SortingSuite.scala     | 125 +++
 .../spark/scheduler/DAGSchedulerSuite.scala     |   2 +-
 .../apache/spark/scheduler/JobLoggerSuite.scala |   6 +-
 .../spark/scheduler/TaskContextSuite.scala      |   2 +-
 .../cluster/ClusterTaskSetManagerSuite.scala    |   2 +-
 .../spark/serializer/KryoSerializerSuite.scala  | 208 ++++
 .../spark/storage/BlockManagerSuite.scala       |   8 +-
 .../apache/spark/util/ClosureCleanerSuite.scala | 147 +++
 .../apache/spark/util/SizeEstimatorSuite.scala  | 164 ++++
 .../org/apache/spark/util/UtilsSuite.scala      | 139 +++
 docs/configuration.md                           |  10 +-
 docs/quick-start.md                             |   2 +-
 docs/scala-programming-guide.md                 |   2 +-
 docs/tuning.md                                  |  10 +-
 .../spark/examples/bagel/PageRankUtils.scala    |   1 +
 .../examples/bagel/WikipediaPageRank.scala      |   2 +-
 .../bagel/WikipediaPageRankStandalone.scala     |  17 +-
 .../spark/streaming/examples/QueueStream.scala  |   2 +-
 .../streaming/examples/RawNetworkGrep.scala     |   2 +-
 .../classification/ClassificationModel.scala    |   2 +-
 .../classification/LogisticRegression.scala     |   3 +-
 .../apache/spark/mllib/classification/SVM.scala |   3 +-
 .../apache/spark/mllib/clustering/KMeans.scala  |   3 +-
 .../spark/mllib/clustering/KMeansModel.scala    |   2 +-
 .../mllib/optimization/GradientDescent.scala    |   8 +-
 .../spark/mllib/optimization/Optimizer.scala    |   2 +-
 .../apache/spark/mllib/recommendation/ALS.scala |   7 +-
 .../MatrixFactorizationModel.scala              |   2 +-
 .../regression/GeneralizedLinearAlgorithm.scala |   3 +-
 .../apache/spark/mllib/regression/Lasso.scala   |   3 +-
 .../mllib/regression/LinearRegression.scala     |   3 +-
 .../mllib/regression/RegressionModel.scala      |   2 +-
 .../mllib/regression/RidgeRegression.scala      |   3 +-
 .../spark/mllib/util/DataValidators.scala       |   3 +-
 .../spark/mllib/util/KMeansDataGenerator.scala  |   3 +-
 .../spark/mllib/util/LinearDataGenerator.scala  |   3 +-
 .../util/LogisticRegressionDataGenerator.scala  |   3 +-
 .../spark/mllib/util/MFDataGenerator.scala      |   3 +-
 .../org/apache/spark/mllib/util/MLUtils.scala   |   3 +-
 .../spark/mllib/util/SVMDataGenerator.scala     |   5 +-
 python/pyspark/context.py                       |   4 +-
 .../org/apache/spark/repl/SparkIMain.scala      |   2 +-
 .../org/apache/spark/streaming/DStream.scala    |   3 +-
 .../org/apache/spark/streaming/Duration.scala   |   2 +-
 .../spark/streaming/PairDStreamFunctions.scala  |   9 +-
 .../spark/streaming/StreamingContext.scala      |   1 +
 .../spark/streaming/api/java/JavaDStream.scala  |   2 +-
 .../streaming/api/java/JavaDStreamLike.scala    |   2 +-
 .../streaming/api/java/JavaPairDStream.scala    |   7 +-
 .../api/java/JavaStreamingContext.scala         |  25 +-
 .../streaming/dstream/CoGroupedDStream.scala    |   3 +-
 .../dstream/ConstantInputDStream.scala          |   2 +-
 .../streaming/dstream/FileInputDStream.scala    |   2 +-
 .../streaming/dstream/FilteredDStream.scala     |   2 +-
 .../dstream/FlatMapValuedDStream.scala          |   2 +-
 .../streaming/dstream/FlatMappedDStream.scala   |   2 +-
 .../streaming/dstream/FlumeInputDStream.scala   |  15 +-
 .../streaming/dstream/ForEachDStream.scala      |   2 +-
 .../streaming/dstream/GlommedDStream.scala      |   2 +-
 .../dstream/MapPartitionedDStream.scala         |   2 +-
 .../streaming/dstream/MapValuedDStream.scala    |   2 +-
 .../spark/streaming/dstream/MappedDStream.scala |   2 +-
 .../streaming/dstream/NetworkInputDStream.scala |  15 +-
 .../streaming/dstream/QueueInputDStream.scala   |   2 +-
 .../dstream/ReducedWindowedDStream.scala        |   2 +-
 .../streaming/dstream/ShuffledDStream.scala     |   3 +-
 .../spark/streaming/dstream/StateDStream.scala  |   2 +-
 .../streaming/dstream/TransformedDStream.scala  |   2 +-
 .../spark/streaming/dstream/UnionDStream.scala  |   2 +-
 .../streaming/dstream/WindowedDStream.scala     |   2 +-
 .../streaming/util/MasterFailureTest.scala      |   3 +-
 .../spark/streaming/util/RawTextSender.scala    |   3 +-
 .../apache/spark/streaming/TestSuiteBase.scala  |   5 +-
 .../tools/JavaAPICompletenessChecker.scala      |  38 +-
 210 files changed, 5290 insertions(+), 5246 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
----------------------------------------------------------------------
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index fec8737..44e26bb 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -19,7 +19,7 @@ package org.apache.spark.bagel
 
 import org.apache.spark._
 import org.apache.spark.SparkContext._
-
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
 object Bagel extends Logging {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/Accumulators.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index 5177ee5..6e922a6 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -21,6 +21,7 @@ import java.io._
 
 import scala.collection.mutable.Map
 import scala.collection.generic.Growable
+import org.apache.spark.serializer.JavaSerializer
 
 /**
  * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/CacheManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 42e465b..e299a10 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import scala.collection.mutable.{ArrayBuffer, HashSet}
 import org.apache.spark.storage.{BlockManager, StorageLevel}
+import org.apache.spark.rdd.RDD
 
 
 /** Spark class responsible for passing RDDs split contents to the BlockManager and making

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/ClosureCleaner.scala
deleted file mode 100644
index 71d9e62..0000000
--- a/core/src/main/scala/org/apache/spark/ClosureCleaner.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.lang.reflect.Field
-
-import scala.collection.mutable.Map
-import scala.collection.mutable.Set
-
-import org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
-import org.objectweb.asm.Opcodes._
-import java.io.{InputStream, IOException, ByteArrayOutputStream, ByteArrayInputStream, BufferedInputStream}
-
-private[spark] object ClosureCleaner extends Logging {
-  // Get an ASM class reader for a given class from the JAR that loaded it
-  private def getClassReader(cls: Class[_]): ClassReader = {
-    // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
-    val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
-    val resourceStream = cls.getResourceAsStream(className)
-    // todo: Fixme - continuing with earlier behavior ...
-    if (resourceStream == null) return new ClassReader(resourceStream)
-
-    val baos = new ByteArrayOutputStream(128)
-    Utils.copyStream(resourceStream, baos, true)
-    new ClassReader(new ByteArrayInputStream(baos.toByteArray))
-  }
-
-  // Check whether a class represents a Scala closure
-  private def isClosure(cls: Class[_]): Boolean = {
-    cls.getName.contains("$anonfun$")
-  }
-  
-  // Get a list of the classes of the outer objects of a given closure object, obj;
-  // the outer objects are defined as any closures that obj is nested within, plus
-  // possibly the class that the outermost closure is in, if any. We stop searching
-  // for outer objects beyond that because cloning the user's object is probably
-  // not a good idea (whereas we can clone closure objects just fine since we
-  // understand how all their fields are used).
-  private def getOuterClasses(obj: AnyRef): List[Class[_]] = {
-    for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
-      f.setAccessible(true)
-      if (isClosure(f.getType)) {
-        return f.getType :: getOuterClasses(f.get(obj))
-      } else {
-        return f.getType :: Nil // Stop at the first $outer that is not a closure
-      }
-    }
-    return Nil
-  }
-  
-  // Get a list of the outer objects for a given closure object.
-  private def getOuterObjects(obj: AnyRef): List[AnyRef] = {
-    for (f <- obj.getClass.getDeclaredFields if f.getName == "$outer") {
-      f.setAccessible(true)
-      if (isClosure(f.getType)) {
-        return f.get(obj) :: getOuterObjects(f.get(obj))
-      } else {
-        return f.get(obj) :: Nil // Stop at the first $outer that is not a closure
-      }
-    }
-    return Nil
-  }
-  
-  private def getInnerClasses(obj: AnyRef): List[Class[_]] = {
-    val seen = Set[Class[_]](obj.getClass)
-    var stack = List[Class[_]](obj.getClass)
-    while (!stack.isEmpty) {
-      val cr = getClassReader(stack.head)
-      stack = stack.tail
-      val set = Set[Class[_]]()
-      cr.accept(new InnerClosureFinder(set), 0)
-      for (cls <- set -- seen) {
-        seen += cls
-        stack = cls :: stack
-      }
-    }
-    return (seen - obj.getClass).toList
-  }
-  
-  private def createNullValue(cls: Class[_]): AnyRef = {
-    if (cls.isPrimitive) {
-      new java.lang.Byte(0: Byte) // Should be convertible to any primitive type
-    } else {
-      null
-    }
-  }
-  
-  def clean(func: AnyRef) {
-    // TODO: cache outerClasses / innerClasses / accessedFields
-    val outerClasses = getOuterClasses(func)
-    val innerClasses = getInnerClasses(func)
-    val outerObjects = getOuterObjects(func)
-    
-    val accessedFields = Map[Class[_], Set[String]]()
-    for (cls <- outerClasses)
-      accessedFields(cls) = Set[String]()
-    for (cls <- func.getClass :: innerClasses)
-      getClassReader(cls).accept(new FieldAccessFinder(accessedFields), 0)
-    //logInfo("accessedFields: " + accessedFields)
-
-    val inInterpreter = {
-      try {
-        val interpClass = Class.forName("spark.repl.Main")
-        interpClass.getMethod("interp").invoke(null) != null
-      } catch {
-        case _: ClassNotFoundException => true
-      }
-    }
-
-    var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
-    var outer: AnyRef = null
-    if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
-      // The closure is ultimately nested inside a class; keep the object of that
-      // class without cloning it since we don't want to clone the user's objects.
-      outer = outerPairs.head._2
-      outerPairs = outerPairs.tail
-    }
-    // Clone the closure objects themselves, nulling out any fields that are not
-    // used in the closure we're working on or any of its inner closures.
-    for ((cls, obj) <- outerPairs) {
-      outer = instantiateClass(cls, outer, inInterpreter)
-      for (fieldName <- accessedFields(cls)) {
-        val field = cls.getDeclaredField(fieldName)
-        field.setAccessible(true)
-        val value = field.get(obj)
-        //logInfo("1: Setting " + fieldName + " on " + cls + " to " + value);
-        field.set(outer, value)
-      }
-    }
-    
-    if (outer != null) {
-      //logInfo("2: Setting $outer on " + func.getClass + " to " + outer);
-      val field = func.getClass.getDeclaredField("$outer")
-      field.setAccessible(true)
-      field.set(func, outer)
-    }
-  }
-  
-  private def instantiateClass(cls: Class[_], outer: AnyRef, inInterpreter: Boolean): AnyRef = {
-    //logInfo("Creating a " + cls + " with outer = " + outer)
-    if (!inInterpreter) {
-      // This is a bona fide closure class, whose constructor has no effects
-      // other than to set its fields, so use its constructor
-      val cons = cls.getConstructors()(0)
-      val params = cons.getParameterTypes.map(createNullValue).toArray
-      if (outer != null)
-        params(0) = outer // First param is always outer object
-      return cons.newInstance(params: _*).asInstanceOf[AnyRef]
-    } else {
-      // Use reflection to instantiate object without calling constructor
-      val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
-      val parentCtor = classOf[java.lang.Object].getDeclaredConstructor()
-      val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
-      val obj = newCtor.newInstance().asInstanceOf[AnyRef]
-      if (outer != null) {
-        //logInfo("3: Setting $outer on " + cls + " to " + outer);
-        val field = cls.getDeclaredField("$outer")
-        field.setAccessible(true)
-        field.set(obj, outer)
-      }
-      return obj
-    }
-  }
-}
-
-private[spark] class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
-  override def visitMethod(access: Int, name: String, desc: String,
-      sig: String, exceptions: Array[String]): MethodVisitor = {
-    return new MethodVisitor(ASM4) {
-      override def visitFieldInsn(op: Int, owner: String, name: String, desc: String) {
-        if (op == GETFIELD) {
-          for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
-            output(cl) += name
-          }
-        }
-      }
-      
-      override def visitMethodInsn(op: Int, owner: String, name: String,
-          desc: String) {
-        // Check for calls a getter method for a variable in an interpreter wrapper object.
-        // This means that the corresponding field will be accessed, so we should save it.
-        if (op == INVOKEVIRTUAL && owner.endsWith("$iwC") && !name.endsWith("$outer")) {
-          for (cl <- output.keys if cl.getName == owner.replace('/', '.')) {
-            output(cl) += name
-          }
-        }
-      }
-    }
-  }
-}
-
-private[spark] class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM4) {
-  var myName: String = null
-  
-  override def visit(version: Int, access: Int, name: String, sig: String,
-      superName: String, interfaces: Array[String]) {
-    myName = name
-  }
-  
-  override def visitMethod(access: Int, name: String, desc: String,
-      sig: String, exceptions: Array[String]): MethodVisitor = {
-    return new MethodVisitor(ASM4) {
-      override def visitMethodInsn(op: Int, owner: String, name: String,
-          desc: String) {
-        val argTypes = Type.getArgumentTypes(desc)
-        if (op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
-            && argTypes(0).toString.startsWith("L") // is it an object?
-            && argTypes(0).getInternalName == myName)
-          output += Class.forName(
-              owner.replace('/', '.'),
-              false,
-              Thread.currentThread.getContextClassLoader)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index cc3c247..cc30105 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import org.apache.spark.rdd.RDD
+
 /**
  * Base class for dependencies.
  */

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/DoubleRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/DoubleRDDFunctions.scala
deleted file mode 100644
index dd34449..0000000
--- a/core/src/main/scala/org/apache/spark/DoubleRDDFunctions.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.apache.spark.partial.BoundedDouble
-import org.apache.spark.partial.MeanEvaluator
-import org.apache.spark.partial.PartialResult
-import org.apache.spark.partial.SumEvaluator
-import org.apache.spark.util.StatCounter
-
-/**
- * Extra functions available on RDDs of Doubles through an implicit conversion.
- * Import `spark.SparkContext._` at the top of your program to use these functions.
- */
-class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
-  /** Add up the elements in this RDD. */
-  def sum(): Double = {
-    self.reduce(_ + _)
-  }
-
-  /**
-   * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
-   * of the RDD's elements in one operation.
-   */
-  def stats(): StatCounter = {
-    self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
-  }
-
-  /** Compute the mean of this RDD's elements. */
-  def mean(): Double = stats().mean
-
-  /** Compute the variance of this RDD's elements. */
-  def variance(): Double = stats().variance
-
-  /** Compute the standard deviation of this RDD's elements. */
-  def stdev(): Double = stats().stdev
-
-  /** 
-   * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
-   * estimating the standard deviation by dividing by N-1 instead of N).
-   */
-  def sampleStdev(): Double = stats().sampleStdev
-
-  /**
-   * Compute the sample variance of this RDD's elements (which corrects for bias in
-   * estimating the variance by dividing by N-1 instead of N).
-   */
-  def sampleVariance(): Double = stats().sampleVariance
-
-  /** (Experimental) Approximate operation to return the mean within a timeout. */
-  def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
-    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
-    val evaluator = new MeanEvaluator(self.partitions.size, confidence)
-    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
-  }
-
-  /** (Experimental) Approximate operation to return the sum within a timeout. */
-  def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
-    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
-    val evaluator = new SumEvaluator(self.partitions.size, confidence)
-    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/HttpFileServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpFileServer.scala b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
index 9b3a896..ad1ee20 100644
--- a/core/src/main/scala/org/apache/spark/HttpFileServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpFileServer.scala
@@ -19,6 +19,7 @@ package org.apache.spark
 
 import java.io.{File}
 import com.google.common.io.Files
+import org.apache.spark.util.Utils
 
 private[spark] class HttpFileServer extends Logging {
   

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/HttpServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/HttpServer.scala b/core/src/main/scala/org/apache/spark/HttpServer.scala
index db36c7c..cdfc9dd 100644
--- a/core/src/main/scala/org/apache/spark/HttpServer.scala
+++ b/core/src/main/scala/org/apache/spark/HttpServer.scala
@@ -26,6 +26,7 @@ import org.eclipse.jetty.server.handler.DefaultHandler
 import org.eclipse.jetty.server.handler.HandlerList
 import org.eclipse.jetty.server.handler.ResourceHandler
 import org.eclipse.jetty.util.thread.QueuedThreadPool
+import org.apache.spark.util.Utils
 
 /**
  * Exception type thrown by HttpServer when it is in the wrong state for an operation.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/JavaSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/JavaSerializer.scala
deleted file mode 100644
index f43396c..0000000
--- a/core/src/main/scala/org/apache/spark/JavaSerializer.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.nio.ByteBuffer
-
-import serializer.{Serializer, SerializerInstance, DeserializationStream, SerializationStream}
-import org.apache.spark.util.ByteBufferInputStream
-
-private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
-  val objOut = new ObjectOutputStream(out)
-  def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
-  def flush() { objOut.flush() }
-  def close() { objOut.close() }
-}
-
-private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
-extends DeserializationStream {
-  val objIn = new ObjectInputStream(in) {
-    override def resolveClass(desc: ObjectStreamClass) =
-      Class.forName(desc.getName, false, loader)
-  }
-
-  def readObject[T](): T = objIn.readObject().asInstanceOf[T]
-  def close() { objIn.close() }
-}
-
-private[spark] class JavaSerializerInstance extends SerializerInstance {
-  def serialize[T](t: T): ByteBuffer = {
-    val bos = new ByteArrayOutputStream()
-    val out = serializeStream(bos)
-    out.writeObject(t)
-    out.close()
-    ByteBuffer.wrap(bos.toByteArray)
-  }
-
-  def deserialize[T](bytes: ByteBuffer): T = {
-    val bis = new ByteBufferInputStream(bytes)
-    val in = deserializeStream(bis)
-    in.readObject().asInstanceOf[T]
-  }
-
-  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
-    val bis = new ByteBufferInputStream(bytes)
-    val in = deserializeStream(bis, loader)
-    in.readObject().asInstanceOf[T]
-  }
-
-  def serializeStream(s: OutputStream): SerializationStream = {
-    new JavaSerializationStream(s)
-  }
-
-  def deserializeStream(s: InputStream): DeserializationStream = {
-    new JavaDeserializationStream(s, Thread.currentThread.getContextClassLoader)
-  }
-
-  def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
-    new JavaDeserializationStream(s, loader)
-  }
-}
-
-/**
- * A Spark serializer that uses Java's built-in serialization.
- */
-class JavaSerializer extends Serializer {
-  def newInstance(): SerializerInstance = new JavaSerializerInstance
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/KryoSerializer.scala
deleted file mode 100644
index db86e6d..0000000
--- a/core/src/main/scala/org/apache/spark/KryoSerializer.scala
+++ /dev/null
@@ -1,156 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.nio.ByteBuffer
-import com.esotericsoftware.kryo.{Kryo, KryoException}
-import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
-import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
-import com.twitter.chill.ScalaKryoInstantiator
-import serializer.{SerializerInstance, DeserializationStream, SerializationStream}
-import org.apache.spark.broadcast._
-import org.apache.spark.storage._
-
-private[spark]
-class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
-  val output = new KryoOutput(outStream)
-
-  def writeObject[T](t: T): SerializationStream = {
-    kryo.writeClassAndObject(output, t)
-    this
-  }
-
-  def flush() { output.flush() }
-  def close() { output.close() }
-}
-
-private[spark]
-class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
-  val input = new KryoInput(inStream)
-
-  def readObject[T](): T = {
-    try {
-      kryo.readClassAndObject(input).asInstanceOf[T]
-    } catch {
-      // DeserializationStream uses the EOF exception to indicate stopping condition.
-      case _: KryoException => throw new EOFException
-    }
-  }
-
-  def close() {
-    // Kryo's Input automatically closes the input stream it is using.
-    input.close()
-  }
-}
-
-private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
-  val kryo = ks.newKryo()
-  val output = ks.newKryoOutput()
-  val input = ks.newKryoInput()
-
-  def serialize[T](t: T): ByteBuffer = {
-    output.clear()
-    kryo.writeClassAndObject(output, t)
-    ByteBuffer.wrap(output.toBytes)
-  }
-
-  def deserialize[T](bytes: ByteBuffer): T = {
-    input.setBuffer(bytes.array)
-    kryo.readClassAndObject(input).asInstanceOf[T]
-  }
-
-  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
-    val oldClassLoader = kryo.getClassLoader
-    kryo.setClassLoader(loader)
-    input.setBuffer(bytes.array)
-    val obj = kryo.readClassAndObject(input).asInstanceOf[T]
-    kryo.setClassLoader(oldClassLoader)
-    obj
-  }
-
-  def serializeStream(s: OutputStream): SerializationStream = {
-    new KryoSerializationStream(kryo, s)
-  }
-
-  def deserializeStream(s: InputStream): DeserializationStream = {
-    new KryoDeserializationStream(kryo, s)
-  }
-}
-
-/**
- * Interface implemented by clients to register their classes with Kryo when using Kryo
- * serialization.
- */
-trait KryoRegistrator {
-  def registerClasses(kryo: Kryo)
-}
-
-/**
- * A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
- */
-class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
-  private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
-
-  def newKryoOutput() = new KryoOutput(bufferSize)
-
-  def newKryoInput() = new KryoInput(bufferSize)
-
-  def newKryo(): Kryo = {
-    val instantiator = new ScalaKryoInstantiator
-    val kryo = instantiator.newKryo()
-    val classLoader = Thread.currentThread.getContextClassLoader
-
-    // Register some commonly used classes
-    val toRegister: Seq[AnyRef] = Seq(
-      ByteBuffer.allocate(1),
-      StorageLevel.MEMORY_ONLY,
-      PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
-      GotBlock("1", ByteBuffer.allocate(1)),
-      GetBlock("1")
-    )
-
-    for (obj <- toRegister) kryo.register(obj.getClass)
-
-    // Allow sending SerializableWritable
-    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
-    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
-
-    // Allow the user to register their own classes by setting spark.kryo.registrator
-    try {
-      Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
-        logDebug("Running user registrator: " + regCls)
-        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
-        reg.registerClasses(kryo)
-      }
-    } catch {
-      case _: Exception => println("Failed to register spark.kryo.registrator")
-    }
-
-    kryo.setClassLoader(classLoader)
-
-    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
-    kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
-
-    kryo
-  }
-
-  def newInstance(): SerializerInstance = {
-    new KryoSerializerInstance(this)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index 0f422d9..ae7cf2a 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -32,7 +32,7 @@ import akka.util.Duration
 
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util.{Utils, MetadataCleaner, TimeStampedHashMap}
 
 
 private[spark] sealed trait MapOutputTrackerMessage

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala
deleted file mode 100644
index d046e7c..0000000
--- a/core/src/main/scala/org/apache/spark/PairRDDFunctions.scala
+++ /dev/null
@@ -1,703 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.nio.ByteBuffer
-import java.util.{Date, HashMap => JHashMap}
-import java.text.SimpleDateFormat
-
-import scala.collection.{mutable, Map}
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.JavaConversions._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.SequenceFile.CompressionType
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.mapred.FileOutputFormat
-import org.apache.hadoop.mapred.SparkHadoopWriter
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputFormat
-
-import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
-import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat,
-    RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, SparkHadoopMapReduceUtil}
-import org.apache.hadoop.security.UserGroupInformation
-
-import org.apache.spark.partial.BoundedDouble
-import org.apache.spark.partial.PartialResult
-import org.apache.spark.rdd._
-import org.apache.spark.SparkContext._
-import org.apache.spark.Partitioner._
-
-/**
- * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
- * Import `spark.SparkContext._` at the top of your program to use these functions.
- */
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
-  extends Logging
-  with SparkHadoopMapReduceUtil
-  with Serializable {
-
-  /**
-   * Generic function to combine the elements for each key using a custom set of aggregation
-   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
-   * Note that V and C can be different -- for example, one might group an RDD of type
-   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
-   *
-   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
-   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
-   * - `mergeCombiners`, to combine two C's into a single one.
-   *
-   * In addition, users can control the partitioning of the output RDD, and whether to perform
-   * map-side aggregation (if a mapper can produce multiple items with the same key).
-   */
-  def combineByKey[C](createCombiner: V => C,
-      mergeValue: (C, V) => C,
-      mergeCombiners: (C, C) => C,
-      partitioner: Partitioner,
-      mapSideCombine: Boolean = true,
-      serializerClass: String = null): RDD[(K, C)] = {
-    if (getKeyClass().isArray) {
-      if (mapSideCombine) {
-        throw new SparkException("Cannot use map-side combining with array keys.")
-      }
-      if (partitioner.isInstanceOf[HashPartitioner]) {
-        throw new SparkException("Default partitioner cannot partition array keys.")
-      }
-    }
-    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
-    if (self.partitioner == Some(partitioner)) {
-      self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
-    } else if (mapSideCombine) {
-      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
-      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
-        .setSerializer(serializerClass)
-      partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
-    } else {
-      // Don't apply map-side combiner.
-      // A sanity check to make sure mergeCombiners is not defined.
-      assert(mergeCombiners == null)
-      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
-      values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
-    }
-  }
-
-  /**
-   * Simplified version of combineByKey that hash-partitions the output RDD.
-   */
-  def combineByKey[C](createCombiner: V => C,
-      mergeValue: (C, V) => C,
-      mergeCombiners: (C, C) => C,
-      numPartitions: Int): RDD[(K, C)] = {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
-   */
-  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
-    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
-    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
-    val zeroArray = new Array[Byte](zeroBuffer.limit)
-    zeroBuffer.get(zeroArray)
-
-    // When deserializing, use a lazy val to create just one instance of the serializer per task
-    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
-    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
-
-    combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
-  }
-
-  /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
-   */
-  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
-    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
-  }
-
-  /**
-   * Merge the values for each key using an associative function and a neutral "zero value" which may
-   * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for
-   * list concatenation, 0 for addition, or 1 for multiplication.).
-   */
-  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
-    foldByKey(zeroValue, defaultPartitioner(self))(func)
-  }
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce.
-   */
-  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
-    combineByKey[V]((v: V) => v, func, func, partitioner)
-  }
-
-  /**
-   * Merge the values for each key using an associative reduce function, but return the results
-   * immediately to the master as a Map. This will also perform the merging locally on each mapper
-   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
-   */
-  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
-
-    if (getKeyClass().isArray) {
-      throw new SparkException("reduceByKeyLocally() does not support array keys")
-    }
-
-    def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
-      val map = new JHashMap[K, V]
-      iter.foreach { case (k, v) =>
-        val old = map.get(k)
-        map.put(k, if (old == null) v else func(old, v))
-      }
-      Iterator(map)
-    }
-
-    def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
-      m2.foreach { case (k, v) =>
-        val old = m1.get(k)
-        m1.put(k, if (old == null) v else func(old, v))
-      }
-      m1
-    }
-
-    self.mapPartitions(reducePartition).reduce(mergeMaps)
-  }
-
-  /** Alias for reduceByKeyLocally */
-  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)
-
-  /** Count the number of elements for each key, and return the result to the master as a Map. */
-  def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
-
-  /**
-   * (Experimental) Approximate version of countByKey that can return a partial result if it does
-   * not finish within a timeout.
-   */
-  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
-      : PartialResult[Map[K, BoundedDouble]] = {
-    self.map(_._1).countByValueApprox(timeout, confidence)
-  }
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
-   */
-  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
-    reduceByKey(new HashPartitioner(numPartitions), func)
-  }
-
-  /**
-   * Group the values for each key in the RDD into a single sequence. Allows controlling the
-   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
-   */
-  def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
-    // groupByKey shouldn't use map side combine because map side combine does not
-    // reduce the amount of data shuffled and requires all map side data be inserted
-    // into a hash table, leading to more objects in the old gen.
-    def createCombiner(v: V) = ArrayBuffer(v)
-    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
-    val bufs = combineByKey[ArrayBuffer[V]](
-      createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
-    bufs.asInstanceOf[RDD[(K, Seq[V])]]
-  }
-
-  /**
-   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with into `numPartitions` partitions.
-   */
-  def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
-    groupByKey(new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Return a copy of the RDD partitioned using the specified partitioner.
-   */
-  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
-    if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
-      throw new SparkException("Default partitioner cannot partition array keys.")
-    }
-    new ShuffledRDD[K, V, (K, V)](self, partitioner)
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
-   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
-   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
-   */
-  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
-    }
-  }
-
-  /**
-   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
-   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
-   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
-   * partition the output RDD.
-   */
-  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (ws.isEmpty) {
-        vs.iterator.map(v => (v, None))
-      } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
-      }
-    }
-  }
-
-  /**
-   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
-   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
-   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
-   * partition the output RDD.
-   */
-  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
-      : RDD[(K, (Option[V], W))] = {
-    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
-      if (vs.isEmpty) {
-        ws.iterator.map(w => (None, w))
-      } else {
-        for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
-      }
-    }
-  }
-
-  /**
-   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
-   * existing partitioner/parallelism level.
-   */
-  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
-      : RDD[(K, C)] = {
-    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
-  }
-
-  /**
-   * Merge the values for each key using an associative reduce function. This will also perform
-   * the merging locally on each mapper before sending results to a reducer, similarly to a
-   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
-   * parallelism level.
-   */
-  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
-    reduceByKey(defaultPartitioner(self), func)
-  }
-
-  /**
-   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
-   * resulting RDD with the existing partitioner/parallelism level.
-   */
-  def groupByKey(): RDD[(K, Seq[V])] = {
-    groupByKey(defaultPartitioner(self))
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
-   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
-   * (k, v2) is in `other`. Performs a hash join across the cluster.
-   */
-  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
-    join(other, defaultPartitioner(self, other))
-  }
-
-  /**
-   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
-   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
-   * (k, v2) is in `other`. Performs a hash join across the cluster.
-   */
-  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
-    join(other, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
-   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
-   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
-   * using the existing partitioner/parallelism level.
-   */
-  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
-    leftOuterJoin(other, defaultPartitioner(self, other))
-  }
-
-  /**
-   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
-   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
-   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
-   * into `numPartitions` partitions.
-   */
-  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
-    leftOuterJoin(other, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
-   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
-   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
-   * RDD using the existing partitioner/parallelism level.
-   */
-  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
-    rightOuterJoin(other, defaultPartitioner(self, other))
-  }
-
-  /**
-   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
-   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
-   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
-   * RDD into the given number of partitions.
-   */
-  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
-    rightOuterJoin(other, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * Return the key-value pairs in this RDD to the master as a Map.
-   */
-  def collectAsMap(): Map[K, V] = {
-    val data = self.toArray()
-    val map = new mutable.HashMap[K, V]
-    map.sizeHint(data.length)
-    data.foreach { case (k, v) => map.put(k, v) }
-    map
-  }
-
-  /**
-   * Pass each value in the key-value pair RDD through a map function without changing the keys;
-   * this also retains the original RDD's partitioning.
-   */
-  def mapValues[U](f: V => U): RDD[(K, U)] = {
-    val cleanF = self.context.clean(f)
-    new MappedValuesRDD(self, cleanF)
-  }
-
-  /**
-   * Pass each value in the key-value pair RDD through a flatMap function without changing the
-   * keys; this also retains the original RDD's partitioning.
-   */
-  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
-    val cleanF = self.context.clean(f)
-    new FlatMappedValuesRDD(self, cleanF)
-  }
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
-   */
-  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
-    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
-      throw new SparkException("Default partitioner cannot partition array keys.")
-    }
-    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
-    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
-    prfs.mapValues { case Seq(vs, ws) =>
-      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
-    }
-  }
-
-  /**
-   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this`, `other1` and `other2`.
-   */
-  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
-    if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
-      throw new SparkException("Default partitioner cannot partition array keys.")
-    }
-    val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
-    val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
-    prfs.mapValues { case Seq(vs, w1s, w2s) =>
-      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
-    }
-  }
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
-   */
-  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
-    cogroup(other, defaultPartitioner(self, other))
-  }
-
-  /**
-   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this`, `other1` and `other2`.
-   */
-  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
-    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
-  }
-
-  /**
-   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
-   * list of values for that key in `this` as well as `other`.
-   */
-  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
-    cogroup(other, new HashPartitioner(numPartitions))
-  }
-
-  /**
-   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
-   * tuple with the list of values for that key in `this`, `other1` and `other2`.
-   */
-  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
-    cogroup(other1, other2, new HashPartitioner(numPartitions))
-  }
-
-  /** Alias for cogroup. */
-  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
-    cogroup(other, defaultPartitioner(self, other))
-  }
-
-  /** Alias for cogroup. */
-  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
-      : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
-    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
-  }
-
-  /**
-   * Return an RDD with the pairs from `this` whose keys are not in `other`.
-   *
-   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
-   * RDD will be <= us.
-   */
-  def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
-    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
-
-  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
-  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
-    subtractByKey(other, new HashPartitioner(numPartitions))
-
-  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
-  def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
-    new SubtractedRDD[K, V, W](self, other, p)
-
-  /**
-   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
-   * RDD has a known partitioner by only searching the partition that the key maps to.
-   */
-  def lookup(key: K): Seq[V] = {
-    self.partitioner match {
-      case Some(p) =>
-        val index = p.getPartition(key)
-        def process(it: Iterator[(K, V)]): Seq[V] = {
-          val buf = new ArrayBuffer[V]
-          for ((k, v) <- it if k == key) {
-            buf += v
-          }
-          buf
-        }
-        val res = self.context.runJob(self, process _, Array(index), false)
-        res(0)
-      case None =>
-        self.filter(_._1 == key).map(_._2).collect()
-    }
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD.
-   */
-  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
-    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD. Compress the result with the
-   * supplied codec.
-   */
-  def saveAsHadoopFile[F <: OutputFormat[K, V]](
-      path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassManifest[F]) {
-    saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]], codec)
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
-   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
-   */
-  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
-    saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
-   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
-   */
-  def saveAsNewAPIHadoopFile(
-      path: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
-      conf: Configuration = self.context.hadoopConfiguration) {
-    val job = new NewAPIHadoopJob(conf)
-    job.setOutputKeyClass(keyClass)
-    job.setOutputValueClass(valueClass)
-    val wrappedConf = new SerializableWritable(job.getConfiguration)
-    NewFileOutputFormat.setOutputPath(job, new Path(path))
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(new Date())
-    val stageId = self.id
-    def writeShard(context: TaskContext, iter: Iterator[(K,V)]): Int = {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-      /* "reduce task" <split #> <attempt # = spark task #> */
-      val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.splitId, attemptNumber)
-      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
-      val format = outputFormatClass.newInstance
-      val committer = format.getOutputCommitter(hadoopContext)
-      committer.setupTask(hadoopContext)
-      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]]
-      while (iter.hasNext) {
-        val (k, v) = iter.next()
-        writer.write(k, v)
-      }
-      writer.close(hadoopContext)
-      committer.commitTask(hadoopContext)
-      return 1
-    }
-    val jobFormat = outputFormatClass.newInstance
-    /* apparently we need a TaskAttemptID to construct an OutputCommitter;
-     * however we're only going to use this local OutputCommitter for
-     * setupJob/commitJob, so we just use a dummy "map" task.
-     */
-    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0)
-    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
-    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
-    jobCommitter.setupJob(jobTaskContext)
-    val count = self.context.runJob(self, writeShard _).sum
-    jobCommitter.commitJob(jobTaskContext)
-    jobCommitter.cleanupJob(jobTaskContext)
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
-   */
-  def saveAsHadoopFile(
-      path: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      codec: Class[_ <: CompressionCodec]) {
-    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
-      new JobConf(self.context.hadoopConfiguration), Some(codec))
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
-   * supporting the key and value types K and V in this RDD.
-   */
-  def saveAsHadoopFile(
-      path: String,
-      keyClass: Class[_],
-      valueClass: Class[_],
-      outputFormatClass: Class[_ <: OutputFormat[_, _]],
-      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
-      codec: Option[Class[_ <: CompressionCodec]] = None) {
-    conf.setOutputKeyClass(keyClass)
-    conf.setOutputValueClass(valueClass)
-    // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug
-    conf.set("mapred.output.format.class", outputFormatClass.getName)
-    for (c <- codec) {
-      conf.setCompressMapOutput(true)
-      conf.set("mapred.output.compress", "true")
-      conf.setMapOutputCompressorClass(c)
-      conf.set("mapred.output.compression.codec", c.getCanonicalName)
-      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
-    }
-    conf.setOutputCommitter(classOf[FileOutputCommitter])
-    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
-    saveAsHadoopDataset(conf)
-  }
-
-  /**
-   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
-   * that storage system. The JobConf should set an OutputFormat and any output paths required
-   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
-   * MapReduce job.
-   */
-  def saveAsHadoopDataset(conf: JobConf) {
-    val outputFormatClass = conf.getOutputFormat
-    val keyClass = conf.getOutputKeyClass
-    val valueClass = conf.getOutputValueClass
-    if (outputFormatClass == null) {
-      throw new SparkException("Output format class not set")
-    }
-    if (keyClass == null) {
-      throw new SparkException("Output key class not set")
-    }
-    if (valueClass == null) {
-      throw new SparkException("Output value class not set")
-    }
-
-    logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")")
-
-    val writer = new SparkHadoopWriter(conf)
-    writer.preSetup()
-
-    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
-      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
-      // around by taking a mod. We expect that no task will be attempted 2 billion times.
-      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-
-      writer.setup(context.stageId, context.splitId, attemptNumber)
-      writer.open()
-
-      var count = 0
-      while(iter.hasNext) {
-        val record = iter.next()
-        count += 1
-        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
-      }
-
-      writer.close()
-      writer.commit()
-    }
-
-    self.context.runJob(self, writeToFile _)
-    writer.commitJob()
-    writer.cleanup()
-  }
-
-  /**
-   * Return an RDD with the keys of each tuple.
-   */
-  def keys: RDD[K] = self.map(_._1)
-
-  /**
-   * Return an RDD with the values of each tuple.
-   */
-  def values: RDD[V] = self.map(_._2)
-
-  private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
-
-  private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
-}
-
-
-private[spark] object Manifests {
-  val seqSeqManifest = classManifest[Seq[Seq[_]]]
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index 4dce260..0e2c987 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark
 
+import org.apache.spark.util.Utils
+import org.apache.spark.rdd.RDD
+
 /**
  * An object that defines how the elements in a key-value pair RDD are partitioned by key.
  * Maps each key to a partition ID, from 0 to `numPartitions - 1`.