You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:48 UTC

[64/69] [abbrv] Move some classes to more appropriate packages:

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/RDD.scala b/core/src/main/scala/org/apache/spark/RDD.scala
deleted file mode 100644
index 0d1f07f..0000000
--- a/core/src/main/scala/org/apache/spark/RDD.scala
+++ /dev/null
@@ -1,957 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.util.Random
-
-import scala.collection.Map
-import scala.collection.JavaConversions.mapAsScalaMap
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.mapred.TextOutputFormat
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-import org.apache.spark.Partitioner._
-import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.partial.BoundedDouble
-import org.apache.spark.partial.CountEvaluator
-import org.apache.spark.partial.GroupedCountEvaluator
-import org.apache.spark.partial.PartialResult
-import org.apache.spark.rdd.CoalescedRDD
-import org.apache.spark.rdd.CartesianRDD
-import org.apache.spark.rdd.FilteredRDD
-import org.apache.spark.rdd.FlatMappedRDD
-import org.apache.spark.rdd.GlommedRDD
-import org.apache.spark.rdd.MappedRDD
-import org.apache.spark.rdd.MapPartitionsRDD
-import org.apache.spark.rdd.MapPartitionsWithIndexRDD
-import org.apache.spark.rdd.PipedRDD
-import org.apache.spark.rdd.SampledRDD
-import org.apache.spark.rdd.ShuffledRDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.rdd.ZippedRDD
-import org.apache.spark.rdd.ZippedPartitionsRDD2
-import org.apache.spark.rdd.ZippedPartitionsRDD3
-import org.apache.spark.rdd.ZippedPartitionsRDD4
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.BoundedPriorityQueue
-
-import SparkContext._
-
-/**
- * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
- * partitioned collection of elements that can be operated on in parallel. This class contains the
- * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
- * [[org.apache.spark.PairRDDFunctions]] contains operations available only on RDDs of key-value pairs, such
- * as `groupByKey` and `join`; [[org.apache.spark.DoubleRDDFunctions]] contains operations available only on
- * RDDs of Doubles; and [[org.apache.spark.SequenceFileRDDFunctions]] contains operations available on RDDs
- * that can be saved as SequenceFiles. These operations are automatically available on any RDD of
- * the right type (e.g. RDD[(Int, Int)]) through implicit conversions when you
- * `import org.apache.spark.SparkContext._`.
- *
- * Internally, each RDD is characterized by five main properties:
- *
- *  - A list of partitions
- *  - A function for computing each split
- *  - A list of dependencies on other RDDs
- *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
- *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
- *    an HDFS file)
- *
- * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
- * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
- * reading data from a new storage system) by overriding these functions. Please refer to the
- * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
- * on RDD internals.
- */
-abstract class RDD[T: ClassManifest](
-    @transient private var sc: SparkContext,
-    @transient private var deps: Seq[Dependency[_]]
-  ) extends Serializable with Logging {
-
-  /** Construct an RDD with just a one-to-one dependency on one parent */
-  def this(@transient oneParent: RDD[_]) =
-    this(oneParent.context , List(new OneToOneDependency(oneParent)))
-
-  // =======================================================================
-  // Methods that should be implemented by subclasses of RDD
-  // =======================================================================
-
-  /** Implemented by subclasses to compute a given partition. */
-  def compute(split: Partition, context: TaskContext): Iterator[T]
-
-  /**
-   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
-   * be called once, so it is safe to implement a time-consuming computation in it.
-   */
-  protected def getPartitions: Array[Partition]
-
-  /**
-   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
-   * be called once, so it is safe to implement a time-consuming computation in it.
-   */
-  protected def getDependencies: Seq[Dependency[_]] = deps
-
-  /** Optionally overridden by subclasses to specify placement preferences. */
-  protected def getPreferredLocations(split: Partition): Seq[String] = Nil
-
-  /** Optionally overridden by subclasses to specify how they are partitioned. */
-  val partitioner: Option[Partitioner] = None
-
-  // =======================================================================
-  // Methods and fields available on all RDDs
-  // =======================================================================
-
-  /** The SparkContext that created this RDD. */
-  def sparkContext: SparkContext = sc
-
-  /** A unique ID for this RDD (within its SparkContext). */
-  val id: Int = sc.newRddId()
-
-  /** A friendly name for this RDD */
-  var name: String = null
-
-  /** Assign a name to this RDD */
-  def setName(_name: String) = {
-    // Overwrites the current friendly name and returns this RDD so that calls can be chained.
-    name = _name
-    this
-  }
-
-  /** User-defined generator of this RDD*/
-  var generator = Utils.getCallSiteInfo.firstUserClass
-
-  /** Reset generator*/
-  def setGenerator(_generator: String) = {
-    // Replaces the value that was initially taken from Utils.getCallSiteInfo.firstUserClass.
-    generator = _generator
-  }
-
-  /**
-   * Set this RDD's storage level to persist its values across operations after the first time
-   * it is computed. This can only be used to assign a new storage level if the RDD does not
-   * have a storage level set yet.
-   */
-  def persist(newLevel: StorageLevel): RDD[T] = {
-    // TODO: Handle changes of StorageLevel
-    val levelAlreadyAssigned = storageLevel != StorageLevel.NONE
-    if (levelAlreadyAssigned && newLevel != storageLevel) {
-      throw new UnsupportedOperationException(
-        "Cannot change storage level of an RDD after it was already assigned a level")
-    }
-    // Record the level, then make this RDD discoverable through its SparkContext.
-    storageLevel = newLevel
-    sc.persistentRdds(id) = this
-    this
-  }
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def cache(): RDD[T] = persist()
-
-  /**
-   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
-   *
-   * @param blocking Whether to block until all blocks are deleted.
-   * @return This RDD.
-   */
-  def unpersist(blocking: Boolean = true): RDD[T] = {
-    logInfo("Removing RDD " + id + " from persistence list")
-    // Ask the block manager master to drop the blocks, then forget this RDD in the
-    // context's registry and reset the level so the RDD can be persisted again later.
-    val blockManagerMaster = sc.env.blockManager.master
-    blockManagerMaster.removeRdd(id, blocking)
-    sc.persistentRdds.remove(id)
-    storageLevel = StorageLevel.NONE
-    this
-  }
-
-  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */
-  def getStorageLevel = storageLevel
-
-  // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
-  // be overwritten when we're checkpointed
-  private var dependencies_ : Seq[Dependency[_]] = null
-  @transient private var partitions_ : Array[Partition] = null
-
-  /** An Option holding our checkpoint RDD, if we are checkpointed */
-  private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)
-
-  /**
-   * Get the list of dependencies of this RDD, taking into account whether the
-   * RDD is checkpointed or not.
-   */
-  final def dependencies: Seq[Dependency[_]] = {
-    checkpointRDD match {
-      // Once checkpointed, the only dependency is the checkpoint RDD itself.
-      case Some(checkpointed) =>
-        List(new OneToOneDependency(checkpointed))
-      // Otherwise ask the subclass once and remember the answer.
-      case None =>
-        if (dependencies_ == null) {
-          dependencies_ = getDependencies
-        }
-        dependencies_
-    }
-  }
-
-  /**
-   * Get the array of partitions of this RDD, taking into account whether the
-   * RDD is checkpointed or not.
-   */
-  final def partitions: Array[Partition] = {
-    checkpointRDD match {
-      // Once checkpointed, the partitions are those of the checkpoint RDD.
-      case Some(checkpointed) =>
-        checkpointed.partitions
-      // Otherwise ask the subclass once and remember the answer.
-      case None =>
-        if (partitions_ == null) {
-          partitions_ = getPartitions
-        }
-        partitions_
-    }
-  }
-
-  /**
-   * Get the preferred locations of a partition (as hostnames), taking into account whether the
-   * RDD is checkpointed.
-   */
-  final def preferredLocations(split: Partition): Seq[String] = {
-    checkpointRDD match {
-      // Delegate to the checkpoint RDD when there is one, otherwise use our own preferences.
-      case Some(checkpointed) => checkpointed.getPreferredLocations(split)
-      case None => getPreferredLocations(split)
-    }
-  }
-
-  /**
-   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
-   * This should ''not'' be called by users directly, but is available for implementors of custom
-   * subclasses of RDD.
-   */
-  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
-    if (storageLevel == StorageLevel.NONE) {
-      // Not persisted: compute the partition (or read it back from the checkpoint).
-      computeOrReadCheckpoint(split, context)
-    } else {
-      // Persisted: let the cache manager return the stored copy or compute and store it.
-      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
-    }
-  }
-
-  /**
-   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
-   */
-  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
-    if (!isCheckpointed) {
-      compute(split, context)
-    } else {
-      // When checkpointed, read the partition through the first parent's iterator.
-      firstParent[T].iterator(split, context)
-    }
-  }
-
-  // Transformations (return a new RDD)
-
-  /**
-   * Return a new RDD by applying a function to all elements of this RDD.
-   */
-  def map[U: ClassManifest](f: T => U): RDD[U] = {
-    // Clean the closure before handing it to the RDD that will ship it to tasks.
-    val cleanedF = sc.clean(f)
-    new MappedRDD(this, cleanedF)
-  }
-
-  /**
-   *  Return a new RDD by first applying a function to all elements of this
-   *  RDD, and then flattening the results.
-   */
-  def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] = {
-    // Clean the closure before handing it to the RDD that will ship it to tasks.
-    val cleanedF = sc.clean(f)
-    new FlatMappedRDD(this, cleanedF)
-  }
-
-  /**
-   * Return a new RDD containing only the elements that satisfy a predicate.
-   */
-  def filter(f: T => Boolean): RDD[T] = {
-    // Clean the predicate before handing it to the RDD that will ship it to tasks.
-    val cleanedPredicate = sc.clean(f)
-    new FilteredRDD(this, cleanedPredicate)
-  }
-
-  /**
-   * Return a new RDD containing the distinct elements in this RDD.
-   */
-  def distinct(numPartitions: Int): RDD[T] = {
-    // Use each element as a key with a dummy value, keep one entry per key, then drop the value.
-    val keyedByElement = map(x => (x, null))
-    val onePerKey = keyedByElement.reduceByKey((first, _) => first, numPartitions)
-    onePerKey.map(_._1)
-  }
-
-  def distinct(): RDD[T] = distinct(partitions.size)
-
-  /**
-   * Return a new RDD that is reduced into `numPartitions` partitions.
-   */
-  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
-    if (!shuffle) {
-      new CoalescedRDD(this, numPartitions)
-    } else {
-      // include a shuffle step so that our upstream tasks are still distributed
-      val keyedByElement = map(x => (x, null))
-      val shuffled = new ShuffledRDD[T, Null, (T, Null)](
-        keyedByElement,
-        new HashPartitioner(numPartitions))
-      new CoalescedRDD(shuffled, numPartitions).keys
-    }
-  }
-
-  /**
-   * Return a sampled subset of this RDD.
-   */
-  def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
-    new SampledRDD(this, withReplacement, fraction, seed)
-
-  def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
-    // Returns an array of `num` randomly chosen elements (fewer when sampling without
-    // replacement from an RDD that holds fewer than `num` elements). Throws
-    // IllegalArgumentException when `num` is negative.
-
-    // Validate the request before launching any job.
-    if (num < 0) {
-      throw new IllegalArgumentException("Negative number of elements requested")
-    }
-    if (num == 0) {
-      return new Array[T](0)
-    }
-
-    val initialCount = this.count()
-    // An empty RDD can never yield any element: without this guard the fraction below would be
-    // a division by zero and, when sampling with replacement, the retry loop would never end.
-    if (initialCount == 0) {
-      return new Array[T](0)
-    }
-
-    val multiplier = 3.0
-    // Cap the element count so that it still fits in an Int.
-    val maxSelected =
-      if (initialCount > Integer.MAX_VALUE - 1) Integer.MAX_VALUE - 1 else initialCount.toInt
-
-    // Without replacement we cannot return more elements than the RDD holds.
-    val total = if (num > initialCount && !withReplacement) maxSelected else num
-    // Oversample by `multiplier` so that a single pass is usually enough. The addition is done
-    // in Double so that num == Int.MaxValue cannot overflow.
-    val fraction = multiplier * (total + 1.0) / initialCount
-
-    val rand = new Random(seed)
-    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
-
-    // If the first sample didn't turn out large enough, keep trying to take samples;
-    // this shouldn't happen often because we use a big multiplier for the initial size
-    while (samples.length < total) {
-      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
-    }
-
-    Utils.randomizeInPlace(samples, rand).take(total)
-  }
-
-  /**
-   * Return the union of this RDD and another one. Any identical elements will appear multiple
-   * times (use `.distinct()` to eliminate them).
-   */
-  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))
-
-  /**
-   * Return the union of this RDD and another one. Any identical elements will appear multiple
-   * times (use `.distinct()` to eliminate them).
-   */
-  def ++(other: RDD[T]): RDD[T] = this.union(other)
-
-  /**
-   * Return an RDD created by coalescing all elements within each partition into an array.
-   */
-  def glom(): RDD[Array[T]] = new GlommedRDD(this)
-
-  /**
-   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
-   * elements (a, b) where a is in `this` and b is in `other`.
-   */
-  def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
-
-  /**
-   * Return an RDD of grouped items.
-   */
-  def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
-    groupBy[K](f, defaultPartitioner(this))
-
-  /**
-   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
-   * mapping to that key.
-   */
-  def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
-    groupBy(f, new HashPartitioner(numPartitions))
-
-  /**
-   * Return an RDD of grouped items.
-   */
-  def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
-    val keyOf = sc.clean(f)
-    // Pair every element with its computed key, then group the pairs with the partitioner.
-    val keyed = this.map(elem => (keyOf(elem), elem))
-    keyed.groupByKey(p)
-  }
-
-  /**
-   * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: String): RDD[String] = new PipedRDD(this, command)
-
-  /**
-   * Return an RDD created by piping elements to a forked external process.
-   */
-  def pipe(command: String, env: Map[String, String]): RDD[String] =
-    new PipedRDD(this, command, env)
-
-
-  /**
-   * Return an RDD created by piping elements to a forked external process.
-   * The print behavior can be customized by providing two functions.
-   *
-   * @param command command to run in forked process.
-   * @param env environment variables to set.
-   * @param printPipeContext Before piping elements, this function is called as an opportunity
-   *                         to pipe context data. Print line function (like out.println) will be
-   *                         passed as printPipeContext's parameter.
-   * @param printRDDElement Use this function to customize how to pipe elements. This function
-   *                        will be called with each RDD element as the 1st parameter, and the
-   *                        print line function (like out.println()) as the 2nd parameter.
-   *                        An example of pipe the RDD data of groupBy() in a streaming way,
-   *                        instead of constructing a huge String to concat all the elements:
-   *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
-   *                          for (e <- record._2){f(e)}
-   * @return the result RDD
-   */
-  def pipe(
-      command: Seq[String],
-      env: Map[String, String] = Map(),
-      printPipeContext: (String => Unit) => Unit = null,
-      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
-    // Both callbacks are optional (null by default); only clean the ones that were supplied.
-    val contextPrinter = if (printPipeContext eq null) null else sc.clean(printPipeContext)
-    val elementPrinter = if (printRDDElement eq null) null else sc.clean(printRDDElement)
-    new PipedRDD(this, command, env, contextPrinter, elementPrinter)
-  }
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD.
-   */
-  def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
-    preservesPartitioning: Boolean = false): RDD[U] = {
-    // Clean the closure before handing it to the RDD that will ship it to tasks.
-    val cleanedF = sc.clean(f)
-    new MapPartitionsRDD(this, cleanedF, preservesPartitioning)
-  }
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
-   * of the original partition.
-   */
-  def mapPartitionsWithIndex[U: ClassManifest](
-    f: (Int, Iterator[T]) => Iterator[U],
-    preservesPartitioning: Boolean = false): RDD[U] = {
-    // Clean the closure before handing it to the RDD that will ship it to tasks.
-    val cleanedF = sc.clean(f)
-    new MapPartitionsWithIndexRDD(this, cleanedF, preservesPartitioning)
-  }
-
-  /**
-   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
-   * of the original partition.
-   */
-  @deprecated("use mapPartitionsWithIndex", "0.7.0")
-  def mapPartitionsWithSplit[U: ClassManifest](
-    f: (Int, Iterator[T]) => Iterator[U],
-    preservesPartitioning: Boolean = false): RDD[U] = {
-    // Builds the same RDD type as mapPartitionsWithIndex; kept for source compatibility.
-    val cleanedF = sc.clean(f)
-    new MapPartitionsWithIndexRDD(this, cleanedF, preservesPartitioning)
-  }
-
-  /**
-   * Maps f over this RDD, where f takes an additional parameter of type A.  This
-   * additional parameter is produced by constructA, which is called in each
-   * partition with the index of that partition.
-   */
-  def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
-    (f: (T, A) => U): RDD[U] = {
-    // Build the per-partition value once, then apply f to every element together with it.
-    def mapPartition(partitionIndex: Int, elems: Iterator[T]): Iterator[U] = {
-      val perPartitionValue = constructA(partitionIndex)
-      elems.map(elem => f(elem, perPartitionValue))
-    }
-    new MapPartitionsWithIndexRDD(this, sc.clean(mapPartition _), preservesPartitioning)
-  }
-
-  /**
-   * FlatMaps f over this RDD, where f takes an additional parameter of type A.  This
-   * additional parameter is produced by constructA, which is called in each
-   * partition with the index of that partition.
-   */
-  def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
-    (f: (T, A) => Seq[U]): RDD[U] = {
-    // Build the per-partition value once, then flatten the results of f over every element.
-    def flatMapPartition(partitionIndex: Int, elems: Iterator[T]): Iterator[U] = {
-      val perPartitionValue = constructA(partitionIndex)
-      elems.flatMap(elem => f(elem, perPartitionValue))
-    }
-    new MapPartitionsWithIndexRDD(this, sc.clean(flatMapPartition _), preservesPartitioning)
-  }
-
-  /**
-   * Applies f to each element of this RDD, where f takes an additional parameter of type A.
-   * This additional parameter is produced by constructA, which is called in each
-   * partition with the index of that partition.
-   */
-  def foreachWith[A: ClassManifest](constructA: Int => A)
-    (f: (T, A) => Unit) {
-    // Run f for its side effect on every element and pass the element through unchanged.
-    def visitPartition(partitionIndex: Int, elems: Iterator[T]): Iterator[T] = {
-      val perPartitionValue = constructA(partitionIndex)
-      elems.map { elem =>
-        f(elem, perPartitionValue)
-        elem
-      }
-    }
-    // The no-op foreach launches the job that drains each partition's iterator.
-    val visited = new MapPartitionsWithIndexRDD(this, sc.clean(visitPartition _), true)
-    visited.foreach(_ => {})
-  }
-
-  /**
-   * Filters this RDD with p, where p takes an additional parameter of type A.  This
-   * additional parameter is produced by constructA, which is called in each
-   * partition with the index of that partition.
-   */
-  def filterWith[A: ClassManifest](constructA: Int => A)
-    (p: (T, A) => Boolean): RDD[T] = {
-    // Build the per-partition value once, then keep the elements that satisfy p with it.
-    def filterPartition(partitionIndex: Int, elems: Iterator[T]): Iterator[T] = {
-      val perPartitionValue = constructA(partitionIndex)
-      elems.filter(elem => p(elem, perPartitionValue))
-    }
-    new MapPartitionsWithIndexRDD(this, sc.clean(filterPartition _), true)
-  }
-
-  /**
-   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
-   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
-   * partitions* and the *same number of elements in each partition* (e.g. one was made through
-   * a map on the other).
-   */
-  def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
-
-  /**
-   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
-   * applying a function to the zipped partitions. Assumes that all the RDDs have the
-   * *same number of partitions*, but does *not* require them to have the same number
-   * of elements in each partition.
-   */
-  def zipPartitions[B: ClassManifest, V: ClassManifest]
-      (rdd2: RDD[B])
-      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] = {
-    // Clean the combining function before handing it to the zipped RDD.
-    val cleanedF = sc.clean(f)
-    new ZippedPartitionsRDD2(sc, cleanedF, this, rdd2)
-  }
-
-  def zipPartitions[B: ClassManifest, C: ClassManifest, V: ClassManifest]
-      (rdd2: RDD[B], rdd3: RDD[C])
-      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] = {
-    // Clean the combining function before handing it to the zipped RDD.
-    val cleanedF = sc.clean(f)
-    new ZippedPartitionsRDD3(sc, cleanedF, this, rdd2, rdd3)
-  }
-
-  def zipPartitions[B: ClassManifest, C: ClassManifest, D: ClassManifest, V: ClassManifest]
-      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
-      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] = {
-    // Clean the combining function before handing it to the zipped RDD.
-    val cleanedF = sc.clean(f)
-    new ZippedPartitionsRDD4(sc, cleanedF, this, rdd2, rdd3, rdd4)
-  }
-
-
-  // Actions (launch a job to return a value to the user program)
-
-  /**
-   * Applies a function f to all elements of this RDD.
-   */
-  def foreach(f: T => Unit) {
-    val perElement = sc.clean(f)
-    // Each task walks its partition and applies the cleaned function to every element.
-    val runOnPartition = (elems: Iterator[T]) => elems.foreach(perElement)
-    sc.runJob(this, runOnPartition)
-  }
-
-  /**
-   * Applies a function f to each partition of this RDD.
-   */
-  def foreachPartition(f: Iterator[T] => Unit) {
-    val perPartition = sc.clean(f)
-    // Each task hands its whole partition iterator to the cleaned function.
-    val runOnPartition = (elems: Iterator[T]) => perPartition(elems)
-    sc.runJob(this, runOnPartition)
-  }
-
-  /**
-   * Return an array that contains all of the elements in this RDD.
-   */
-  def collect(): Array[T] = {
-    // One array per partition comes back from the job; concatenate them in partition order.
-    val materialize = (elems: Iterator[T]) => elems.toArray
-    val perPartition = sc.runJob(this, materialize)
-    Array.concat(perPartition: _*)
-  }
-
-  /**
-   * Return an array that contains all of the elements in this RDD.
-   */
-  def toArray(): Array[T] = collect()
-
-  /**
-   * Return an RDD that contains all matching values by applying `f`.
-   */
-  def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
-    // Keep only the elements the partial function is defined at, then apply it to them.
-    val matching = filter(f.isDefinedAt)
-    matching.map(f)
-  }
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   *
-   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
-   * RDD will be <= us.
-   */
-  def subtract(other: RDD[T]): RDD[T] = {
-    // Reuse our own partitioner when we have one, else hash into as many partitions as we have.
-    val chosen: Partitioner = partitioner match {
-      case Some(existing) => existing
-      case None => new HashPartitioner(partitions.size)
-    }
-    subtract(other, chosen)
-  }
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
-    subtract(other, new HashPartitioner(numPartitions))
-
-  /**
-   * Return an RDD with the elements from `this` that are not in `other`.
-   */
-  def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
-    val effectivePartitioner: Partitioner =
-      if (partitioner == Some(p)) {
-        // Our partitioner knows how to handle T (which, since we have a partitioner, is
-        // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples.
-        // Unfortunately, since we're making a new partitioner, we'll get ShuffleDependencies
-        // anyway, and when calling .keys, will not have a partitioner set, even though
-        // the SubtractedRDD will, thanks to the de-tupled partitioning, already be
-        // partitioned by the right/real keys (e.g. p).
-        new Partitioner() {
-          override def numPartitions = p.numPartitions
-          override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
-        }
-      } else {
-        p
-      }
-    // Turn both sides into (element, null) pairs so the key-based subtraction can be reused.
-    this.map(x => (x, null)).subtractByKey(other.map((_, null)), effectivePartitioner).keys
-  }
-
-  /**
-   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
-   */
-  def reduce(f: (T, T) => T): T = {
-    val cleanF = sc.clean(f)
-    // Runs inside each task: reduce one partition, or report None when it is empty.
-    val reducePartition: Iterator[T] => Option[T] = elems => {
-      if (elems.hasNext) Some(elems.reduceLeft(cleanF)) else None
-    }
-    var jobResult: Option[T] = None
-    // Runs on the driver as each task result arrives: fold it into the running result.
-    val mergeResult = (index: Int, taskResult: Option[T]) => {
-      taskResult match {
-        case Some(partial) =>
-          jobResult = jobResult match {
-            case Some(accumulated) => Some(f(accumulated, partial))
-            case None => taskResult
-          }
-        case None => // empty partition: nothing to merge
-      }
-    }
-    sc.runJob(this, reducePartition, mergeResult)
-    // Get the final result out of our Option, or throw an exception if the RDD was empty
-    jobResult match {
-      case Some(result) => result
-      case None => throw new UnsupportedOperationException("empty collection")
-    }
-  }
-
-  /**
-   * Aggregate the elements of each partition, and then the results for all the partitions, using a
-   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
-   * modify t1 and return it as its result value to avoid object allocation; however, it should not
-   * modify t2.
-   */
-  def fold(zeroValue: T)(op: (T, T) => T): T = {
-    // Clone the zero value since we will also be serializing it as part of tasks
-    val serializer = sc.env.closureSerializer.newInstance()
-    var jobResult = Utils.clone(zeroValue, serializer)
-    val partitionOp = sc.clean(op)
-    // Runs inside each task: fold one partition starting from the zero value.
-    val foldPartition: Iterator[T] => T = elems => elems.fold(zeroValue)(partitionOp)
-    // Runs on the driver: merge every per-partition result into the running result.
-    val mergeResult: (Int, T) => Unit = (index, partial) => {
-      jobResult = op(jobResult, partial)
-    }
-    sc.runJob(this, foldPartition, mergeResult)
-    jobResult
-  }
-
-  /**
-   * Aggregate the elements of each partition, and then the results for all the partitions, using
-   * given combine functions and a neutral "zero value". This function can return a different result
-   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
-   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
-   * allowed to modify and return their first argument instead of creating a new U to avoid memory
-   * allocation.
-   */
-  def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
-    // Clone the zero value since we will also be serializing it as part of tasks
-    val serializer = sc.env.closureSerializer.newInstance()
-    var jobResult = Utils.clone(zeroValue, serializer)
-    val partitionSeqOp = sc.clean(seqOp)
-    val partitionCombOp = sc.clean(combOp)
-    // Runs inside each task: aggregate one partition starting from the zero value.
-    val aggregatePartition: Iterator[T] => U = elems =>
-      elems.aggregate(zeroValue)(partitionSeqOp, partitionCombOp)
-    // Runs on the driver: combine every per-partition result into the running result.
-    val mergeResult: (Int, U) => Unit = (index, partial) => {
-      jobResult = combOp(jobResult, partial)
-    }
-    sc.runJob(this, aggregatePartition, mergeResult)
-    jobResult
-  }
-
-  /**
-   * Return the number of elements in the RDD.
-   */
-  def count(): Long = {
-    // Runs inside each task: drain the partition's iterator and count its elements.
-    val countPartition = (elems: Iterator[T]) => {
-      var seen = 0L
-      while (elems.hasNext) {
-        elems.next()
-        seen += 1L
-      }
-      seen
-    }
-    // Add up the per-partition counts on the driver.
-    sc.runJob(this, countPartition).sum
-  }
-
-  /**
-   * (Experimental) Approximate version of count() that returns a potentially incomplete result
-   * within a timeout, even if not all tasks have finished.
-   */
-  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
-    // Runs inside each task: drain the partition's iterator and count its elements.
-    val countElements: (TaskContext, Iterator[T]) => Long = { (taskContext, elems) =>
-      var seen = 0L
-      while (elems.hasNext) {
-        elems.next()
-        seen += 1L
-      }
-      seen
-    }
-    // The evaluator is told how many partitions there are and the requested confidence.
-    val evaluator = new CountEvaluator(partitions.size, confidence)
-    sc.runApproximateJob(this, countElements, evaluator, timeout)
-  }
-
-  /**
-   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
-   * combine step happens locally on the master, equivalent to running a single reduce task.
-   */
-  def countByValue(): Map[T, Long] = {
-    if (elementClassManifest.erasure.isArray) {
-      throw new SparkException("countByValue() does not support arrays")
-    }
-    // TODO: This should perhaps be distributed by default.
-    // Tally every element of one partition into a single map and emit that map.
-    def countPartition(elems: Iterator[T]): Iterator[OLMap[T]] = {
-      val counts = new OLMap[T]
-      while (elems.hasNext) {
-        val elem = elems.next()
-        counts.put(elem, counts.getLong(elem) + 1L)
-      }
-      Iterator(counts)
-    }
-    // Add the counts of `from` into `into` (mutating and returning `into`).
-    def mergeMaps(into: OLMap[T], from: OLMap[T]): OLMap[T] = {
-      val entries = from.object2LongEntrySet.fastIterator()
-      while (entries.hasNext) {
-        val entry = entries.next()
-        into.put(entry.getKey, into.getLong(entry.getKey) + entry.getLongValue)
-      }
-      into
-    }
-    val merged = mapPartitions(countPartition).reduce(mergeMaps)
-    merged.asInstanceOf[java.util.Map[T, Long]]   // Will be wrapped as a Scala mutable Map
-  }
-
-  /**
-   * (Experimental) Approximate version of countByValue().
-   */
-  def countByValueApprox(
-      timeout: Long,
-      confidence: Double = 0.95
-      ): PartialResult[Map[T, BoundedDouble]] = {
-    if (elementClassManifest.erasure.isArray) {
-      throw new SparkException("countByValueApprox() does not support arrays")
-    }
-    // Runs inside each task: tally every element of the partition into a single map.
-    val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (taskContext, elems) =>
-      val counts = new OLMap[T]
-      while (elems.hasNext) {
-        val elem = elems.next()
-        counts.put(elem, counts.getLong(elem) + 1L)
-      }
-      counts
-    }
-    // The evaluator is told how many partitions there are and the requested confidence.
-    val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
-    sc.runApproximateJob(this, countPartition, evaluator, timeout)
-  }
-
-  /**
-   * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
-   * it will be slow if a lot of partitions are required. In that case, use collect() to get the
-   * whole RDD instead.
-   */
-  def take(num: Int): Array[T] = {
-    if (num == 0) {
-      new Array[T](0)
-    } else {
-      val collected = new ArrayBuffer[T]
-      var partitionIndex = 0
-      // Visit partitions one at a time, asking each only for the elements still missing.
-      // A job never returns more than `remaining` elements, so the loop condition alone
-      // stops the scan as soon as `num` elements have been gathered.
-      while (collected.size < num && partitionIndex < partitions.size) {
-        val remaining = num - collected.size
-        val fetched = sc.runJob(
-          this, (it: Iterator[T]) => it.take(remaining).toArray, Array(partitionIndex), true)
-        collected ++= fetched(0)
-        partitionIndex += 1
-      }
-      collected.toArray
-    }
-  }
-
-  /**
-   * Return the first element in this RDD.
-   */
-  def first(): T = {
-    // take(1) yields either exactly one element or none at all.
-    val leading = take(1)
-    if (leading.length == 1) {
-      leading(0)
-    } else {
-      throw new UnsupportedOperationException("empty collection")
-    }
-  }
-
-  /**
-   * Returns the top K elements from this RDD as defined by
-   * the specified implicit Ordering[T].
-   * @param num the number of top elements to return
-   * @param ord the implicit ordering for T
-   * @return an array of top elements
-   */
-  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
-    // Each partition is condensed into one bounded queue created with capacity `num`.
-    val perPartitionQueues = mapPartitions { items =>
-      val queue = new BoundedPriorityQueue[T](num)
-      queue ++= items
-      Iterator.single(queue)
-    }
-    // Merge the queues pairwise into the first one.
-    val merged = perPartitionQueues.reduce { (queue1, queue2) =>
-      queue1 ++= queue2
-      queue1
-    }
-    // Sort the survivors with the reversed ordering.
-    merged.toArray.sorted(ord.reverse)
-  }
-
-  /**
-   * Returns the first K elements from this RDD as defined by the specified implicit
-   * Ordering[T] and maintains the ordering. Implemented by calling top(num) with the
-   * reversed ordering.
-   * @param num the number of elements to return
-   * @param ord the implicit ordering for T
-   * @return an array of the first `num` elements according to `ord`
-   */
-  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = {
-    val reversed = ord.reverse
-    top(num)(reversed)
-  }
-
-  /**
-   * Save this RDD as a text file, using string representations of elements. Every element
-   * becomes a (NullWritable, Text) pair written through TextOutputFormat.
-   */
-  def saveAsTextFile(path: String) {
-    val textRecords = this.map(elem => (NullWritable.get(), new Text(elem.toString)))
-    textRecords.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
-  }
-
-  /**
-   * Save this RDD as a compressed text file, using string representations of elements.
-   * Every element becomes a (NullWritable, Text) pair written through TextOutputFormat with
-   * the given compression codec class.
-   */
-  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
-    val textRecords = this.map(elem => (NullWritable.get(), new Text(elem.toString)))
-    textRecords.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
-  }
-
-  /**
-   * Save this RDD as a SequenceFile of serialized objects. Elements are grouped into arrays
-   * of up to 10 per partition, and each array is serialized with Utils.serialize into one
-   * BytesWritable value stored under a NullWritable key.
-   */
-  def saveAsObjectFile(path: String) {
-    val batches = this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
-    val records = batches.map { batch =>
-      (NullWritable.get(), new BytesWritable(Utils.serialize(batch)))
-    }
-    records.saveAsSequenceFile(path)
-  }
-
-  /**
-   * Creates tuples of the elements in this RDD by applying `f`: each element `x` is mapped to
-   * the pair `(f(x), x)`.
-   */
-  def keyBy[K](f: T => K): RDD[(K, T)] = map { elem =>
-    val key = f(elem)
-    (key, elem)
-  }
-
-  /**
-   * A private method for tests, to look at the contents of each partition. Runs a job that
-   * turns every partition's iterator into an array.
-   */
-  private[spark] def collectPartitions(): Array[Array[T]] = {
-    val partitionToArray = (iter: Iterator[T]) => iter.toArray
-    sc.runJob(this, partitionToArray)
-  }
-
-  /**
-   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
-   * directory set with SparkContext.setCheckpointDir() and all references to its parent
-   * RDDs will be removed. This function must be called before any job has been
-   * executed on this RDD. It is strongly recommended that this RDD is persisted in
-   * memory, otherwise saving it on a file will require recomputation.
-   *
-   * Throws an Exception if no checkpoint directory has been set. Calling this again once
-   * checkpoint data exists is a no-op.
-   */
-  def checkpoint() {
-    if (context.checkpointDir.isEmpty) {
-      throw new Exception("Checkpoint directory has not been set in the SparkContext")
-    }
-    if (checkpointData.isEmpty) {
-      val data = new RDDCheckpointData(this)
-      checkpointData = Some(data)
-      data.markForCheckpoint()
-    }
-  }
-
-  /**
-   * Return whether this RDD has been checkpointed or not. False when no checkpoint data has
-   * been created for this RDD.
-   */
-  def isCheckpointed: Boolean = checkpointData match {
-    case Some(data) => data.isCheckpointed
-    case None => false
-  }
-
-  /**
-   * Gets the name of the file to which this RDD was checkpointed, or None when there is no
-   * checkpoint data or the checkpoint data has no file recorded.
-   */
-  def getCheckpointFile: Option[String] = checkpointData match {
-    case Some(data) => data.getCheckpointFile
-    case None => None
-  }
-
-  // =======================================================================
-  // Other internal methods and fields
-  // =======================================================================
-
-  /** Storage level of this RDD; starts out as StorageLevel.NONE. */
-  private var storageLevel: StorageLevel = StorageLevel.NONE
-
-  /** Record user function generating this RDD. */
-  private[spark] val origin = Utils.formatSparkCallSite
-
-  /** The ClassManifest of this RDD's element type T. */
-  private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
-
-  /** Checkpointing state of this RDD; stays None until checkpoint() creates it. */
-  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
-
-  /**
-   * Returns the first parent RDD, i.e. the RDD of the first entry in `dependencies`, cast to
-   * RDD[U]. The cast is unchecked: the caller is responsible for choosing the right U.
-   */
-  protected[spark] def firstParent[U: ClassManifest] = {
-    val parent = dependencies.head.rdd
-    parent.asInstanceOf[RDD[U]]
-  }
-
-  /** The [[org.apache.spark.SparkContext]] that this RDD was created on (returns `sc`). */
-  def context = sc
-
-  // Set on the first doCheckpoint() call so that later calls return immediately; this keeps
-  // the recursion over parent RDDs from handling the same RDD more than once.
-  private var doCheckpointCalled = false
-
-  /**
-   * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
-   * after a job using this RDD has completed (therefore the RDD has been materialized and
-   * potentially stored in memory). If this RDD has checkpoint data, that data performs the
-   * checkpoint; otherwise doCheckpoint() is called recursively on the parent RDDs.
-   */
-  private[spark] def doCheckpoint() {
-    if (!doCheckpointCalled) {
-      doCheckpointCalled = true
-      checkpointData match {
-        case Some(data) => data.doCheckpoint()
-        case None => dependencies.foreach(dependency => dependency.rdd.doCheckpoint())
-      }
-    }
-  }
-
-  /**
-   * Called once this RDD has been checkpointed: forgets the RDD's original dependencies
-   * (through clearDependencies()), its cached partitions and the dependencies passed to the
-   * constructor, by setting the corresponding fields to null.
-   *
-   * @param checkpointRDD the RDD created from the checkpoint file. It is not referenced in
-   *                      the body of this method.
-   */
-  private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
-    clearDependencies()
-    partitions_ = null
-    deps = null    // Forget the constructor argument for dependencies too
-  }
-
-  /**
-   * Clears the dependencies of this RDD. This method must ensure that all references
-   * to the original parent RDDs are removed, so that the parent RDDs can be garbage
-   * collected. Subclasses of RDD may override this method to implement their own cleaning
-   * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example.
-   */
-  protected def clearDependencies() {
-    dependencies_ = null
-  }
-
-  /**
-   * A description of this RDD and its recursive dependencies for debugging. Produces one
-   * line per RDD, giving its string form and partition count, with each level of parents
-   * indented two more spaces than its child.
-   */
-  def toDebugString: String = {
-    def describe(node: RDD[_], indent: String = ""): Seq[String] = {
-      val ownLine = indent + node + " (" + node.partitions.size + " partitions)"
-      val parentLines = node.dependencies.flatMap(dep => describe(dep.rdd, indent + "  "))
-      Seq(ownLine) ++ parentLines
-    }
-    val lines = describe(this)
-    lines.mkString("\n")
-  }
-
-  /**
-   * Formats this RDD as "<name> <SimpleClassName>[<id>] at <origin>"; the name and its
-   * trailing space are left out when `name` is null.
-   */
-  override def toString: String = {
-    val namePrefix = Option(name).map(_ + " ").getOrElse("")
-    val className = getClass.getSimpleName
-    "%s%s[%d] at %s".format(namePrefix, className, id, origin)
-  }
-
-  /** Wraps this RDD in a JavaRDD, passing along the element ClassManifest. */
-  def toJavaRDD() : JavaRDD[T] = {
-    val manifest = elementClassManifest
-    new JavaRDD(this)(manifest)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/RDDCheckpointData.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/RDDCheckpointData.scala
deleted file mode 100644
index 0334de6..0000000
--- a/core/src/main/scala/org/apache/spark/RDDCheckpointData.scala
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.conf.Configuration
-import rdd.{CheckpointRDD, CoalescedRDD}
-import scheduler.{ResultTask, ShuffleMapTask}
-
-/**
- * Enumeration of the states an RDD goes through while being checkpointed, in order:
- * [ Initialized --> marked for checkpointing --> checkpointing in progress --> checkpointed ]
- */
-private[spark] object CheckpointState extends Enumeration {
-  type CheckpointState = Value
-
-  val Initialized = Value
-  val MarkedForCheckpoint = Value
-  val CheckpointingInProgress = Value
-  val Checkpointed = Value
-}
-
-/**
- * Holds all the information related to checkpointing one RDD. Each instance is associated
- * with a single RDD. It drives the checkpointing of that RDD and, once checkpointing is done,
- * serves the post-checkpoint state: the updated partitions and preferred locations of the
- * checkpointed RDD.
- *
- * The accessor methods and the state transitions in this class synchronize on the
- * RDDCheckpointData companion object.
- */
-private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
-  extends Logging with Serializable {
-
-  import CheckpointState._
-
-  /** The checkpoint state of the associated RDD. */
-  var cpState = Initialized
-
-  /** The file to which the associated RDD has been checkpointed, once that has happened. */
-  @transient var cpFile: Option[String] = None
-
-  /**
-   * The CheckpointRDD created from the checkpoint file, that is, the new parent of the
-   * associated RDD.
-   */
-  var cpRDD: Option[RDD[T]] = None
-
-  /** Mark the RDD for checkpointing; only has an effect in the Initialized state. */
-  def markForCheckpoint() {
-    RDDCheckpointData.synchronized {
-      if (cpState == Initialized) {
-        cpState = MarkedForCheckpoint
-      }
-    }
-  }
-
-  /** Is the RDD already checkpointed? */
-  def isCheckpointed: Boolean = RDDCheckpointData.synchronized {
-    cpState == Checkpointed
-  }
-
-  /** Get the file to which this RDD was checkpointed, as an Option. */
-  def getCheckpointFile: Option[String] = RDDCheckpointData.synchronized {
-    cpFile
-  }
-
-  /**
-   * Do the checkpointing of the RDD. Called after the first job using that RDD is over.
-   * Does nothing unless the RDD is currently marked for checkpointing. Throws a
-   * SparkException if the checkpoint directory for this RDD cannot be created.
-   */
-  def doCheckpoint() {
-    // Claim the checkpoint under the lock: only a caller that finds the state
-    // MarkedForCheckpoint moves it to CheckpointingInProgress and carries on.
-    val claimed = RDDCheckpointData.synchronized {
-      val marked = cpState == MarkedForCheckpoint
-      if (marked) {
-        cpState = CheckpointingInProgress
-      }
-      marked
-    }
-
-    if (claimed) {
-      val context = rdd.context
-
-      // Create the output directory "rdd-<id>" under the checkpoint directory
-      val checkpointPath = new Path(context.checkpointDir.get, "rdd-" + rdd.id)
-      val fileSystem = checkpointPath.getFileSystem(new Configuration())
-      if (!fileSystem.mkdirs(checkpointPath)) {
-        throw new SparkException("Failed to create checkpoint path " + checkpointPath)
-      }
-
-      // Write the RDD out with a job, then load it back as a CheckpointRDD
-      val pathString = checkpointPath.toString
-      context.runJob(rdd, CheckpointRDD.writeToFile(pathString) _)
-      val reloaded = new CheckpointRDD[T](context, pathString)
-
-      // Publish the result and swap the RDD's dependencies and partitions
-      RDDCheckpointData.synchronized {
-        cpFile = Some(pathString)
-        cpRDD = Some(reloaded)
-        rdd.markCheckpointed(reloaded)
-        cpState = Checkpointed
-        RDDCheckpointData.clearTaskCaches()
-        logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + reloaded.id)
-      }
-    }
-  }
-
-  /**
-   * Get the preferred locations of a split after checkpointing. Calls `.get` on cpRDD, so it
-   * fails if no checkpoint RDD has been set yet.
-   */
-  def getPreferredLocations(split: Partition): Seq[String] = RDDCheckpointData.synchronized {
-    val checkpointed = cpRDD.get
-    checkpointed.preferredLocations(split)
-  }
-
-  /**
-   * Get the partitions of the checkpoint RDD. Calls `.get` on cpRDD, so it fails if no
-   * checkpoint RDD has been set yet.
-   */
-  def getPartitions: Array[Partition] = RDDCheckpointData.synchronized {
-    val checkpointed = cpRDD.get
-    checkpointed.partitions
-  }
-
-  /** The checkpoint RDD, if one has been created. */
-  def checkpointRDD: Option[RDD[T]] = RDDCheckpointData.synchronized {
-    cpRDD
-  }
-}
-
-/**
- * Companion of the RDDCheckpointData class. Instances of the class synchronize on this
- * object, and it offers a helper to clear the task caches.
- */
-private[spark] object RDDCheckpointData {
-  /** Calls clearCache() on both ShuffleMapTask and ResultTask. */
-  def clearTaskCaches() {
-    ShuffleMapTask.clearCache()
-    ResultTask.clearCache()
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/SequenceFileRDDFunctions.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/SequenceFileRDDFunctions.scala
deleted file mode 100644
index d58fb4e..0000000
--- a/core/src/main/scala/org/apache/spark/SequenceFileRDDFunctions.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io.EOFException
-import java.net.URL
-import java.io.ObjectInputStream
-import java.util.concurrent.atomic.AtomicLong
-import java.util.HashSet
-import java.util.Random
-import java.util.Date
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.Map
-import scala.collection.mutable.HashMap
-
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.OutputFormat
-import org.apache.hadoop.mapred.TextOutputFormat
-import org.apache.hadoop.mapred.SequenceFileOutputFormat
-import org.apache.hadoop.mapred.OutputCommitter
-import org.apache.hadoop.mapred.FileOutputCommitter
-import org.apache.hadoop.io.compress.CompressionCodec
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.Text
-
-import org.apache.spark.SparkContext._
-
-/**
- * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,
- * through an implicit conversion. Note that this can't be part of PairRDDFunctions because
- * we need more implicit parameters to convert our keys and values to Writable.
- *
- * Users should import `org.apache.spark.SparkContext._` at the top of their program to use
- * these functions.
- */
-class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
-    self: RDD[(K, V)])
-  extends Logging
-  with Serializable {
-
-  /**
-   * Finds the Writable class that values of type T are written as: T's own erased class when
-   * it already is a Writable, otherwise the return type of the implicit T => Writable view.
-   */
-  private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
-    val erased = classManifest[T].erasure
-    val writableClass =
-      if (classOf[Writable].isAssignableFrom(erased)) {
-        erased
-      } else {
-        // We get the type of the Writable class by looking at the apply method which converts
-        // from T to Writable. Since we have two apply methods we filter out the one which
-        // is not of the form "java.lang.Object apply(java.lang.Object)"
-        // TODO: use something like WritableConverter to avoid reflection
-        val typedApplyMethods = implicitly[T => Writable].getClass.getDeclaredMethods().filter(
-          m => m.getReturnType().toString != "class java.lang.Object" &&
-               m.getName() == "apply")
-        typedApplyMethods(0).getReturnType
-      }
-    writableClass.asInstanceOf[Class[_ <: Writable]]
-  }
-
-  /**
-   * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
-   * and value types. If the key or value are Writable, then we use their classes directly;
-   * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc,
-   * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
-   * file system.
-   */
-  def saveAsSequenceFile(path: String, codec: Option[Class[_ <: CompressionCodec]] = None) {
-    def anyToWritable[U <% Writable](u: U): Writable = u
-
-    val keyClass = getWritableClass[K]
-    val valueClass = getWritableClass[V]
-    // Only keys/values that are not Writable already need to go through the implicit view.
-    val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass)
-    val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass)
-
-    logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," +
-      valueClass.getSimpleName + ")")
-    val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
-    val jobConf = new JobConf(self.context.hadoopConfiguration)
-
-    (convertKey, convertValue) match {
-      case (false, false) =>
-        self.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
-      case (false, true) =>
-        val converted = self.map(pair => (pair._1, anyToWritable(pair._2)))
-        converted.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
-      case (true, false) =>
-        val converted = self.map(pair => (anyToWritable(pair._1), pair._2))
-        converted.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
-      case (true, true) =>
-        val converted = self.map(pair => (anyToWritable(pair._1), anyToWritable(pair._2)))
-        converted.saveAsHadoopFile(path, keyClass, valueClass, format, jobConf, codec)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/SizeEstimator.scala
deleted file mode 100644
index 4bfc837..0000000
--- a/core/src/main/scala/org/apache/spark/SizeEstimator.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.lang.reflect.Field
-import java.lang.reflect.Modifier
-import java.lang.reflect.{Array => JArray}
-import java.util.IdentityHashMap
-import java.util.concurrent.ConcurrentHashMap
-import java.util.Random
-
-import javax.management.MBeanServer
-import java.lang.management.ManagementFactory
-
-import scala.collection.mutable.ArrayBuffer
-
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-
-/**
- * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
- * memory-aware caches.
- *
- * The estimate walks the object graph reachable from the given object, adding up a per-class
- * "shell size" for ordinary objects and a header-plus-elements size for arrays. Large arrays
- * of references are estimated from a fixed-size sample of their elements.
- *
- * Based on the following JavaWorld article:
- * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
- */
-private[spark] object SizeEstimator extends Logging {
-
-  // Sizes of primitive types
-  private val BYTE_SIZE    = 1
-  private val BOOLEAN_SIZE = 1
-  private val CHAR_SIZE    = 2
-  private val SHORT_SIZE   = 2
-  private val INT_SIZE     = 4
-  private val LONG_SIZE    = 8
-  private val FLOAT_SIZE   = 4
-  private val DOUBLE_SIZE  = 8
-
-  // Alignment boundary for objects
-  // TODO: Is this arch dependent ?
-  private val ALIGN_SIZE = 8
-
-  // A cache of ClassInfo objects for each class
-  private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo]
-
-  // Object and pointer sizes are arch dependent
-  private var is64bit = false
-
-  // Size of an object reference
-  // Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops
-  private var isCompressedOops = false
-  private var pointerSize = 4
-
-  // Minimum size of a java.lang.Object
-  private var objectSize = 8
-
-  // Must run after the fields above have been initialized, since it overwrites them.
-  initialize()
-
-  // Sets object size, pointer size based on architecture and CompressedOops settings
-  // from the JVM. Also resets the ClassInfo cache and seeds it with java.lang.Object.
-  private def initialize() {
-    is64bit = System.getProperty("os.arch").contains("64")
-    isCompressedOops = getIsCompressedOops
-
-    objectSize = if (!is64bit) 8 else {
-      if(!isCompressedOops) {
-        16
-      } else {
-        12
-      }
-    }
-    pointerSize = if (is64bit && !isCompressedOops) 8 else 4
-    classInfos.clear()
-    classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
-  }
-
-  /**
-   * Determines whether the JVM uses compressed object pointers. The system property
-   * "spark.test.useCompressedOops" takes precedence when set; otherwise the HotSpot
-   * diagnostic MXBean is queried reflectively, and if that fails the answer is guessed from
-   * the maximum heap size.
-   */
-  private def getIsCompressedOops : Boolean = {
-    if (System.getProperty("spark.test.useCompressedOops") != null) {
-      return System.getProperty("spark.test.useCompressedOops").toBoolean
-    }
-
-    try {
-      val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
-      val server = ManagementFactory.getPlatformMBeanServer()
-
-      // NOTE: This should throw an exception in non-Sun JVMs
-      val hotSpotMBeanClass = Class.forName("com.sun.management.HotSpotDiagnosticMXBean")
-      val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
-          Class.forName("java.lang.String"))
-
-      val bean = ManagementFactory.newPlatformMXBeanProxy(server,
-        hotSpotMBeanName, hotSpotMBeanClass)
-      // TODO: We could use reflection on the VMOption returned ?
-      return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
-    } catch {
-      case e: Exception => {
-        // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
-        val guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
-        val guessInWords = if (guess) "yes" else "not"
-        logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords)
-        return guess
-      }
-    }
-  }
-
-  /**
-   * The state of an ongoing size estimation. Contains a stack of objects to visit as well as an
-   * IdentityHashMap of visited objects, and provides utility methods for enqueueing new objects
-   * to visit.
-   */
-  private class SearchState(val visited: IdentityHashMap[AnyRef, AnyRef]) {
-    val stack = new ArrayBuffer[AnyRef]
-    var size = 0L
-
-    // Schedules obj for a visit unless it is null or has been seen before.
-    def enqueue(obj: AnyRef) {
-      if (obj != null && !visited.containsKey(obj)) {
-        visited.put(obj, null)
-        stack += obj
-      }
-    }
-
-    def isFinished(): Boolean = stack.isEmpty
-
-    // Removes and returns the most recently enqueued object.
-    def dequeue(): AnyRef = {
-      val elem = stack.last
-      stack.trimEnd(1)
-      return elem
-    }
-  }
-
-  /**
-   * Cached information about each class. We remember two things: the "shell size" of the class
-   * (size of all non-static fields plus the java.lang.Object size), and any fields that are
-   * pointers to objects.
-   */
-  private class ClassInfo(
-    val shellSize: Long,
-    val pointerFields: List[Field]) {}
-
-  /** Estimates the number of bytes occupied by `obj` and everything reachable from it. */
-  def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef])
-
-  // Same as estimate(obj), but objects already present in `visited` are not counted again.
-  private def estimate(obj: AnyRef, visited: IdentityHashMap[AnyRef, AnyRef]): Long = {
-    val state = new SearchState(visited)
-    state.enqueue(obj)
-    while (!state.isFinished) {
-      visitSingleObject(state.dequeue(), state)
-    }
-    return state.size
-  }
-
-  // Adds the size of one object to state.size and enqueues the objects it points to.
-  private def visitSingleObject(obj: AnyRef, state: SearchState) {
-    val cls = obj.getClass
-    if (cls.isArray) {
-      visitArray(obj, cls, state)
-    } else if (obj.isInstanceOf[ClassLoader] || obj.isInstanceOf[Class[_]]) {
-      // Hadoop JobConfs created in the interpreter have a ClassLoader, which greatly confuses
-      // the size estimator since it references the whole REPL. Do nothing in this case. In
-      // general all ClassLoaders and Classes will be shared between objects anyway.
-    } else {
-      val classInfo = getClassInfo(cls)
-      state.size += classInfo.shellSize
-      for (field <- classInfo.pointerFields) {
-        state.enqueue(field.get(obj))
-      }
-    }
-  }
-
-  // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
-  private val ARRAY_SIZE_FOR_SAMPLING = 200
-  private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
-
-  // Adds the size of an array to state.size. Elements of small reference arrays are enqueued
-  // for a visit; elements of large reference arrays are estimated from a sample.
-  private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
-    val length = JArray.getLength(array)
-    val elementClass = cls.getComponentType
-
-    // Arrays have object header and length field which is an integer
-    var arrSize: Long = alignSize(objectSize + INT_SIZE)
-
-    if (elementClass.isPrimitive) {
-      // primitiveSize returns a Long, so this product is computed in 64 bits.
-      arrSize += alignSize(length * primitiveSize(elementClass))
-      state.size += arrSize
-    } else {
-      // Widen to Long before multiplying: as Int arithmetic, length * pointerSize wraps around
-      // once the product exceeds Int.MaxValue (about 268 million references at 8 bytes each).
-      arrSize += alignSize(length.toLong * pointerSize)
-      state.size += arrSize
-
-      if (length <= ARRAY_SIZE_FOR_SAMPLING) {
-        for (i <- 0 until length) {
-          state.enqueue(JArray.get(array, i))
-        }
-      } else {
-        // Estimate the size of a large array by sampling elements without replacement.
-        // The fixed seed makes the set of sampled indices the same on every call.
-        var size = 0.0
-        val rand = new Random(42)
-        val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
-        for (i <- 0 until ARRAY_SAMPLE_SIZE) {
-          var index = 0
-          do {
-            index = rand.nextInt(length)
-          } while (drawn.contains(index))
-          drawn.add(index)
-          val elem = JArray.get(array, index)
-          size += SizeEstimator.estimate(elem, state.visited)
-        }
-        // Scale the sampled total up to the full array length.
-        state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
-      }
-    }
-  }
-
-  // Size in bytes of a primitive type; throws IllegalArgumentException for any other class.
-  private def primitiveSize(cls: Class[_]): Long = {
-    if (cls == classOf[Byte])
-      BYTE_SIZE
-    else if (cls == classOf[Boolean])
-      BOOLEAN_SIZE
-    else if (cls == classOf[Char])
-      CHAR_SIZE
-    else if (cls == classOf[Short])
-      SHORT_SIZE
-    else if (cls == classOf[Int])
-      INT_SIZE
-    else if (cls == classOf[Long])
-      LONG_SIZE
-    else if (cls == classOf[Float])
-      FLOAT_SIZE
-    else if (cls == classOf[Double])
-      DOUBLE_SIZE
-    else throw new IllegalArgumentException(
-      "Non-primitive class " + cls + " passed to primitiveSize()")
-  }
-
-  /**
-   * Get or compute the ClassInfo for a given class. The info of the superclass is computed
-   * (or fetched from the cache) first and extended with the fields declared by `cls` itself.
-   */
-  private def getClassInfo(cls: Class[_]): ClassInfo = {
-    // Check whether we've already cached a ClassInfo for this class
-    val info = classInfos.get(cls)
-    if (info != null) {
-      return info
-    }
-
-    val parent = getClassInfo(cls.getSuperclass)
-    var shellSize = parent.shellSize
-    var pointerFields = parent.pointerFields
-
-    for (field <- cls.getDeclaredFields) {
-      if (!Modifier.isStatic(field.getModifiers)) {
-        val fieldClass = field.getType
-        if (fieldClass.isPrimitive) {
-          shellSize += primitiveSize(fieldClass)
-        } else {
-          field.setAccessible(true) // Enable future get()'s on this field
-          shellSize += pointerSize
-          pointerFields = field :: pointerFields
-        }
-      }
-    }
-
-    shellSize = alignSize(shellSize)
-
-    // Create and cache a new ClassInfo
-    val newInfo = new ClassInfo(shellSize, pointerFields)
-    classInfos.put(cls, newInfo)
-    return newInfo
-  }
-
-  // Rounds size up to the next multiple of ALIGN_SIZE.
-  private def alignSize(size: Long): Long = {
-    val rem = size % ALIGN_SIZE
-    return if (rem == 0) size else (size + ALIGN_SIZE - rem)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 1207b24..faf0c23 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -54,17 +54,15 @@ import org.apache.mesos.MesosNativeLibrary
 
 import org.apache.spark.deploy.LocalSparkCluster
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
-import org.apache.spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
-  OrderedRDDFunctions}
+import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
   ClusterScheduler, Schedulable, SchedulingMode}
 import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
