Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:46 UTC

[62/69] [abbrv] Move some classes to more appropriate packages:

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
new file mode 100644
index 0000000..e143ecd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -0,0 +1,942 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import java.util.Random
+
+import scala.collection.Map
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.io.BytesWritable
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.TextOutputFormat
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+import org.apache.spark.Partitioner._
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.partial.BoundedDouble
+import org.apache.spark.partial.CountEvaluator
+import org.apache.spark.partial.GroupedCountEvaluator
+import org.apache.spark.partial.PartialResult
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.{Utils, BoundedPriorityQueue}
+
+import org.apache.spark.SparkContext._
+import org.apache.spark._
+
+/**
+ * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
+ * partitioned collection of elements that can be operated on in parallel. This class contains the
+ * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
+ * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
+ * pairs, such as `groupByKey` and `join`; [[org.apache.spark.rdd.DoubleRDDFunctions]] contains
+ * operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]]
+ * contains operations available on RDDs that can be saved as SequenceFiles. These operations are
+ * automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit
+ * conversions when you `import org.apache.spark.SparkContext._`.
+ *
+ * Internally, each RDD is characterized by five main properties:
+ *
+ *  - A list of partitions
+ *  - A function for computing each split
+ *  - A list of dependencies on other RDDs
+ *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
+ *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
+ *    an HDFS file)
+ *
+ * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
+ * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
+ * reading data from a new storage system) by overriding these functions. Please refer to the
+ * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
+ * on RDD internals.
+ */
+abstract class RDD[T: ClassManifest](
+    @transient private var sc: SparkContext,
+    @transient private var deps: Seq[Dependency[_]]
+  ) extends Serializable with Logging {
+
+  /** Construct an RDD with just a one-to-one dependency on one parent */
+  def this(@transient oneParent: RDD[_]) =
+    this(oneParent.context, List(new OneToOneDependency(oneParent)))
+
+  // =======================================================================
+  // Methods that should be implemented by subclasses of RDD
+  // =======================================================================
+
+  /** Implemented by subclasses to compute a given partition. */
+  def compute(split: Partition, context: TaskContext): Iterator[T]
+
+  /**
+   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
+   * be called once, so it is safe to implement a time-consuming computation in it.
+   */
+  protected def getPartitions: Array[Partition]
+
+  /**
+   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
+   * be called once, so it is safe to implement a time-consuming computation in it.
+   */
+  protected def getDependencies: Seq[Dependency[_]] = deps
+
+  /** Optionally overridden by subclasses to specify placement preferences. */
+  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
+
+  /** Optionally overridden by subclasses to specify how they are partitioned. */
+  val partitioner: Option[Partitioner] = None
+
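As a minimal sketch of how the subclass hooks above fit together (illustrative only, not part of this commit): a trivial RDD whose partitions each yield a single constant value. The class and names below are assumptions made for the example.

  // Hypothetical example: one Partition implementation plus a custom RDD.
  private class ConstantPartition(val index: Int) extends Partition

  private class ConstantRDD(sc: SparkContext, value: Int, numSlices: Int)
    extends RDD[Int](sc, Nil) {

    // One partition per slice; a partition only needs to know its index.
    override def getPartitions: Array[Partition] =
      (0 until numSlices).map(i => new ConstantPartition(i): Partition).toArray

    // Each partition produces the single constant value.
    override def compute(split: Partition, context: TaskContext): Iterator[Int] =
      Iterator(value)
  }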
+  // =======================================================================
+  // Methods and fields available on all RDDs
+  // =======================================================================
+
+  /** The SparkContext that created this RDD. */
+  def sparkContext: SparkContext = sc
+
+  /** A unique ID for this RDD (within its SparkContext). */
+  val id: Int = sc.newRddId()
+
+  /** A friendly name for this RDD */
+  var name: String = null
+
+  /** Assign a name to this RDD */
+  def setName(_name: String) = {
+    name = _name
+    this
+  }
+
+  /** User-defined generator of this RDD */
+  var generator = Utils.getCallSiteInfo.firstUserClass
+
+  /** Reset generator */
+  def setGenerator(_generator: String) = {
+    generator = _generator
+  }
+
+  /**
+   * Set this RDD's storage level to persist its values across operations after the first time
+   * it is computed. This can only be used to assign a new storage level if the RDD does not
+   * have a storage level set yet.
+   */
+  def persist(newLevel: StorageLevel): RDD[T] = {
+    // TODO: Handle changes of StorageLevel
+    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
+      throw new UnsupportedOperationException(
+        "Cannot change storage level of an RDD after it was already assigned a level")
+    }
+    storageLevel = newLevel
+    // Register the RDD with the SparkContext
+    sc.persistentRdds(id) = this
+    this
+  }
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def cache(): RDD[T] = persist()
+
+  /**
+   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+   *
+   * @param blocking Whether to block until all blocks are deleted.
+   * @return This RDD.
+   */
+  def unpersist(blocking: Boolean = true): RDD[T] = {
+    logInfo("Removing RDD " + id + " from persistence list")
+    sc.env.blockManager.master.removeRdd(id, blocking)
+    sc.persistentRdds.remove(id)
+    storageLevel = StorageLevel.NONE
+    this
+  }
+
+  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
+  def getStorageLevel = storageLevel
+
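A short usage sketch of the persistence methods above; the HDFS path and the MEMORY_AND_DISK level are illustrative assumptions:

  val lines = sc.textFile("hdfs:///logs").persist(StorageLevel.MEMORY_AND_DISK)
  lines.count()       // the first action computes the partitions and caches them
  lines.unpersist()   // drop the cached blocks once they are no longer needed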
+  // Our dependencies and partitions are computed by calling the subclass methods below, and will
+  // be overwritten when the RDD is checkpointed
+  private var dependencies_ : Seq[Dependency[_]] = null
+  @transient private var partitions_ : Array[Partition] = null
+
+  /** An Option holding our checkpoint RDD, if we are checkpointed */
+  private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
+
+  /**
+   * Get the list of dependencies of this RDD, taking into account whether the
+   * RDD is checkpointed or not.
+   */
+  final def dependencies: Seq[Dependency[_]] = {
+    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
+      if (dependencies_ == null) {
+        dependencies_ = getDependencies
+      }
+      dependencies_
+    }
+  }
+
+  /**
+   * Get the array of partitions of this RDD, taking into account whether the
+   * RDD is checkpointed or not.
+   */
+  final def partitions: Array[Partition] = {
+    checkpointRDD.map(_.partitions).getOrElse {
+      if (partitions_ == null) {
+        partitions_ = getPartitions
+      }
+      partitions_
+    }
+  }
+
+  /**
+   * Get the preferred locations of a partition (as hostnames), taking into account whether the
+   * RDD is checkpointed.
+   */
+  final def preferredLocations(split: Partition): Seq[String] = {
+    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
+      getPreferredLocations(split)
+    }
+  }
+
+  /**
+   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
+   * This should ''not'' be called by users directly, but is available for implementors of custom
+   * subclasses of RDD.
+   */
+  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
+    if (storageLevel != StorageLevel.NONE) {
+      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
+    } else {
+      computeOrReadCheckpoint(split, context)
+    }
+  }
+
+  /**
+   * Compute an RDD partition, or read it from a checkpoint if the RDD has been checkpointed.
+   */
+  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
+    if (isCheckpointed) {
+      firstParent[T].iterator(split, context)
+    } else {
+      compute(split, context)
+    }
+  }
+
+  // Transformations (return a new RDD)
+
+  /**
+   * Return a new RDD by applying a function to all elements of this RDD.
+   */
+  def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
+
+  /**
+   *  Return a new RDD by first applying a function to all elements of this
+   *  RDD, and then flattening the results.
+   */
+  def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
+    new FlatMappedRDD(this, sc.clean(f))
+
+  /**
+   * Return a new RDD containing only the elements that satisfy a predicate.
+   */
+  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(numPartitions: Int): RDD[T] =
+    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
+
+  def distinct(): RDD[T] = distinct(partitions.size)
+
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
+    if (shuffle) {
+      // include a shuffle step so that our upstream tasks are still distributed
+      new CoalescedRDD(
+        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+        new HashPartitioner(numPartitions)),
+        numPartitions).keys
+    } else {
+      new CoalescedRDD(this, numPartitions)
+    }
+  }
+
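A brief sketch of the two coalesce modes; `bigRdd` and the partition counts are illustrative:

  val packed     = bigRdd.coalesce(10)                  // merge into 10 partitions, no shuffle
  val rebalanced = bigRdd.coalesce(10, shuffle = true)  // redistribute evenly via a shuffle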
+  /**
+   * Return a sampled subset of this RDD.
+   */
+  def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
+    new SampledRDD(this, withReplacement, fraction, seed)
+
+  def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
+    var fraction = 0.0
+    var total = 0
+    val multiplier = 3.0
+    val initialCount = this.count()
+    var maxSelected = 0
+
+    if (num < 0) {
+      throw new IllegalArgumentException("Negative number of elements requested")
+    }
+
+    if (initialCount > Integer.MAX_VALUE - 1) {
+      maxSelected = Integer.MAX_VALUE - 1
+    } else {
+      maxSelected = initialCount.toInt
+    }
+
+    if (num > initialCount && !withReplacement) {
+      total = maxSelected
+      fraction = multiplier * (maxSelected + 1) / initialCount
+    } else {
+      fraction = multiplier * (num + 1) / initialCount
+      total = num
+    }
+
+    val rand = new Random(seed)
+    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+
+    // If the first sample didn't turn out large enough, keep trying to take samples;
+    // this shouldn't happen often because we use a big multiplier for the initial size
+    while (samples.length < total) {
+      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
+    }
+
+    Utils.randomizeInPlace(samples, rand).take(total)
+  }
+
+  /**
+   * Return the union of this RDD and another one. Any identical elements will appear multiple
+   * times (use `.distinct()` to eliminate them).
+   */
+  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
+
+  /**
+   * Return the union of this RDD and another one. Any identical elements will appear multiple
+   * times (use `.distinct()` to eliminate them).
+   */
+  def ++(other: RDD[T]): RDD[T] = this.union(other)
+
+  /**
+   * Return an RDD created by coalescing all elements within each partition into an array.
+   */
+  def glom(): RDD[Array[T]] = new GlommedRDD(this)
+
+  /**
+   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
+   * elements (a, b) where a is in `this` and b is in `other`.
+   */
+  def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
+
+  /**
+   * Return an RDD of grouped items.
+   */
+  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
+    groupBy[K](f, defaultPartitioner(this))
+
+  /**
+   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
+   * mapping to that key.
+   */
+  def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
+    groupBy(f, new HashPartitioner(numPartitions))
+
+  /**
+   * Return an RDD of grouped items.
+   */
+  def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+    val cleanF = sc.clean(f)
+    this.map(t => (cleanF(t), t)).groupByKey(p)
+  }
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: String): RDD[String] = new PipedRDD(this, command)
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
+  def pipe(command: String, env: Map[String, String]): RDD[String] =
+    new PipedRDD(this, command, env)
+
+
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   * The print behavior can be customized by providing two functions.
+   *
+   * @param command command to run in forked process.
+   * @param env environment variables to set.
+   * @param printPipeContext Before piping elements, this function is called as an opportunity
+   *                         to pipe context data. The print-line function (like out.println) will be
+   *                         passed as printPipeContext's parameter.
+   * @param printRDDElement Use this function to customize how to pipe elements. This function
+   *                        will be called with each RDD element as the 1st parameter, and the
+   *                        print line function (like out.println()) as the 2nd parameter.
+   *                        For example, to pipe the RDD data of groupBy() in a streaming way,
+   *                        instead of constructing a huge String to concatenate all the elements:
+   *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
+   *                          for (e <- record._2){f(e)}
+   * @return the result RDD
+   */
+  def pipe(
+      command: Seq[String],
+      env: Map[String, String] = Map(),
+      printPipeContext: (String => Unit) => Unit = null,
+      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] =
+    new PipedRDD(this, command, env,
+      if (printPipeContext ne null) sc.clean(printPipeContext) else null,
+      if (printRDDElement ne null) sc.clean(printRDDElement) else null)
+
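A usage sketch of the pipe variants above, assuming an RDD[String] named `records` and standard Unix tools:

  val filtered = records.pipe("grep -i error")
  val sorted = records.pipe(
    Seq("sort", "-u"),
    Map("LC_ALL" -> "C"),
    printRDDElement = (rec: String, printLine: String => Unit) => printLine(rec))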
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
+  def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
+    preservesPartitioning: Boolean = false): RDD[U] =
+    new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  def mapPartitionsWithIndex[U: ClassManifest](
+    f: (Int, Iterator[T]) => Iterator[U],
+    preservesPartitioning: Boolean = false): RDD[U] =
+    new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
+
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
+  @deprecated("use mapPartitionsWithIndex", "0.7.0")
+  def mapPartitionsWithSplit[U: ClassManifest](
+    f: (Int, Iterator[T]) => Iterator[U],
+    preservesPartitioning: Boolean = false): RDD[U] =
+    new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
+
+  /**
+   * Maps f over this RDD, where f takes an additional parameter of type A.  This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+    (f:(T, A) => U): RDD[U] = {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+        val a = constructA(index)
+        iter.map(t => f(t, a))
+      }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+  }
+
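A sketch of mapWith using a per-partition random number generator seeded by the partition index; `values` is an assumed RDD[Double]:

  val jittered = values.mapWith(index => new scala.util.Random(index)) {
    (x, rng) => x + rng.nextDouble() * 0.01
  }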
+  /**
+   * FlatMaps f over this RDD, where f takes an additional parameter of type A.  This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
+    (f:(T, A) => Seq[U]): RDD[U] = {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
+        val a = constructA(index)
+        iter.flatMap(t => f(t, a))
+      }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), preservesPartitioning)
+  }
+
+  /**
+   * Applies f to each element of this RDD, where f takes an additional parameter of type A.
+   * This additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def foreachWith[A: ClassManifest](constructA: Int => A)
+    (f:(T, A) => Unit) {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+        val a = constructA(index)
+        iter.map(t => {f(t, a); t})
+      }
+    (new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)).foreach(_ => {})
+  }
+
+  /**
+   * Filters this RDD with p, where p takes an additional parameter of type A.  This
+   * additional parameter is produced by constructA, which is called in each
+   * partition with the index of that partition.
+   */
+  def filterWith[A: ClassManifest](constructA: Int => A)
+    (p:(T, A) => Boolean): RDD[T] = {
+      def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
+        val a = constructA(index)
+        iter.filter(t => p(t, a))
+      }
+    new MapPartitionsWithIndexRDD(this, sc.clean(iterF _), true)
+  }
+
+  /**
+   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
+   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
+   * partitions* and the *same number of elements in each partition* (e.g. one was made through
+   * a map on the other).
+   */
+  def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
+
+  /**
+   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
+   * applying a function to the zipped partitions. Assumes that all the RDDs have the
+   * *same number of partitions*, but does *not* require them to have the same number
+   * of elements in each partition.
+   */
+  def zipPartitions[B: ClassManifest, V: ClassManifest]
+      (rdd2: RDD[B])
+      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2)
+
+  def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
+      (rdd2: RDD[B], rdd3: RDD[C])
+      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3)
+
+  def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
+      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
+      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
+    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4)
+
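A sketch of the two-RDD zipPartitions overload above, assuming two co-partitioned RDD[Int]s named `xs` and `ys`:

  val sums = xs.zipPartitions(ys) { (xIter, yIter) =>
    xIter.zip(yIter).map { case (a, b) => a + b }   // stream through both partitions lazily
  }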
+
+  // Actions (launch a job to return a value to the user program)
+
+  /**
+   * Applies a function f to all elements of this RDD.
+   */
+  def foreach(f: T => Unit) {
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
+  }
+
+  /**
+   * Applies a function f to each partition of this RDD.
+   */
+  def foreachPartition(f: Iterator[T] => Unit) {
+    val cleanF = sc.clean(f)
+    sc.runJob(this, (iter: Iterator[T]) => cleanF(iter))
+  }
+
+  /**
+   * Return an array that contains all of the elements in this RDD.
+   */
+  def collect(): Array[T] = {
+    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
+    Array.concat(results: _*)
+  }
+
+  /**
+   * Return an array that contains all of the elements in this RDD.
+   */
+  def toArray(): Array[T] = collect()
+
+  /**
+   * Return an RDD that contains all matching values by applying `f`.
+   */
+  def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
+    filter(f.isDefinedAt).map(f)
+  }
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   *
+   * Uses `this` RDD's partitioner/partition size, because even if `other` is huge, the resulting
+   * RDD will be no larger than this one.
+   */
+  def subtract(other: RDD[T]): RDD[T] =
+    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
+    subtract(other, new HashPartitioner(numPartitions))
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
+    if (partitioner == Some(p)) {
+      // Our partitioner knows how to handle T (which, since we have a partitioner, is
+      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
+      val p2 = new Partitioner() {
+        override def numPartitions = p.numPartitions
+        override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
+      }
+      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
+      // anyway, and when calling .keys, will not have a partitioner set, even though
+      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
+      // partitioned by the right/real keys (e.g. p).
+      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
+    } else {
+      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
+    }
+  }
+
+  /**
+   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
+   */
+  def reduce(f: (T, T) => T): T = {
+    val cleanF = sc.clean(f)
+    val reducePartition: Iterator[T] => Option[T] = iter => {
+      if (iter.hasNext) {
+        Some(iter.reduceLeft(cleanF))
+      } else {
+        None
+      }
+    }
+    var jobResult: Option[T] = None
+    val mergeResult = (index: Int, taskResult: Option[T]) => {
+      if (taskResult != None) {
+        jobResult = jobResult match {
+          case Some(value) => Some(f(value, taskResult.get))
+          case None => taskResult
+        }
+      }
+    }
+    sc.runJob(this, reducePartition, mergeResult)
+    // Get the final result out of our Option, or throw an exception if the RDD was empty
+    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
+  }
+
+  /**
+   * Aggregate the elements of each partition, and then the results for all the partitions, using a
+   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
+   * modify t1 and return it as its result value to avoid object allocation; however, it should not
+   * modify t2.
+   */
+  def fold(zeroValue: T)(op: (T, T) => T): T = {
+    // Clone the zero value since we will also be serializing it as part of tasks
+    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    val cleanOp = sc.clean(op)
+    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
+    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
+    sc.runJob(this, foldPartition, mergeResult)
+    jobResult
+  }
+
+  /**
+   * Aggregate the elements of each partition, and then the results for all the partitions, using
+   * given combine functions and a neutral "zero value". This function can return a different result
+   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
+   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
+   * allowed to modify and return their first argument instead of creating a new U to avoid memory
+   * allocation.
+   */
+  def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
+    // Clone the zero value since we will also be serializing it as part of tasks
+    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    val cleanSeqOp = sc.clean(seqOp)
+    val cleanCombOp = sc.clean(combOp)
+    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
+    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
+    sc.runJob(this, aggregatePartition, mergeResult)
+    jobResult
+  }
+
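As a sketch of aggregate, computing a sum and a count in a single pass to derive a mean; `nums` is an assumed RDD[Int]:

  val (sum, count) = nums.aggregate((0L, 0L))(
    (acc, x) => (acc._1 + x, acc._2 + 1),      // fold an element into the per-partition accumulator
    (a, b) => (a._1 + b._1, a._2 + b._2))      // merge two partition accumulators
  val mean = sum.toDouble / count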
+  /**
+   * Return the number of elements in the RDD.
+   */
+  def count(): Long = {
+    sc.runJob(this, (iter: Iterator[T]) => {
+      var result = 0L
+      while (iter.hasNext) {
+        result += 1L
+        iter.next()
+      }
+      result
+    }).sum
+  }
+
+  /**
+   * (Experimental) Approximate version of count() that returns a potentially incomplete result
+   * within a timeout, even if not all tasks have finished.
+   */
+  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
+    val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
+      var result = 0L
+      while (iter.hasNext) {
+        result += 1L
+        iter.next()
+      }
+      result
+    }
+    val evaluator = new CountEvaluator(partitions.size, confidence)
+    sc.runApproximateJob(this, countElements, evaluator, timeout)
+  }
+
+  /**
+   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
+   * combine step happens locally on the master, equivalent to running a single reduce task.
+   */
+  def countByValue(): Map[T, Long] = {
+    if (elementClassManifest.erasure.isArray) {
+      throw new SparkException("countByValue() does not support arrays")
+    }
+    // TODO: This should perhaps be distributed by default.
+    def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
+      val map = new OLMap[T]
+      while (iter.hasNext) {
+        val v = iter.next()
+        map.put(v, map.getLong(v) + 1L)
+      }
+      Iterator(map)
+    }
+    def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
+      val iter = m2.object2LongEntrySet.fastIterator()
+      while (iter.hasNext) {
+        val entry = iter.next()
+        m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
+      }
+      return m1
+    }
+    val myResult = mapPartitions(countPartition).reduce(mergeMaps)
+    myResult.asInstanceOf[java.util.Map[T, Long]]   // Will be wrapped as a Scala mutable Map
+  }
+
+  /**
+   * (Experimental) Approximate version of countByValue().
+   */
+  def countByValueApprox(
+      timeout: Long,
+      confidence: Double = 0.95
+      ): PartialResult[Map[T, BoundedDouble]] = {
+    if (elementClassManifest.erasure.isArray) {
+      throw new SparkException("countByValueApprox() does not support arrays")
+    }
+    val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
+      val map = new OLMap[T]
+      while (iter.hasNext) {
+        val v = iter.next()
+        map.put(v, map.getLong(v) + 1L)
+      }
+      map
+    }
+    val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
+    sc.runApproximateJob(this, countPartition, evaluator, timeout)
+  }
+
+  /**
+   * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
+   * it will be slow if a lot of partitions are required. In that case, use collect() to get the
+   * whole RDD instead.
+   */
+  def take(num: Int): Array[T] = {
+    if (num == 0) {
+      return new Array[T](0)
+    }
+    val buf = new ArrayBuffer[T]
+    var p = 0
+    while (buf.size < num && p < partitions.size) {
+      val left = num - buf.size
+      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, Array(p), true)
+      buf ++= res(0)
+      if (buf.size == num)
+        return buf.toArray
+      p += 1
+    }
+    return buf.toArray
+  }
+
+  /**
+   * Return the first element in this RDD.
+   */
+  def first(): T = take(1) match {
+    case Array(t) => t
+    case _ => throw new UnsupportedOperationException("empty collection")
+  }
+
+  /**
+   * Returns the top K elements from this RDD as defined by
+   * the specified implicit Ordering[T].
+   * @param num the number of top elements to return
+   * @param ord the implicit ordering for T
+   * @return an array of top elements
+   */
+  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
+    mapPartitions { items =>
+      val queue = new BoundedPriorityQueue[T](num)
+      queue ++= items
+      Iterator.single(queue)
+    }.reduce { (queue1, queue2) =>
+      queue1 ++= queue2
+      queue1
+    }.toArray.sorted(ord.reverse)
+  }
+
+  /**
+   * Returns the first K elements from this RDD as defined by
+   * the specified implicit Ordering[T] and maintains the
+   * ordering.
+   * @param num the number of top elements to return
+   * @param ord the implicit ordering for T
+   * @return an array of top elements
+   */
+  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)
+
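A small sketch contrasting top and takeOrdered (the values are illustrative):

  val nums = sc.parallelize(Seq(3, 9, 2, 7))
  nums.top(2)          // Array(9, 7)  -- the largest elements, descending
  nums.takeOrdered(2)  // Array(2, 3)  -- the smallest elements, ascending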
+  /**
+   * Save this RDD as a text file, using string representations of elements.
+   */
+  def saveAsTextFile(path: String) {
+    this.map(x => (NullWritable.get(), new Text(x.toString)))
+      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
+  }
+
+  /**
+   * Save this RDD as a compressed text file, using string representations of elements.
+   */
+  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
+    this.map(x => (NullWritable.get(), new Text(x.toString)))
+      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
+  }
+
+  /**
+   * Save this RDD as a SequenceFile of serialized objects.
+   */
+  def saveAsObjectFile(path: String) {
+    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
+      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
+      .saveAsSequenceFile(path)
+  }
+
+  /**
+   * Creates tuples of the elements in this RDD by applying `f`.
+   */
+  def keyBy[K](f: T => K): RDD[(K, T)] = {
+    map(x => (f(x), x))
+  }
+
+  /** A private method for tests, to look at the contents of each partition */
+  private[spark] def collectPartitions(): Array[Array[T]] = {
+    sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
+  }
+
+  /**
+   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
+   * directory set with SparkContext.setCheckpointDir() and all references to its parent
+   * RDDs will be removed. This function must be called before any job has been
+   * executed on this RDD. It is strongly recommended that this RDD be persisted in
+   * memory, otherwise saving it to a file will require recomputation.
+   */
+  def checkpoint() {
+    if (context.checkpointDir.isEmpty) {
+      throw new Exception("Checkpoint directory has not been set in the SparkContext")
+    } else if (checkpointData.isEmpty) {
+      checkpointData = Some(new RDDCheckpointData(this))
+      checkpointData.get.markForCheckpoint()
+    }
+  }
+
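A usage sketch of checkpointing; the checkpoint directory and the `parse` function are illustrative assumptions:

  sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")
  val cleaned = sc.textFile("hdfs:///logs").map(parse).cache()
  cleaned.checkpoint()   // must be called before any job runs on this RDD
  cleaned.count()        // the first job materializes the RDD and writes the checkpoint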
+  /**
+   * Return whether this RDD has been checkpointed or not
+   */
+  def isCheckpointed: Boolean = {
+    checkpointData.map(_.isCheckpointed).getOrElse(false)
+  }
+
+  /**
+   * Gets the name of the file to which this RDD was checkpointed
+   */
+  def getCheckpointFile: Option[String] = {
+    checkpointData.flatMap(_.getCheckpointFile)
+  }
+
+  // =======================================================================
+  // Other internal methods and fields
+  // =======================================================================
+
+  private var storageLevel: StorageLevel = StorageLevel.NONE
+
+  /** Record user function generating this RDD. */
+  private[spark] val origin = Utils.formatSparkCallSite
+
+  private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
+
+  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
+
+  /** Returns the first parent RDD */
+  protected[spark] def firstParent[U: ClassManifest] = {
+    dependencies.head.rdd.asInstanceOf[RDD[U]]
+  }
+
+  /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */
+  def context = sc
+
+  // Avoid handling doCheckpoint multiple times to prevent excessive recursion
+  private var doCheckpointCalled = false
+
+  /**
+   * Performs the checkpointing of this RDD by saving it. It is called by the DAGScheduler
+   * after a job using this RDD has completed (therefore the RDD has been materialized and
+   * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
+   */
+  private[spark] def doCheckpoint() {
+    if (!doCheckpointCalled) {
+      doCheckpointCalled = true
+      if (checkpointData.isDefined) {
+        checkpointData.get.doCheckpoint()
+      } else {
+        dependencies.foreach(_.rdd.doCheckpoint())
+      }
+    }
+  }
+
+  /**
+   * Changes the dependencies of this RDD from its original parents to a new RDD (`checkpointRDD`)
+   * created from the checkpoint file, and forgets its old dependencies and partitions.
+   */
+  private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
+    clearDependencies()
+    partitions_ = null
+    deps = null    // Forget the constructor argument for dependencies too
+  }
+
+  /**
+   * Clears the dependencies of this RDD. This method must ensure that all references
+   * to the original parent RDDs are removed to enable the parent RDDs to be garbage
+   * collected. Subclasses of RDD may override this method for implementing their own cleaning
+   * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example.
+   */
+  protected def clearDependencies() {
+    dependencies_ = null
+  }
+
+  /** A description of this RDD and its recursive dependencies for debugging. */
+  def toDebugString: String = {
+    def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
+      Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
+        rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + "  "))
+    }
+    debugString(this).mkString("\n")
+  }
+
+  override def toString: String = "%s%s[%d] at %s".format(
+    Option(name).map(_ + " ").getOrElse(""),
+    getClass.getSimpleName,
+    id,
+    origin)
+
+  def toJavaRDD() : JavaRDD[T] = {
+    new JavaRDD(this)(elementClassManifest)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
new file mode 100644
index 0000000..6009a41
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.{Partition, SparkException, Logging}
+import org.apache.spark.scheduler.{ResultTask, ShuffleMapTask}
+
+/**
+ * Enumeration to manage state transitions of an RDD through checkpointing
+ * [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]
+ */
+private[spark] object CheckpointState extends Enumeration {
+  type CheckpointState = Value
+  val Initialized, MarkedForCheckpoint, CheckpointingInProgress, Checkpointed = Value
+}
+
+/**
+ * This class contains all the information related to RDD checkpointing. Each instance of this class
+ * is associated with an RDD. It manages the checkpointing process of the associated RDD, as well as
+ * the post-checkpoint state, by providing the updated partitions, iterator and preferred locations
+ * of the checkpointed RDD.
+ */
+private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
+  extends Logging with Serializable {
+
+  import CheckpointState._
+
+  // The checkpoint state of the associated RDD.
+  var cpState = Initialized
+
+  // The file to which the associated RDD has been checkpointed
+  @transient var cpFile: Option[String] = None
+
+  // The CheckpointRDD created from the checkpoint file, that is, the new parent of the associated RDD.
+  var cpRDD: Option[RDD[T]] = None
+
+  // Mark the RDD for checkpointing
+  def markForCheckpoint() {
+    RDDCheckpointData.synchronized {
+      if (cpState == Initialized) cpState = MarkedForCheckpoint
+    }
+  }
+
+  // Is the RDD already checkpointed
+  def isCheckpointed: Boolean = {
+    RDDCheckpointData.synchronized { cpState == Checkpointed }
+  }
+
+  // Get the file to which this RDD was checkpointed, as an Option
+  def getCheckpointFile: Option[String] = {
+    RDDCheckpointData.synchronized { cpFile }
+  }
+
+  // Do the checkpointing of the RDD. Called after the first job using that RDD is over.
+  def doCheckpoint() {
+    // If it is marked for checkpointing AND checkpointing is not already in progress,
+    // then set it to be in progress, else return
+    RDDCheckpointData.synchronized {
+      if (cpState == MarkedForCheckpoint) {
+        cpState = CheckpointingInProgress
+      } else {
+        return
+      }
+    }
+
+    // Create the output path for the checkpoint
+    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
+    val fs = path.getFileSystem(new Configuration())
+    if (!fs.mkdirs(path)) {
+      throw new SparkException("Failed to create checkpoint path " + path)
+    }
+
+    // Save to file, and reload it as an RDD
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
+
+    // Change the dependencies and partitions of the RDD
+    RDDCheckpointData.synchronized {
+      cpFile = Some(path.toString)
+      cpRDD = Some(newRDD)
+      rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
+      cpState = Checkpointed
+      RDDCheckpointData.clearTaskCaches()
+      logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
+    }
+  }
+
+  // Get preferred location of a split after checkpointing
+  def getPreferredLocations(split: Partition): Seq[String] = {
+    RDDCheckpointData.synchronized {
+      cpRDD.get.preferredLocations(split)
+    }
+  }
+
+  def getPartitions: Array[Partition] = {
+    RDDCheckpointData.synchronized {
+      cpRDD.get.partitions
+    }
+  }
+
+  def checkpointRDD: Option[RDD[T]] = {
+    RDDCheckpointData.synchronized {
+      cpRDD
+    }
+  }
+}
+
+private[spark] object RDDCheckpointData {
+  def clearTaskCaches() {
+    ShuffleMapTask.clearCache()
+    ResultTask.clearCache()
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
index 1e8d89e..2c5253a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SampledRDD.scala
@@ -22,7 +22,7 @@ import java.util.Random
 import cern.jet.random.Poisson
 import cern.jet.random.engine.DRand
 
-import org.apache.spark.{RDD, Partition, TaskContext}
+import org.apache.spark.{Partition, TaskContext}
 
 private[spark]
 class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition with Serializable {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
new file mode 100644
index 0000000..5fe4676
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapred.SequenceFileOutputFormat
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.io.Writable
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.Logging
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,
+ * through an implicit conversion. Note that this can't be part of PairRDDFunctions because
+ * we need more implicit parameters to convert our keys and values to Writable.
+ *
+ * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
+ */
+class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
+    self: RDD[(K, V)])
+  extends Logging
+  with Serializable {
+
+  private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
+    val c = {
+      if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
+        classManifest[T].erasure
+      } else {
+        // We get the type of the Writable class by looking at the apply method which converts
+        // from T to Writable. Since we have two apply methods, we keep the one whose signature
+        // is not of the form "java.lang.Object apply(java.lang.Object)"
+        implicitly[T => Writable].getClass.getDeclaredMethods().filter(
+            m => m.getReturnType().toString != "class java.lang.Object" &&
+                 m.getName() == "apply")(0).getReturnType
+
+      }
+       // TODO: use something like WritableConverter to avoid reflection
+    }
+    c.asInstanceOf[Class[_ <: Writable]]
+  }
+
+  /**
+   * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
+   * and value types. If the key or value are Writable, then we use their classes directly;
+   * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc,
+   * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
+   * file system.
+   */
+  def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
+    def anyToWritable[U <% Writable](u: U): Writable = u
+
+    val keyClass = getWritableClass[K]
+    val valueClass = getWritableClass[V]
+    val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
+    val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
+
+    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" )
+    val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
+    val jobConf = new JobConf(self.context.hadoopConfiguration)
+    if (!convertKey && !convertValue) {
+      self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
+    } else if (!convertKey && convertValue) {
+      self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(
+        path, keyClass, valueClass, format, jobConf, codec)
+    } else if (convertKey && !convertValue) {
+      self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(
+        path, keyClass, valueClass, format, jobConf, codec)
+    } else if (convertKey && convertValue) {
+      self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(
+        path, keyClass, valueClass, format, jobConf, codec)
+    }
+  }
+}
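A usage sketch of saveAsSequenceFile via the implicit conversion above; the pairs and the output path are illustrative assumptions:

  import org.apache.spark.SparkContext._

  val pairs = sc.parallelize(Seq(1 -> "a", 2 -> "b"))
  pairs.saveAsSequenceFile("hdfs:///tmp/pairs-seqfile")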

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
index f0e9ab8..9537152 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{Dependency, Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
+import org.apache.spark.{Dependency, Partitioner, SparkEnv, ShuffleDependency, Partition, TaskContext}
 
 
 private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index 7369dfa..8c1a29d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -20,7 +20,6 @@ package org.apache.spark.rdd
 import java.util.{HashMap => JHashMap}
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.RDD
 import org.apache.spark.Partitioner
 import org.apache.spark.Dependency
 import org.apache.spark.TaskContext

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index fd02476..ae8a9f3 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.rdd
 
 import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{Dependency, RangeDependency, RDD, SparkContext, Partition, TaskContext}
+import org.apache.spark.{Dependency, RangeDependency, SparkContext, Partition, TaskContext}
 import java.io.{ObjectOutputStream, IOException}
 
 private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int)

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
index 5ae1db3..31e6fd5 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import org.apache.spark.{OneToOneDependency, SparkContext, Partition, TaskContext}
 import java.io.{ObjectOutputStream, IOException}
 
 private[spark] class ZippedPartitionsPartition(

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
index 3bd00d2..567b67d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ZippedRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.rdd
 
-import org.apache.spark.{Utils, OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
+import org.apache.spark.{OneToOneDependency, SparkContext, Partition, TaskContext}
 import java.io.{ObjectOutputStream, IOException}
 
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 5ac700b..92add5b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
 
 import org.apache.spark._
+import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.partial.{ApproximateActionListener, ApproximateEvaluator, PartialResult}
 import org.apache.spark.scheduler.cluster.TaskInfo

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
index 5b07933..0d99670 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala
@@ -23,6 +23,7 @@ import org.apache.spark.scheduler.cluster.TaskInfo
 import scala.collection.mutable.Map
 
 import org.apache.spark._
+import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 98ef4d1..c8b78bf 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -28,6 +28,7 @@ import scala.collection.mutable.{Map, HashMap, ListBuffer}
 import scala.io.Source
 
 import org.apache.spark._
+import org.apache.spark.rdd.RDD
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.cluster.TaskInfo
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
index 2f157cc..2b007cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.scheduler
 
-import org.apache.spark._
 import java.io._
-import util.{MetadataCleaner, TimeStampedHashMap}
 import java.util.zip.{GZIPInputStream, GZIPOutputStream}
 
+import org.apache.spark._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.RDDCheckpointData
+import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
+
 private[spark] object ResultTask {
 
   // A simple map between the stage id to the serialized byte array of a task.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index ca716b4..764775f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -26,6 +26,8 @@ import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.storage._
 import org.apache.spark.util.{TimeStampedHashMap, MetadataCleaner}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.RDDCheckpointData
 
 
 private[spark] object ShuffleMapTask {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 3504424..c3cf4b8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -19,8 +19,8 @@ package org.apache.spark.scheduler
 
 import java.util.Properties
 import org.apache.spark.scheduler.cluster.TaskInfo
-import org.apache.spark.util.Distribution
-import org.apache.spark.{Logging, SparkContext, TaskEndReason, Utils}
+import org.apache.spark.util.{Utils, Distribution}
+import org.apache.spark.{Logging, SparkContext, TaskEndReason}
 import org.apache.spark.executor.TaskMetrics
 
 sealed trait SparkListenerEvents

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
index 87b1fe4..aa293dc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala
@@ -17,9 +17,8 @@
 
 package org.apache.spark.scheduler
 
-import java.net.URI
-
 import org.apache.spark._
+import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.BlockManagerId
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
index 776675d..5c7e5bb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResult.scala
@@ -21,8 +21,9 @@ import java.io._
 
 import scala.collection.mutable.Map
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.{Utils, SparkEnv}
+import org.apache.spark.{SparkEnv}
 import java.nio.ByteBuffer
+import org.apache.spark.util.Utils
 
 // Task result. Also contains updates to accumulator variables.
 // TODO: Use of distributed cache to return result is a hack to get around

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index a33307b..1b31c8c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.HashSet
 import scala.math.max
 import scala.math.min
 
-import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState, Utils}
+import org.apache.spark.{FetchFailed, Logging, Resubmitted, SparkEnv, Success, TaskEndReason, TaskState}
 import org.apache.spark.{ExceptionFailure, SparkException, TaskResultTooBigFailure}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler._

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
index bde2f73..d57eb32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.{SparkContext, Utils}
+import org.apache.spark.{SparkContext}
 
 /**
  * A backend interface for cluster scheduling systems that allows plugging in different ones under

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index ac6dc7d..d003bf1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,10 +17,11 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.{Utils, Logging, SparkContext}
+import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.deploy.client.{Client, ClientListener}
 import org.apache.spark.deploy.{Command, ApplicationDescription}
 import scala.collection.mutable.HashMap
+import org.apache.spark.util.Utils
 
 private[spark] class SparkDeploySchedulerBackend(
     scheduler: ClusterScheduler,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
index 1cc5daf..9c36d22 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -20,8 +20,7 @@ package org.apache.spark.scheduler.cluster
 import java.nio.ByteBuffer
 
 import org.apache.spark.TaskState.TaskState
-import org.apache.spark.Utils
-import org.apache.spark.util.SerializableBuffer
+import org.apache.spark.util.{Utils, SerializableBuffer}
 
 
 private[spark] sealed trait StandaloneClusterMessage extends Serializable

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 3677a82..b4ea0be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -28,8 +28,9 @@ import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClient
 import akka.util.Duration
 import akka.util.duration._
 
-import org.apache.spark.{Utils, SparkException, Logging, TaskState}
+import org.apache.spark.{SparkException, Logging, TaskState}
 import org.apache.spark.scheduler.cluster.StandaloneClusterMessages._
+import org.apache.spark.util.Utils
 
 /**
  * A standalone scheduler backend, which waits for standalone executors to connect to it through

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala
index 7ce14be..9685fb1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/TaskInfo.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.Utils
+import org.apache.spark.util.Utils
 
 /**
  * Information about a running task attempt inside a TaskSet.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
index f0ebe66..e8fa5e2 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalScheduler.scala
@@ -34,6 +34,7 @@ import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster._
 import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
 import akka.actor._
+import org.apache.spark.util.Utils
 
 /**
  * A FIFO or Fair TaskScheduler implementation that runs tasks locally in a thread pool. Optionally

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index f6a2fea..3dbe61d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -23,7 +23,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
 import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
-import org.apache.spark.{SparkException, Utils, Logging, SparkContext}
+import org.apache.spark.{SparkException, Logging, SparkContext}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.collection.JavaConversions._
 import java.io.File

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
index e002af1..541f86e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -23,7 +23,7 @@ import org.apache.mesos.{Scheduler => MScheduler}
 import org.apache.mesos._
 import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, TaskState => MesosTaskState, _}
 
-import org.apache.spark.{SparkException, Utils, Logging, SparkContext}
+import org.apache.spark.{SparkException, Logging, SparkContext}
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.collection.JavaConversions._
 import java.io.File
@@ -31,6 +31,7 @@ import org.apache.spark.scheduler.cluster._
 import java.util.{ArrayList => JArrayList, List => JList}
 import java.util.Collections
 import org.apache.spark.TaskState
+import org.apache.spark.util.Utils
 
 /**
  * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
new file mode 100644
index 0000000..4de8161
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.io._
+import java.nio.ByteBuffer
+
+import org.apache.spark.util.ByteBufferInputStream
+
+private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
+  val objOut = new ObjectOutputStream(out)
+  def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
+  def flush() { objOut.flush() }
+  def close() { objOut.close() }
+}
+
+private[spark] class JavaDeserializationStream(in: InputStream, loader: ClassLoader)
+extends DeserializationStream {
+  val objIn = new ObjectInputStream(in) {
+    override def resolveClass(desc: ObjectStreamClass) =
+      Class.forName(desc.getName, false, loader)
+  }
+
+  def readObject[T](): T = objIn.readObject().asInstanceOf[T]
+  def close() { objIn.close() }
+}
+
+private[spark] class JavaSerializerInstance extends SerializerInstance {
+  def serialize[T](t: T): ByteBuffer = {
+    val bos = new ByteArrayOutputStream()
+    val out = serializeStream(bos)
+    out.writeObject(t)
+    out.close()
+    ByteBuffer.wrap(bos.toByteArray)
+  }
+
+  def deserialize[T](bytes: ByteBuffer): T = {
+    val bis = new ByteBufferInputStream(bytes)
+    val in = deserializeStream(bis)
+    in.readObject().asInstanceOf[T]
+  }
+
+  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
+    val bis = new ByteBufferInputStream(bytes)
+    val in = deserializeStream(bis, loader)
+    in.readObject().asInstanceOf[T]
+  }
+
+  def serializeStream(s: OutputStream): SerializationStream = {
+    new JavaSerializationStream(s)
+  }
+
+  def deserializeStream(s: InputStream): DeserializationStream = {
+    new JavaDeserializationStream(s, Thread.currentThread.getContextClassLoader)
+  }
+
+  def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
+    new JavaDeserializationStream(s, loader)
+  }
+}
+
+/**
+ * A Spark serializer that uses Java's built-in serialization.
+ */
+class JavaSerializer extends Serializer {
+  def newInstance(): SerializerInstance = new JavaSerializerInstance
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
new file mode 100644
index 0000000..24ef204
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.serializer
+
+import java.nio.ByteBuffer
+import java.io.{EOFException, InputStream, OutputStream}
+
+import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
+import com.esotericsoftware.kryo.{KryoException, Kryo}
+import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
+import com.twitter.chill.ScalaKryoInstantiator
+
+import org.apache.spark.{SerializableWritable, Logging}
+import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock, StorageLevel}
+
+import org.apache.spark.broadcast.HttpBroadcast
+
+/**
+ * A Spark serializer that uses the [[http://code.google.com/p/kryo/wiki/V1Documentation Kryo 1.x library]].
+ */
+class KryoSerializer extends org.apache.spark.serializer.Serializer with Logging {
+  private val bufferSize = System.getProperty("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024
+
+  def newKryoOutput() = new KryoOutput(bufferSize)
+
+  def newKryoInput() = new KryoInput(bufferSize)
+
+  def newKryo(): Kryo = {
+    val instantiator = new ScalaKryoInstantiator
+    val kryo = instantiator.newKryo()
+    val classLoader = Thread.currentThread.getContextClassLoader
+
+    // Register some commonly used classes
+    val toRegister: Seq[AnyRef] = Seq(
+      ByteBuffer.allocate(1),
+      StorageLevel.MEMORY_ONLY,
+      PutBlock("1", ByteBuffer.allocate(1), StorageLevel.MEMORY_ONLY),
+      GotBlock("1", ByteBuffer.allocate(1)),
+      GetBlock("1")
+    )
+
+    for (obj <- toRegister) kryo.register(obj.getClass)
+
+    // Allow sending SerializableWritable
+    kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
+    kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
+
+    // Allow the user to register their own classes by setting spark.kryo.registrator
+    try {
+      Option(System.getProperty("spark.kryo.registrator")).foreach { regCls =>
+        logDebug("Running user registrator: " + regCls)
+        val reg = Class.forName(regCls, true, classLoader).newInstance().asInstanceOf[KryoRegistrator]
+        reg.registerClasses(kryo)
+      }
+    } catch {
+      case _: Exception => println("Failed to register spark.kryo.registrator")
+    }
+
+    kryo.setClassLoader(classLoader)
+
+    // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops
+    kryo.setReferences(System.getProperty("spark.kryo.referenceTracking", "true").toBoolean)
+
+    kryo
+  }
+
+  def newInstance(): SerializerInstance = {
+    new KryoSerializerInstance(this)
+  }
+}
+
+private[spark]
+class KryoSerializationStream(kryo: Kryo, outStream: OutputStream) extends SerializationStream {
+  val output = new KryoOutput(outStream)
+
+  def writeObject[T](t: T): SerializationStream = {
+    kryo.writeClassAndObject(output, t)
+    this
+  }
+
+  def flush() { output.flush() }
+  def close() { output.close() }
+}
+
+private[spark]
+class KryoDeserializationStream(kryo: Kryo, inStream: InputStream) extends DeserializationStream {
+  val input = new KryoInput(inStream)
+
+  def readObject[T](): T = {
+    try {
+      kryo.readClassAndObject(input).asInstanceOf[T]
+    } catch {
+      // DeserializationStream uses the EOF exception to indicate stopping condition.
+      case _: KryoException => throw new EOFException
+    }
+  }
+
+  def close() {
+    // Kryo's Input automatically closes the input stream it is using.
+    input.close()
+  }
+}
+
+private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends SerializerInstance {
+  val kryo = ks.newKryo()
+  val output = ks.newKryoOutput()
+  val input = ks.newKryoInput()
+
+  def serialize[T](t: T): ByteBuffer = {
+    output.clear()
+    kryo.writeClassAndObject(output, t)
+    ByteBuffer.wrap(output.toBytes)
+  }
+
+  def deserialize[T](bytes: ByteBuffer): T = {
+    input.setBuffer(bytes.array)
+    kryo.readClassAndObject(input).asInstanceOf[T]
+  }
+
+  def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = {
+    val oldClassLoader = kryo.getClassLoader
+    kryo.setClassLoader(loader)
+    input.setBuffer(bytes.array)
+    val obj = kryo.readClassAndObject(input).asInstanceOf[T]
+    kryo.setClassLoader(oldClassLoader)
+    obj
+  }
+
+  def serializeStream(s: OutputStream): SerializationStream = {
+    new KryoSerializationStream(kryo, s)
+  }
+
+  def deserializeStream(s: InputStream): DeserializationStream = {
+    new KryoDeserializationStream(kryo, s)
+  }
+}
+
+/**
+ * Interface implemented by clients to register their classes with Kryo when using Kryo
+ * serialization.
+ */
+trait KryoRegistrator {
+  def registerClasses(kryo: Kryo)
+}
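
Editor's note: the KryoRegistrator trait and the spark.kryo.registrator / spark.kryoserializer.buffer.mb properties read in newKryo() suggest the following usage. This is a sketch only; MyRecord, MyRegistrator and the property values are assumptions, not part of the commit.

  import com.esotericsoftware.kryo.Kryo
  import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}

  // Hypothetical user class; anything Kryo can serialize works here.
  case class MyRecord(id: Int, name: String)

  // Registrator implementing the trait introduced above.
  class MyRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo) {
      kryo.register(classOf[MyRecord])
    }
  }

  object KryoUsageSketch {
    def main(args: Array[String]) {
      // Both property names appear verbatim in the KryoSerializer code above.
      System.setProperty("spark.kryo.registrator", "MyRegistrator")
      System.setProperty("spark.kryoserializer.buffer.mb", "2")

      val instance = new KryoSerializer().newInstance()
      val bytes = instance.serialize(MyRecord(1, "spark"))
      val restored = instance.deserialize[MyRecord](bytes)
      assert(restored == MyRecord(1, "spark"))
    }
  }

As in the Java case, a real job would let SparkEnv create the serializer after the properties are set; constructing KryoSerializer directly here just keeps the example runnable on its own.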

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
index c91f0fc..3aeda38 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala
@@ -27,12 +27,12 @@ import scala.collection.mutable.Queue
 import io.netty.buffer.ByteBuf
 
 import org.apache.spark.Logging
-import org.apache.spark.Utils
 import org.apache.spark.SparkException
 import org.apache.spark.network.BufferMessage
 import org.apache.spark.network.ConnectionManagerId
 import org.apache.spark.network.netty.ShuffleCopier
 import org.apache.spark.serializer.Serializer
+import org.apache.spark.util.Utils
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3299ac9..60fdc5f 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,11 +29,11 @@ import akka.util.duration._
 
 import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
 
-import org.apache.spark.{Logging, SparkEnv, SparkException, Utils}
+import org.apache.spark.{Logging, SparkEnv, SparkException}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.serializer.Serializer
-import org.apache.spark.util.{ByteBufferInputStream, IdGenerator, MetadataCleaner, TimeStampedHashMap}
+import org.apache.spark.util._
 
 import sun.nio.ch.DirectBuffer
 

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
index a22a80d..74207f5 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerId.scala
@@ -19,7 +19,7 @@ package org.apache.spark.storage
 
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 import java.util.concurrent.ConcurrentHashMap
-import org.apache.spark.Utils
+import org.apache.spark.util.Utils
 
 /**
  * This class represent an unique identifier for a BlockManager.

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index baa4a1d..c7b23ab 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -28,8 +28,9 @@ import akka.pattern.ask
 import akka.util.Duration
 import akka.util.duration._
 
-import org.apache.spark.{Logging, Utils, SparkException}
+import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.storage.BlockManagerMessages._
+import org.apache.spark.util.Utils
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
index f485602..678c382 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerWorker.scala
@@ -19,8 +19,9 @@ package org.apache.spark.storage
 
 import java.nio.ByteBuffer
 
-import org.apache.spark.{Logging, Utils}
+import org.apache.spark.{Logging}
 import org.apache.spark.network._
+import org.apache.spark.util.Utils
 
 /**
  * A network interface for BlockManager. Each slave should have one

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index fd945e0..fc25ef0 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -28,12 +28,12 @@ import scala.collection.mutable.ArrayBuffer
 
 import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
 
-import org.apache.spark.Utils
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.serializer.{Serializer, SerializationStream}
 import org.apache.spark.Logging
 import org.apache.spark.network.netty.ShuffleSender
 import org.apache.spark.network.netty.PathResolver
+import org.apache.spark.util.Utils
 
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 828dc0f..3b3b234 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -19,9 +19,9 @@ package org.apache.spark.storage
 
 import java.util.LinkedHashMap
 import java.util.concurrent.ArrayBlockingQueue
-import org.apache.spark.{SizeEstimator, Utils}
 import java.nio.ByteBuffer
 import collection.mutable.ArrayBuffer
+import org.apache.spark.util.{SizeEstimator, Utils}
 
 /**
  * Stores blocks in memory, either as ArrayBuffers of deserialized Java objects or as

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 0bba1da..2bb7715 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -17,8 +17,9 @@
 
 package org.apache.spark.storage
 
-import org.apache.spark.{Utils, SparkContext}
+import org.apache.spark.{SparkContext}
 import BlockManagerMasterActor.BlockStatus
+import org.apache.spark.util.Utils
 
 private[spark]
 case class StorageStatus(blockManagerId: BlockManagerId, maxMem: Long,

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index 1d5afe9..f2ae8dd 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -19,9 +19,9 @@ package org.apache.spark.storage
 
 import akka.actor._
 
-import org.apache.spark.KryoSerializer
 import java.util.concurrent.ArrayBlockingQueue
 import util.Random
+import org.apache.spark.serializer.KryoSerializer
 
 /**
  * This class tests the BlockManager and MemoryStore for thread safety and

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index 4688eff..ad456ea 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -21,12 +21,13 @@ import javax.servlet.http.HttpServletRequest
 
 import org.eclipse.jetty.server.{Handler, Server}
 
-import org.apache.spark.{Logging, SparkContext, SparkEnv, Utils}
+import org.apache.spark.{Logging, SparkContext, SparkEnv}
 import org.apache.spark.ui.env.EnvironmentUI
 import org.apache.spark.ui.exec.ExecutorsUI
 import org.apache.spark.ui.storage.BlockManagerUI
 import org.apache.spark.ui.jobs.JobProgressUI
 import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
 
 /** Top level user interface for Spark */
 private[spark] class SparkUI(sc: SparkContext) extends Logging {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
index efe6b47..6e56c22 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
@@ -7,13 +7,14 @@ import scala.xml.Node
 
 import org.eclipse.jetty.server.Handler
 
-import org.apache.spark.{ExceptionFailure, Logging, Utils, SparkContext}
+import org.apache.spark.{ExceptionFailure, Logging, SparkContext}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler.cluster.TaskInfo
 import org.apache.spark.scheduler.{SparkListenerTaskStart, SparkListenerTaskEnd, SparkListener}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.ui.Page.Executors
 import org.apache.spark.ui.UIUtils
+import org.apache.spark.util.Utils
 
 
 private[spark] class ExecutorsUI(val sc: SparkContext) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
index ae02226..86e0af0 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala
@@ -3,7 +3,7 @@ package org.apache.spark.ui.jobs
 import scala.Seq
 import scala.collection.mutable.{ListBuffer, HashMap, HashSet}
 
-import org.apache.spark.{ExceptionFailure, SparkContext, Success, Utils}
+import org.apache.spark.{ExceptionFailure, SparkContext, Success}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.TaskInfo
 import org.apache.spark.executor.TaskMetrics

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
index 1bb7638..6aecef5 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
@@ -29,11 +29,12 @@ import scala.Seq
 import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
 
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.{ExceptionFailure, SparkContext, Success, Utils}
+import org.apache.spark.{ExceptionFailure, SparkContext, Success}
 import org.apache.spark.scheduler._
 import collection.mutable
 import org.apache.spark.scheduler.cluster.SchedulingMode
 import org.apache.spark.scheduler.cluster.SchedulingMode.SchedulingMode
+import org.apache.spark.util.Utils
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
 private[spark] class JobProgressUI(val sc: SparkContext) {