+import org.apache.spark.storage.{StorageUtils, BlockManagerSource}
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{MetadataCleaner, TimeStampedHashMap}
-import scala.Some
+import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap}
 import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.storage.RDDInfo
 import org.apache.spark.storage.StorageStatus

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 6e6fe5d..478e5a0 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
 import org.apache.spark.network.ConnectionManager
 import org.apache.spark.serializer.{Serializer, SerializerManager}
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{Utils, AkkaUtils}
 import org.apache.spark.api.python.PythonWorkerFactory
 
 
@@ -155,10 +155,10 @@ object SparkEnv extends Logging {
     val serializerManager = new SerializerManager
 
     val serializer = serializerManager.setDefault(
-      System.getProperty("spark.serializer", "org.apache.spark.JavaSerializer"))
+      System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
 
     val closureSerializer = serializerManager.get(
-      System.getProperty("spark.closure.serializer", "org.apache.spark.JavaSerializer"))
+      System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
 
     def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
       if (isDriver) {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a8cc309/core/src/main/scala/org/apache/spark/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Utils.scala b/core/src/main/scala/org/apache/spark/Utils.scala
deleted file mode 100644
index 1e17deb..0000000
--- a/core/src/main/scala/org/apache/spark/Utils.scala
+++ /dev/null
@@ -1,780 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-import java.io._
-import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address, ServerSocket}
-import java.util.{Locale, Random, UUID}
-import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadFactory, ThreadPoolExecutor}
-import java.util.regex.Pattern
-
-import scala.collection.Map
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-import scala.collection.JavaConversions._
-import scala.io.Source
-
-import com.google.common.io.Files
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-
-import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
-
-import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
-import org.apache.spark.deploy.SparkHadoopUtil
-import java.nio.ByteBuffer
-
-
-/**
- * Various utility methods used by Spark.
- */
-private object Utils extends Logging {
-
-  /** Serialize an object using Java serialization */
-  def serialize[T](o: T): Array[Byte] = {
-    val bos = new ByteArrayOutputStream()
-    val oos = new ObjectOutputStream(bos)
-    oos.writeObject(o)
-    oos.close()
-    return bos.toByteArray
-  }
-
-  /** Deserialize an object using Java serialization */
-  def deserialize[T](bytes: Array[Byte]): T = {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis)
-    return ois.readObject.asInstanceOf[T]
-  }
-
-  /** Deserialize an object using Java serialization and the given ClassLoader */
-  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
-    val bis = new ByteArrayInputStream(bytes)
-    val ois = new ObjectInputStream(bis) {
-      override def resolveClass(desc: ObjectStreamClass) =
-        Class.forName(desc.getName, false, loader)
-    }
-    return ois.readObject.asInstanceOf[T]
-  }
-
-  /** Serialize via nested stream using specific serializer */
-  def serializeViaNestedStream(os: OutputStream, ser: SerializerInstance)(f: SerializationStream => Unit) = {
-    val osWrapper = ser.serializeStream(new OutputStream {
-      def write(b: Int) = os.write(b)
-
-      override def write(b: Array[Byte], off: Int, len: Int) = os.write(b, off, len)
-    })
-    try {
-      f(osWrapper)
-    } finally {
-      osWrapper.close()
-    }
-  }
-
-  /** Deserialize via nested stream using specific serializer */
-  def deserializeViaNestedStream(is: InputStream, ser: SerializerInstance)(f: DeserializationStream => Unit) = {
-    val isWrapper = ser.deserializeStream(new InputStream {
-      def read(): Int = is.read()
-
-      override def read(b: Array[Byte], off: Int, len: Int): Int = is.read(b, off, len)
-    })
-    try {
-      f(isWrapper)
-    } finally {
-      isWrapper.close()
-    }
-  }
-
-  /**
-   * Primitive often used when writing {@link java.nio.ByteBuffer} to {@link java.io.DataOutput}.
-   */
-  def writeByteBuffer(bb: ByteBuffer, out: ObjectOutput) = {
-    if (bb.hasArray) {
-      out.write(bb.array(), bb.arrayOffset() + bb.position(), bb.remaining())
-    } else {
-      val bbval = new Array[Byte](bb.remaining())
-      bb.get(bbval)
-      out.write(bbval)
-    }
-  }
-
-  def isAlpha(c: Char): Boolean = {
-    (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
-  }
-
-  /** Split a string into words at non-alphabetic characters */
-  def splitWords(s: String): Seq[String] = {
-    val buf = new ArrayBuffer[String]
-    var i = 0
-    while (i < s.length) {
-      var j = i
-      while (j < s.length && isAlpha(s.charAt(j))) {
-        j += 1
-      }
-      if (j > i) {
-        buf += s.substring(i, j)
-      }
-      i = j
-      while (i < s.length && !isAlpha(s.charAt(i))) {
-        i += 1
-      }
-    }
-    return buf
-  }
-
-  private val shutdownDeletePaths = new collection.mutable.HashSet[String]()
-
-  // Register the path to be deleted via shutdown hook
-  def registerShutdownDeleteDir(file: File) {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths += absolutePath
-    }
-  }
-
-  // Is the path already registered to be deleted via a shutdown hook ?
-  def hasShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.contains(absolutePath)
-    }
-  }
-
-  // Note: if file is child of some registered path, while not equal to it, then return true;
-  // else false. This is to ensure that two shutdown hooks do not try to delete each others
-  // paths - resulting in IOException and incomplete cleanup.
-  def hasRootAsShutdownDeleteDir(file: File): Boolean = {
-    val absolutePath = file.getAbsolutePath()
-    val retval = shutdownDeletePaths.synchronized {
-      shutdownDeletePaths.find { path =>
-        !absolutePath.equals(path) && absolutePath.startsWith(path)
-      }.isDefined
-    }
-    if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
-    }
-    retval
-  }
-
-  /** Create a temporary directory inside the given parent directory */
-  def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
-    var attempts = 0
-    val maxAttempts = 10
-    var dir: File = null
-    while (dir == null) {
-      attempts += 1
-      if (attempts > maxAttempts) {
-        throw new IOException("Failed to create a temp directory (under " + root + ") after " +
-          maxAttempts + " attempts!")
-      }
-      try {
-        dir = new File(root, "spark-" + UUID.randomUUID.toString)
-        if (dir.exists() || !dir.mkdirs()) {
-          dir = null
-        }
-      } catch { case e: IOException => ; }
-    }
-
-    registerShutdownDeleteDir(dir)
-
-    // Add a shutdown hook to delete the temp dir when the JVM exits
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dir " + dir) {
-      override def run() {
-        // Attempt to delete if some patch which is parent of this is not already registered.
-        if (! hasRootAsShutdownDeleteDir(dir)) Utils.deleteRecursively(dir)
-      }
-    })
-    dir
-  }
-
-  /** Copy all data from an InputStream to an OutputStream */
-  def copyStream(in: InputStream,
-                 out: OutputStream,
-                 closeStreams: Boolean = false)
-  {
-    val buf = new Array[Byte](8192)
-    var n = 0
-    while (n != -1) {
-      n = in.read(buf)
-      if (n != -1) {
-        out.write(buf, 0, n)
-      }
-    }
-    if (closeStreams) {
-      in.close()
-      out.close()
-    }
-  }
-
-  /**
-   * Download a file requested by the executor. Supports fetching the file in a variety of ways,
-   * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
-   *
-   * Throws SparkException if the target file already exists and has different contents than
-   * the requested file.
-   */
-  def fetchFile(url: String, targetDir: File) {
-    val filename = url.split("/").last
-    val tempDir = getLocalDir
-    val tempFile =  File.createTempFile("fetchFileTemp", null, new File(tempDir))
-    val targetFile = new File(targetDir, filename)
-    val uri = new URI(url)
-    uri.getScheme match {
-      case "http" | "https" | "ftp" =>
-        logInfo("Fetching " + url + " to " + tempFile)
-        val in = new URL(url).openStream()
-        val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, true)
-        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException(
-            "File " + targetFile + " exists and does not match contents of" + " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
-        }
-      case "file" | null =>
-        // In the case of a local file, copy the local file to the target directory.
-        // Note the difference between uri vs url.
-        val sourceFile = if (uri.isAbsolute) new File(uri) else new File(url)
-        if (targetFile.exists) {
-          // If the target file already exists, warn the user if
-          if (!Files.equal(sourceFile, targetFile)) {
-            throw new SparkException(
-              "File " + targetFile + " exists and does not match contents of" + " " + url)
-          } else {
-            // Do nothing if the file contents are the same, i.e. this file has been copied
-            // previously.
-            logInfo(sourceFile.getAbsolutePath + " has been previously copied to "
-              + targetFile.getAbsolutePath)
-          }
-        } else {
-          // The file does not exist in the target directory. Copy it there.
-          logInfo("Copying " + sourceFile.getAbsolutePath + " to " + targetFile.getAbsolutePath)
-          Files.copy(sourceFile, targetFile)
-        }
-      case _ =>
-        // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
-        val env = SparkEnv.get
-        val uri = new URI(url)
-        val conf = env.hadoop.newConfiguration()
-        val fs = FileSystem.get(uri, conf)
-        val in = fs.open(new Path(uri))
-        val out = new FileOutputStream(tempFile)
-        Utils.copyStream(in, out, true)
-        if (targetFile.exists && !Files.equal(tempFile, targetFile)) {
-          tempFile.delete()
-          throw new SparkException("File " + targetFile + " exists and does not match contents of" +
-            " " + url)
-        } else {
-          Files.move(tempFile, targetFile)
-        }
-    }
-    // Decompress the file if it's a .tar or .tar.gz
-    if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xzf", filename), targetDir)
-    } else if (filename.endsWith(".tar")) {
-      logInfo("Untarring " + filename)
-      Utils.execute(Seq("tar", "-xf", filename), targetDir)
-    }
-    // Make the file executable - That's necessary for scripts
-    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
-  }
-
-  /**
-   * Get a temporary directory using Spark's spark.local.dir property, if set. This will always
-   * return a single directory, even though the spark.local.dir property might be a list of
-   * multiple paths.
-   */
-  def getLocalDir: String = {
-    System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
-  }
-
-  /**
-   * Shuffle the elements of a collection into a random order, returning the
-   * result in a new collection. Unlike scala.util.Random.shuffle, this method
-   * uses a local random number generator, avoiding inter-thread contention.
-   */
-  def randomize[T: ClassManifest](seq: TraversableOnce[T]): Seq[T] = {
-    randomizeInPlace(seq.toArray)
-  }
-
-  /**
-   * Shuffle the elements of an array into a random order, modifying the
-   * original array. Returns the original array.
-   */
-  def randomizeInPlace[T](arr: Array[T], rand: Random = new Random): Array[T] = {
-    for (i <- (arr.length - 1) to 1 by -1) {
-      val j = rand.nextInt(i)
-      val tmp = arr(j)
-      arr(j) = arr(i)
-      arr(i) = tmp
-    }
-    arr
-  }
-
-  /**
-   * Get the local host's IP address in dotted-quad format (e.g. 1.2.3.4).
-   * Note, this is typically not used from within core spark.
-   */
-  lazy val localIpAddress: String = findLocalIpAddress()
-  lazy val localIpAddressHostname: String = getAddressHostName(localIpAddress)
-
-  private def findLocalIpAddress(): String = {
-    val defaultIpOverride = System.getenv("SPARK_LOCAL_IP")
-    if (defaultIpOverride != null) {
-      defaultIpOverride
-    } else {
-      val address = InetAddress.getLocalHost
-      if (address.isLoopbackAddress) {
-        // Address resolves to something like 127.0.1.1, which happens on Debian; try to find
-        // a better address using the local network interfaces
-        for (ni <- NetworkInterface.getNetworkInterfaces) {
-          for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress &&
-               !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) {
-            // We've found an address that looks reasonable!
-            logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
-              " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress +
-              " instead (on interface " + ni.getName + ")")
-            logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
-            return addr.getHostAddress
-          }
-        }
-        logWarning("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" +
-          " a loopback address: " + address.getHostAddress + ", but we couldn't find any" +
-          " external IP address!")
-        logWarning("Set SPARK_LOCAL_IP if you need to bind to another address")
-      }
-      address.getHostAddress
-    }
-  }
-
-  private var customHostname: Option[String] = None
-
-  /**
-   * Allow setting a custom host name because when we run on Mesos we need to use the same
-   * hostname it reports to the master.
-   */
-  def setCustomHostname(hostname: String) {
-    // DEBUG code
-    Utils.checkHost(hostname)
-    customHostname = Some(hostname)
-  }
-
-  /**
-   * Get the local machine's hostname.
-   */
-  def localHostName(): String = {
-    customHostname.getOrElse(localIpAddressHostname)
-  }
-
-  def getAddressHostName(address: String): String = {
-    InetAddress.getByName(address).getHostName
-  }
-
-  def localHostPort(): String = {
-    val retval = System.getProperty("spark.hostPort", null)
-    if (retval == null) {
-      logErrorWithStack("spark.hostPort not set but invoking localHostPort")
-      return localHostName()
-    }
-
-    retval
-  }
-
-  def checkHost(host: String, message: String = "") {
-    assert(host.indexOf(':') == -1, message)
-  }
-
-  def checkHostPort(hostPort: String, message: String = "") {
-    assert(hostPort.indexOf(':') != -1, message)
-  }
-
-  // Used by DEBUG code : remove when all testing done
-  def logErrorWithStack(msg: String) {
-    try { throw new Exception } catch { case ex: Exception => { logError(msg, ex) } }
-  }
-
-  // Typically, this will be of order of number of nodes in cluster
-  // If not, we should change it to LRUCache or something.
-  private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]()
-
-  def parseHostPort(hostPort: String): (String,  Int) = {
-    {
-      // Check cache first.
-      var cached = hostPortParseResults.get(hostPort)
-      if (cached != null) return cached
-    }
-
-    val indx: Int = hostPort.lastIndexOf(':')
-    // This is potentially broken - when dealing with ipv6 addresses for example, sigh ...
-    // but then hadoop does not support ipv6 right now.
-    // For now, we assume that if port exists, then it is valid - not check if it is an int > 0
-    if (-1 == indx) {
-      val retval = (hostPort, 0)
-      hostPortParseResults.put(hostPort, retval)
-      return retval
-    }
-
-    val retval = (hostPort.substring(0, indx).trim(), hostPort.substring(indx + 1).trim().toInt)
-    hostPortParseResults.putIfAbsent(hostPort, retval)
-    hostPortParseResults.get(hostPort)
-  }
-
-  private[spark] val daemonThreadFactory: ThreadFactory =
-    new ThreadFactoryBuilder().setDaemon(true).build()
-
-  /**
-   * Wrapper over newCachedThreadPool.
-   */
-  def newDaemonCachedThreadPool(): ThreadPoolExecutor =
-    Executors.newCachedThreadPool(daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
-
-  /**
-   * Return the string to tell how long has passed in seconds. The passing parameter should be in
-   * millisecond.
-   */
-  def getUsedTimeMs(startTimeMs: Long): String = {
-    return " " + (System.currentTimeMillis - startTimeMs) + " ms"
-  }
-
-  /**
-   * Wrapper over newFixedThreadPool.
-   */
-  def newDaemonFixedThreadPool(nThreads: Int): ThreadPoolExecutor =
-    Executors.newFixedThreadPool(nThreads, daemonThreadFactory).asInstanceOf[ThreadPoolExecutor]
-
-  /**
-   * Delete a file or directory and its contents recursively.
-   */
-  def deleteRecursively(file: File) {
-    if (file.isDirectory) {
-      for (child <- file.listFiles()) {
-        deleteRecursively(child)
-      }
-    }
-    if (!file.delete()) {
-      throw new IOException("Failed to delete: " + file)
-    }
-  }
-
-  /**
-   * Convert a Java memory parameter passed to -Xmx (such as 300m or 1g) to a number of megabytes.
-   * This is used to figure out how much memory to claim from Mesos based on the SPARK_MEM
-   * environment variable.
-   */
-  def memoryStringToMb(str: String): Int = {
-    val lower = str.toLowerCase
-    if (lower.endsWith("k")) {
-      (lower.substring(0, lower.length-1).toLong / 1024).toInt
-    } else if (lower.endsWith("m")) {
-      lower.substring(0, lower.length-1).toInt
-    } else if (lower.endsWith("g")) {
-      lower.substring(0, lower.length-1).toInt * 1024
-    } else if (lower.endsWith("t")) {
-      lower.substring(0, lower.length-1).toInt * 1024 * 1024
-    } else {// no suffix, so it's just a number in bytes
-      (lower.toLong / 1024 / 1024).toInt
-    }
-  }
-
-  /**
-   * Convert a quantity in bytes to a human-readable string such as "4.0 MB".
-   */
-  def bytesToString(size: Long): String = {
-    val TB = 1L << 40
-    val GB = 1L << 30
-    val MB = 1L << 20
-    val KB = 1L << 10
-
-    val (value, unit) = {
-      if (size >= 2*TB) {
-        (size.asInstanceOf[Double] / TB, "TB")
-      } else if (size >= 2*GB) {
-        (size.asInstanceOf[Double] / GB, "GB")
-      } else if (size >= 2*MB) {
-        (size.asInstanceOf[Double] / MB, "MB")
-      } else if (size >= 2*KB) {
-        (size.asInstanceOf[Double] / KB, "KB")
-      } else {
-        (size.asInstanceOf[Double], "B")
-      }
-    }
-    "%.1f %s".formatLocal(Locale.US, value, unit)
-  }
-
-  /**
-   * Returns a human-readable string representing a duration such as "35ms"
-   */
-  def msDurationToString(ms: Long): String = {
-    val second = 1000
-    val minute = 60 * second
-    val hour = 60 * minute
-
-    ms match {
-      case t if t < second =>
-        "%d ms".format(t)
-      case t if t < minute =>
-        "%.1f s".format(t.toFloat / second)
-      case t if t < hour =>
-        "%.1f m".format(t.toFloat / minute)
-      case t =>
-        "%.2f h".format(t.toFloat / hour)
-    }
-  }
-
-  /**
-   * Convert a quantity in megabytes to a human-readable string such as "4.0 MB".
-   */
-  def megabytesToString(megabytes: Long): String = {
-    bytesToString(megabytes * 1024L * 1024L)
-  }
-
-  /**
-   * Execute a command in the given working directory, throwing an exception if it completes
-   * with an exit code other than 0.
-   */
-  def execute(command: Seq[String], workingDir: File) {
-    val process = new ProcessBuilder(command: _*)
-        .directory(workingDir)
-        .redirectErrorStream(true)
-        .start()
-    new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-    val exitCode = process.waitFor()
-    if (exitCode != 0) {
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
-    }
-  }
-
-  /**
-   * Execute a command in the current working directory, throwing an exception if it completes
-   * with an exit code other than 0.
-   */
-  def execute(command: Seq[String]) {
-    execute(command, new File("."))
-  }
-
-  /**
-   * Execute a command and get its output, throwing an exception if it yields a code other than 0.
-   */
-  def executeAndGetOutput(command: Seq[String], workingDir: File = new File("."),
-                          extraEnvironment: Map[String, String] = Map.empty): String = {
-    val builder = new ProcessBuilder(command: _*)
-        .directory(workingDir)
-    val environment = builder.environment()
-    for ((key, value) <- extraEnvironment) {
-      environment.put(key, value)
-    }
-    val process = builder.start()
-    new Thread("read stderr for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getErrorStream).getLines) {
-          System.err.println(line)
-        }
-      }
-    }.start()
-    val output = new StringBuffer
-    val stdoutThread = new Thread("read stdout for " + command(0)) {
-      override def run() {
-        for (line <- Source.fromInputStream(process.getInputStream).getLines) {
-          output.append(line)
-        }
-      }
-    }
-    stdoutThread.start()
-    val exitCode = process.waitFor()
-    stdoutThread.join()   // Wait for it to finish reading output
-    if (exitCode != 0) {
-      throw new SparkException("Process " + command + " exited with code " + exitCode)
-    }
-    output.toString
-  }
-
-  /**
-   * A regular expression to match classes of the "core" Spark API that we want to skip when
-   * finding the call site of a method.
-   */
-  private val SPARK_CLASS_REGEX = """^spark(\.api\.java)?(\.rdd)?\.[A-Z]""".r
-
-  private[spark] class CallSiteInfo(val lastSparkMethod: String, val firstUserFile: String,
-                                    val firstUserLine: Int, val firstUserClass: String)
-
-  /**
-   * When called inside a class in the spark package, returns the name of the user code class
-   * (outside the spark package) that called into Spark, as well as which Spark method they called.
-   * This is used, for example, to tell users where in their code each RDD got created.
-   */
-  def getCallSiteInfo: CallSiteInfo = {
-    val trace = Thread.currentThread.getStackTrace().filter( el =>
-      (!el.getMethodName.contains("getStackTrace")))
-
-    // Keep crawling up the stack trace until we find the first function not inside of the spark
-    // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
-    // transformation, a SparkContext function (such as parallelize), or anything else that leads
-    // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
-    var lastSparkMethod = "<unknown>"
-    var firstUserFile = "<unknown>"
-    var firstUserLine = 0
-    var finished = false
-    var firstUserClass = "<unknown>"
-
-    for (el <- trace) {
-      if (!finished) {
-        if (SPARK_CLASS_REGEX.findFirstIn(el.getClassName) != None) {
-          lastSparkMethod = if (el.getMethodName == "<init>") {
-            // Spark method is a constructor; get its class name
-            el.getClassName.substring(el.getClassName.lastIndexOf('.') + 1)
-          } else {
-            el.getMethodName
-          }
-        }
-        else {
-          firstUserLine = el.getLineNumber
-          firstUserFile = el.getFileName
-          firstUserClass = el.getClassName
-          finished = true
-        }
-      }
-    }
-    new CallSiteInfo(lastSparkMethod, firstUserFile, firstUserLine, firstUserClass)
-  }
-
-  def formatSparkCallSite = {
-    val callSiteInfo = getCallSiteInfo
-    "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile,
-                         callSiteInfo.firstUserLine)
-  }
-
-  /** Return a string containing part of a file from byte 'start' to 'end'. */
-  def offsetBytes(path: String, start: Long, end: Long): String = {
-    val file = new File(path)
-    val length = file.length()
-    val effectiveEnd = math.min(length, end)
-    val effectiveStart = math.max(0, start)
-    val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
-    val stream = new FileInputStream(file)
-
-    stream.skip(effectiveStart)
-    stream.read(buff)
-    stream.close()
-    Source.fromBytes(buff).mkString
-  }
-
-  /**
-   * Clone an object using a Spark serializer.
-   */
-  def clone[T](value: T, serializer: SerializerInstance): T = {
-    serializer.deserialize[T](serializer.serialize(value))
-  }
-
-  /**
-   * Detect whether this thread might be executing a shutdown hook. Will always return true if
-   * the current thread is a running a shutdown hook but may spuriously return true otherwise (e.g.
-   * if System.exit was just called by a concurrent thread).
-   *
-   * Currently, this detects whether the JVM is shutting down by Runtime#addShutdownHook throwing
-   * an IllegalStateException.
-   */
-  def inShutdown(): Boolean = {
-    try {
-      val hook = new Thread {
-        override def run() {}
-      }
-      Runtime.getRuntime.addShutdownHook(hook)
-      Runtime.getRuntime.removeShutdownHook(hook)
-    } catch {
-      case ise: IllegalStateException => return true
-    }
-    return false
-  }
-
-  def isSpace(c: Char): Boolean = {
-    " \t\r\n".indexOf(c) != -1
-  }
-
-  /**
-   * Split a string of potentially quoted arguments from the command line the way that a shell
-   * would do it to determine arguments to a command. For example, if the string is 'a "b c" d',
-   * then it would be parsed as three arguments: 'a', 'b c' and 'd'.
-   */
-  def splitCommandString(s: String): Seq[String] = {
-    val buf = new ArrayBuffer[String]
-    var inWord = false
-    var inSingleQuote = false
-    var inDoubleQuote = false
-    var curWord = new StringBuilder
-    def endWord() {
-      buf += curWord.toString
-      curWord.clear()
-    }
-    var i = 0
-    while (i < s.length) {
-      var nextChar = s.charAt(i)
-      if (inDoubleQuote) {
-        if (nextChar == '"') {
-          inDoubleQuote = false
-        } else if (nextChar == '\\') {
-          if (i < s.length - 1) {
-            // Append the next character directly, because only " and \ may be escaped in
-            // double quotes after the shell's own expansion
-            curWord.append(s.charAt(i + 1))
-            i += 1
-          }
-        } else {
-          curWord.append(nextChar)
-        }
-      } else if (inSingleQuote) {
-        if (nextChar == '\'') {
-          inSingleQuote = false
-        } else {
-          curWord.append(nextChar)
-        }
-        // Backslashes are not treated specially in single quotes
-      } else if (nextChar == '"') {
-        inWord = true
-        inDoubleQuote = true
-      } else if (nextChar == '\'') {
-        inWord = true
-        inSingleQuote = true
-      } else if (!isSpace(nextChar)) {
-        curWord.append(nextChar)
-        inWord = true
-      } else if (inWord && isSpace(nextChar)) {
-        endWord()
-        inWord = false
-      }
-      i += 1
-    }
-    if (inWord || inDoubleQuote || inSingleQuote) {
-      endWord()
-    }
-    return buf
-  }
-
- /* Calculates 'x' modulo 'mod', takes to consideration sign of x,
-  * i.e. if 'x' is negative, than 'x' % 'mod' is negative too
-  * so function return (x % mod) + mod in that case.
-  */
-  def nonNegativeMod(x: Int, mod: Int): Int = {
-    val rawMod = x % mod
-    rawMod + (if (rawMod < 0) mod else 0)
-  }
-}