Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:11 UTC

[27/69] [abbrv] [partial] Initial work to rename package to org.apache.spark

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala
deleted file mode 100644
index 6cc5756..0000000
--- a/core/src/main/scala/spark/SizeEstimator.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.lang.reflect.Field
-import java.lang.reflect.Modifier
-import java.lang.reflect.{Array => JArray}
-import java.util.IdentityHashMap
-import java.util.concurrent.ConcurrentHashMap
-import java.util.Random
-
-import javax.management.MBeanServer
-import java.lang.management.ManagementFactory
-
-import scala.collection.mutable.ArrayBuffer
-
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-
-/**
- * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in 
- * memory-aware caches.
- *
- * Based on the following JavaWorld article:
- * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
- */
-private[spark] object SizeEstimator extends Logging {
-
-  // Sizes of primitive types
-  private val BYTE_SIZE    = 1
-  private val BOOLEAN_SIZE = 1
-  private val CHAR_SIZE    = 2
-  private val SHORT_SIZE   = 2
-  private val INT_SIZE     = 4
-  private val LONG_SIZE    = 8
-  private val FLOAT_SIZE   = 4
-  private val DOUBLE_SIZE  = 8
-
-  // Alignment boundary for objects
-  // TODO: Is this arch-dependent?
-  private val ALIGN_SIZE = 8
-
-  // A cache of ClassInfo objects for each class
-  private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo]
-
-  // Object and pointer sizes are arch dependent
-  private var is64bit = false
-
-  // Size of an object reference
-  // Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops
-  private var isCompressedOops = false
-  private var pointerSize = 4
-
-  // Minimum size of a java.lang.Object
-  private var objectSize = 8
-
-  initialize()
-
-  // Sets object size, pointer size based on architecture and CompressedOops settings
-  // from the JVM.
-  private def initialize() {
-    is64bit = System.getProperty("os.arch").contains("64")
-    isCompressedOops = getIsCompressedOops
-
-    objectSize = if (!is64bit) 8 else {
-      if(!isCompressedOops) {
-        16
-      } else {
-        12
-      }
-    }
-    pointerSize = if (is64bit && !isCompressedOops) 8 else 4
-    classInfos.clear()
-    classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
-  }
-
-  private def getIsCompressedOops : Boolean = {
-    if (System.getProperty("spark.test.useCompressedOops") != null) {
-      return System.getProperty("spark.test.useCompressedOops").toBoolean 
-    }
-
-    try {
-      val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
-      val server = ManagementFactory.getPlatformMBeanServer()
-
-      // NOTE: This should throw an exception in non-Sun JVMs
-      val hotSpotMBeanClass = Class.forName("com.sun.management.HotSpotDiagnosticMXBean")
-      val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
-          Class.forName("java.lang.String"))
-
-      val bean = ManagementFactory.newPlatformMXBeanProxy(server, 
-        hotSpotMBeanName, hotSpotMBeanClass)
-      // TODO: We could use reflection on the VMOption returned ?
-      return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
-    } catch {
-      case e: Exception => {
-        // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
-        val guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
-        val guessInWords = if (guess) "yes" else "not"
-        logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords)
-        return guess
-      }
-    }
-  }
-
-  /**
-   * The state of an ongoing size estimation. Contains a stack of objects to visit as well as an
-   * IdentityHashMap of visited objects, and provides utility methods for enqueueing new objects
-   * to visit.
-   */
-  private class SearchState(val visited: IdentityHashMap[AnyRef, AnyRef]) {
-    val stack = new ArrayBuffer[AnyRef]
-    var size = 0L
-
-    def enqueue(obj: AnyRef) {
-      if (obj != null && !visited.containsKey(obj)) {
-        visited.put(obj, null)
-        stack += obj
-      }
-    }
-
-    def isFinished(): Boolean = stack.isEmpty
-
-    def dequeue(): AnyRef = {
-      val elem = stack.last
-      stack.trimEnd(1)
-      return elem
-    }
-  }
-
-  /**
-   * Cached information about each class. We remember two things: the "shell size" of the class
-   * (size of all non-static fields plus the java.lang.Object size), and any fields that are
-   * pointers to objects.
-   */
-  private class ClassInfo(
-    val shellSize: Long,
-    val pointerFields: List[Field]) {}
-
-  def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef])
-
-  private def estimate(obj: AnyRef, visited: IdentityHashMap[AnyRef, AnyRef]): Long = {
-    val state = new SearchState(visited)
-    state.enqueue(obj)
-    while (!state.isFinished) {
-      visitSingleObject(state.dequeue(), state)
-    }
-    return state.size
-  }
-
-  private def visitSingleObject(obj: AnyRef, state: SearchState) {
-    val cls = obj.getClass
-    if (cls.isArray) {
-      visitArray(obj, cls, state)
-    } else if (obj.isInstanceOf[ClassLoader] || obj.isInstanceOf[Class[_]]) {
-      // Hadoop JobConfs created in the interpreter have a ClassLoader, which greatly confuses
-      // the size estimator since it references the whole REPL. Do nothing in this case. In
-      // general all ClassLoaders and Classes will be shared between objects anyway.
-    } else {
-      val classInfo = getClassInfo(cls)
-      state.size += classInfo.shellSize
-      for (field <- classInfo.pointerFields) {
-        state.enqueue(field.get(obj))
-      }
-    }
-  }
-
-  // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
-  private val ARRAY_SIZE_FOR_SAMPLING = 200
-  private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
-
-  private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
-    val length = JArray.getLength(array)
-    val elementClass = cls.getComponentType
-
-    // Arrays have object header and length field which is an integer
-    var arrSize: Long = alignSize(objectSize + INT_SIZE)
-
-    if (elementClass.isPrimitive) {
-      arrSize += alignSize(length * primitiveSize(elementClass))
-      state.size += arrSize
-    } else {
-      arrSize += alignSize(length * pointerSize)
-      state.size += arrSize
-
-      if (length <= ARRAY_SIZE_FOR_SAMPLING) {
-        for (i <- 0 until length) {
-          state.enqueue(JArray.get(array, i))
-        }
-      } else {
-        // Estimate the size of a large array by sampling elements without replacement.
-        var size = 0.0
-        val rand = new Random(42)
-        val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
-        for (i <- 0 until ARRAY_SAMPLE_SIZE) {
-          var index = 0
-          do {
-            index = rand.nextInt(length)
-          } while (drawn.contains(index))
-          drawn.add(index)
-          val elem = JArray.get(array, index)
-          size += SizeEstimator.estimate(elem, state.visited)
-        }
-        state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
-      }
-    }
-  }
-
-  private def primitiveSize(cls: Class[_]): Long = {
-    if (cls == classOf[Byte])
-      BYTE_SIZE
-    else if (cls == classOf[Boolean])
-      BOOLEAN_SIZE
-    else if (cls == classOf[Char])
-      CHAR_SIZE
-    else if (cls == classOf[Short])
-      SHORT_SIZE
-    else if (cls == classOf[Int])
-      INT_SIZE
-    else if (cls == classOf[Long])
-      LONG_SIZE
-    else if (cls == classOf[Float])
-      FLOAT_SIZE
-    else if (cls == classOf[Double])
-      DOUBLE_SIZE
-    else throw new IllegalArgumentException(
-      "Non-primitive class " + cls + " passed to primitiveSize()")
-  }
-
-  /**
-   * Get or compute the ClassInfo for a given class.
-   */
-  private def getClassInfo(cls: Class[_]): ClassInfo = {
-    // Check whether we've already cached a ClassInfo for this class
-    val info = classInfos.get(cls)
-    if (info != null) {
-      return info
-    }
-    
-    val parent = getClassInfo(cls.getSuperclass)
-    var shellSize = parent.shellSize
-    var pointerFields = parent.pointerFields
-
-    for (field <- cls.getDeclaredFields) {
-      if (!Modifier.isStatic(field.getModifiers)) {
-        val fieldClass = field.getType
-        if (fieldClass.isPrimitive) {
-          shellSize += primitiveSize(fieldClass)
-        } else {
-          field.setAccessible(true) // Enable future get()'s on this field
-          shellSize += pointerSize
-          pointerFields = field :: pointerFields
-        }
-      }
-    }
-
-    shellSize = alignSize(shellSize)
-
-    // Create and cache a new ClassInfo
-    val newInfo = new ClassInfo(shellSize, pointerFields)
-    classInfos.put(cls, newInfo)
-    return newInfo
-  }
-
-  private def alignSize(size: Long): Long = {
-    val rem = size % ALIGN_SIZE
-    return if (rem == 0) size else (size + ALIGN_SIZE - rem)
-  }
-}
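
For reference, a minimal sketch of how the estimator's arithmetic plays out for a small object,
assuming a 64-bit JVM with compressed oops so that objectSize = 12, pointerSize = 4 and
ALIGN_SIZE = 8 as defined above. The Node class, the object name and the byte counts in the
comments are illustrative only; the sketch must sit in package spark because SizeEstimator is
private[spark].

package spark

object SizeEstimatorSketch {
  // One Int field (4 bytes) plus one reference field (4 bytes under compressed oops).
  class Node(var id: Int, var next: Node)

  def main(args: Array[String]) {
    // Shell size = objectSize (12) + INT_SIZE (4) + pointerSize (4) = 20, aligned up to 24 bytes.
    // estimate() also follows the `next` pointer, so a two-node chain comes out around 48 bytes.
    val chain = new Node(1, new Node(2, null))
    println(SizeEstimator.estimate(chain))
  }
}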

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
deleted file mode 100644
index 7ce9505..0000000
--- a/core/src/main/scala/spark/SparkContext.scala
+++ /dev/null
@@ -1,995 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-import java.net.URI
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.Map
-import scala.collection.generic.Growable
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.util.DynamicVariable
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.ArrayWritable
-import org.apache.hadoop.io.BooleanWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.DoubleWritable
-import org.apache.hadoop.io.FloatWritable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-
-import org.apache.mesos.MesosNativeLibrary
-
-import spark.deploy.LocalSparkCluster
-import spark.partial.{ApproximateEvaluator, PartialResult}
-import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
-  OrderedRDDFunctions}
-import spark.scheduler._
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
-  ClusterScheduler, Schedulable, SchedulingMode}
-import spark.scheduler.local.LocalScheduler
-import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
-import spark.ui.SparkUI
-import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import scala.Some
-import spark.scheduler.StageInfo
-import spark.storage.RDDInfo
-import spark.storage.StorageStatus
-
-/**
- * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
- * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
- *
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI.
- * @param sparkHome Location where Spark is installed on cluster nodes.
- * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
- *             system or HDFS, HTTP, HTTPS, or FTP URLs.
- * @param environment Environment variables to set on worker nodes.
- */
-class SparkContext(
-    val master: String,
-    val appName: String,
-    val sparkHome: String = null,
-    val jars: Seq[String] = Nil,
-    val environment: Map[String, String] = Map(),
-    // This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc.) too.
-    // This is typically generated from InputFormatInfo.computePreferredLocations: a map from host to
-    // the set of data-local splits on that host.
-    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
-  extends Logging {
-
-  // Ensure logging is initialized before we spawn any threads
-  initLogging()
-
-  // Set Spark driver host and port system properties
-  if (System.getProperty("spark.driver.host") == null) {
-    System.setProperty("spark.driver.host", Utils.localHostName())
-  }
-  if (System.getProperty("spark.driver.port") == null) {
-    System.setProperty("spark.driver.port", "0")
-  }
-
-  val isLocal = (master == "local" || master.startsWith("local["))
-
-  // Create the Spark execution environment (cache, map output tracker, etc)
-  private[spark] val env = SparkEnv.createFromSystemProperties(
-    "<driver>",
-    System.getProperty("spark.driver.host"),
-    System.getProperty("spark.driver.port").toInt,
-    true,
-    isLocal)
-  SparkEnv.set(env)
-
-  // Used to store a URL for each static file/jar together with the file's local timestamp
-  private[spark] val addedFiles = HashMap[String, Long]()
-  private[spark] val addedJars = HashMap[String, Long]()
-
-  // Keeps track of all persisted RDDs
-  private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
-  private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
-
-  // Initialize the Spark UI
-  private[spark] val ui = new SparkUI(this)
-  ui.bind()
-
-  val startTime = System.currentTimeMillis()
-
-  // Add each JAR given through the constructor
-  if (jars != null) {
-    jars.foreach { addJar(_) }
-  }
-
-  // Environment variables to pass to our executors
-  private[spark] val executorEnvs = HashMap[String, String]()
-  // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
-  for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
-    val value = System.getenv(key)
-    if (value != null) {
-      executorEnvs(key) = value
-    }
-  }
-  // Since memory can be set with a system property too, use that
-  executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
-  if (environment != null) {
-    executorEnvs ++= environment
-  }
-
-  // Create and start the scheduler
-  private var taskScheduler: TaskScheduler = {
-    // Regular expression used for local[N] master format
-    val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
-    // Regular expression for local[N, maxRetries], used in tests with failing tasks
-    val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
-    // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
-    val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
-    // Regular expression for connecting to Spark deploy clusters
-    val SPARK_REGEX = """(spark://.*)""".r
-    // Regular expression for connecting to Mesos clusters
-    val MESOS_REGEX = """(mesos://.*)""".r
-
-    master match {
-      case "local" =>
-        new LocalScheduler(1, 0, this)
-
-      case LOCAL_N_REGEX(threads) =>
-        new LocalScheduler(threads.toInt, 0, this)
-
-      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
-        new LocalScheduler(threads.toInt, maxFailures.toInt, this)
-
-      case SPARK_REGEX(sparkUrl) =>
-        val scheduler = new ClusterScheduler(this)
-        val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
-        scheduler.initialize(backend)
-        scheduler
-
-      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
-        // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
-        val memoryPerSlaveInt = memoryPerSlave.toInt
-        if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
-          throw new SparkException(
-            "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
-              memoryPerSlaveInt, SparkContext.executorMemoryRequested))
-        }
-
-        val scheduler = new ClusterScheduler(this)
-        val localCluster = new LocalSparkCluster(
-          numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
-        val sparkUrl = localCluster.start()
-        val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
-        scheduler.initialize(backend)
-        backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
-          localCluster.stop()
-        }
-        scheduler
-
-      case "yarn-standalone" =>
-        val scheduler = try {
-          val clazz = Class.forName("spark.scheduler.cluster.YarnClusterScheduler")
-          val cons = clazz.getConstructor(classOf[SparkContext])
-          cons.newInstance(this).asInstanceOf[ClusterScheduler]
-        } catch {
-          // TODO: Enumerate the exact reasons why it can fail
-          // But irrespective of the reason, it means we cannot proceed!
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
-          }
-        }
-        val backend = new StandaloneSchedulerBackend(scheduler, this.env.actorSystem)
-        scheduler.initialize(backend)
-        scheduler
-
-      case _ =>
-        if (MESOS_REGEX.findFirstIn(master).isEmpty) {
-          logWarning("Master %s does not match expected format, parsing as Mesos URL".format(master))
-        }
-        MesosNativeLibrary.load()
-        val scheduler = new ClusterScheduler(this)
-        val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
-        val masterWithoutProtocol = master.replaceFirst("^mesos://", "")  // Strip initial mesos://
-        val backend = if (coarseGrained) {
-          new CoarseMesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
-        } else {
-          new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
-        }
-        scheduler.initialize(backend)
-        scheduler
-    }
-  }
-  taskScheduler.start()
-
-  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
-  dagScheduler.start()
-
-  ui.start()
-
-  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration = {
-    val env = SparkEnv.get
-    val conf = env.hadoop.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
-      conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
-    }
-    val bufferSize = System.getProperty("spark.buffer.size", "65536")
-    conf.set("io.file.buffer.size", bufferSize)
-    conf
-  }
-
-  private[spark] var checkpointDir: Option[String] = None
-
-  // Thread Local variable that can be used by users to pass information down the stack
-  private val localProperties = new DynamicVariable[Properties](null)
-
-  def initLocalProperties() {
-      localProperties.value = new Properties()
-  }
-
-  def setLocalProperty(key: String, value: String) {
-    if (localProperties.value == null) {
-      localProperties.value = new Properties()
-    }
-    if (value == null) {
-      localProperties.value.remove(key)
-    } else {
-      localProperties.value.setProperty(key, value)
-    }
-  }
-
-  /** Set a human readable description of the current job. */
-  def setJobDescription(value: String) {
-    setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
-  }
-
-  // Post init
-  taskScheduler.postStartHook()
-
-  val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
-  val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
-
-  def initDriverMetrics() {
-    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
-    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
-  }
-
-  initDriverMetrics()
-
-  // Methods for creating RDDs
-
-  /** Distribute a local Scala collection to form an RDD. */
-  def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
-    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
-  }
-
-  /** Distribute a local Scala collection to form an RDD. */
-  def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
-    parallelize(seq, numSlices)
-  }
-
-  /** Distribute a local Scala collection to form an RDD, with one or more
-    * location preferences (hostnames of Spark nodes) for each object.
-    * Create a new partition for each collection item. */
-   def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
-    val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
-    new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
-  }
-
-  /**
-   * Read a text file from HDFS, a local file system (available on all nodes), or any
-   * Hadoop-supported file system URI, and return it as an RDD of Strings.
-   */
-  def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
-    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
-      .map(pair => pair._2.toString)
-  }
-
-  /**
-   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
-   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
-   * etc).
-   */
-  def hadoopRDD[K, V](
-      conf: JobConf,
-      inputFormatClass: Class[_ <: InputFormat[K, V]],
-      keyClass: Class[K],
-      valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
-      ): RDD[(K, V)] = {
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
-  }
-
-  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
-  def hadoopFile[K, V](
-      path: String,
-      inputFormatClass: Class[_ <: InputFormat[K, V]],
-      keyClass: Class[K],
-      valueClass: Class[V],
-      minSplits: Int = defaultMinSplits
-      ) : RDD[(K, V)] = {
-    val conf = new JobConf(hadoopConfiguration)
-    FileInputFormat.setInputPaths(conf, path)
-    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
-  }
-
-  /**
-   * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
-   * values and the InputFormat so that users don't need to pass them directly. Instead, callers
-   * can just write, for example,
-   * {{{
-   * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
-   * }}}
-   */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
-      (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
-      : RDD[(K, V)] = {
-    hadoopFile(path,
-        fm.erasure.asInstanceOf[Class[F]],
-        km.erasure.asInstanceOf[Class[K]],
-        vm.erasure.asInstanceOf[Class[V]],
-        minSplits)
-  }
-
-  /**
-   * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
-   * values and the InputFormat so that users don't need to pass them directly. Instead, callers
-   * can just write, for example,
-   * {{{
-   * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
-   * }}}
-   */
-  def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
-      (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] =
-    hadoopFile[K, V, F](path, defaultMinSplits)
-
-  /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
-      (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = {
-    newAPIHadoopFile(
-        path,
-        fm.erasure.asInstanceOf[Class[F]],
-        km.erasure.asInstanceOf[Class[K]],
-        vm.erasure.asInstanceOf[Class[V]])
-  }
-
-  /**
-   * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
-   * and extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
-      path: String,
-      fClass: Class[F],
-      kClass: Class[K],
-      vClass: Class[V],
-      conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
-    val job = new NewHadoopJob(conf)
-    NewFileInputFormat.addInputPath(job, new Path(path))
-    val updatedConf = job.getConfiguration
-    new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
-  }
-
-  /**
-   * Get an RDD for a Hadoop-readable dataset, given its Configuration and an arbitrary new API
-   * InputFormat, plus extra configuration options to pass to the input format.
-   */
-  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
-      conf: Configuration = hadoopConfiguration,
-      fClass: Class[F],
-      kClass: Class[K],
-      vClass: Class[V]): RDD[(K, V)] = {
-    new NewHadoopRDD(this, fClass, kClass, vClass, conf)
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String,
-      keyClass: Class[K],
-      valueClass: Class[V],
-      minSplits: Int
-      ): RDD[(K, V)] = {
-    val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
-    hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
-  }
-
-  /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
-  def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
-    sequenceFile(path, keyClass, valueClass, defaultMinSplits)
-
-  /**
-   * Version of sequenceFile() for types implicitly convertible to Writables through a
-   * WritableConverter. For example, to access a SequenceFile where the keys are Text and the
-   * values are IntWritable, you could simply write
-   * {{{
-   * sparkContext.sequenceFile[String, Int](path, ...)
-   * }}}
-   *
-   * WritableConverters are provided in a somewhat strange way (by an implicit function) to support
-   * both subclasses of Writable and types for which we define a converter (e.g. Int to
-   * IntWritable). The most natural thing would've been to have implicit objects for the
-   * converters, but then we couldn't have an object for every subclass of Writable (you can't
-   * have a parameterized singleton object). We use functions instead to create a new converter
-   * for the appropriate type. In addition, we pass the converter a ClassManifest of its type to
-   * allow it to figure out the Writable class to use in the subclass case.
-   */
-   def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
-      (implicit km: ClassManifest[K], vm: ClassManifest[V],
-          kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
-      : RDD[(K, V)] = {
-    val kc = kcf()
-    val vc = vcf()
-    val format = classOf[SequenceFileInputFormat[Writable, Writable]]
-    val writables = hadoopFile(path, format,
-        kc.writableClass(km).asInstanceOf[Class[Writable]],
-        vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
-    writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
-  }
-
-  /**
-   * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
-   * BytesWritable values that contain a serialized partition. This is still an experimental storage
-   * format and may not be supported exactly as is in future Spark releases. It will also be pretty
-   * slow if you use the default serializer (Java serialization), though the nice thing about it is
-   * that there's very little effort required to save arbitrary objects.
-   */
-  def objectFile[T: ClassManifest](
-      path: String,
-      minSplits: Int = defaultMinSplits
-      ): RDD[T] = {
-    sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
-      .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
-  }
-
-
-  protected[spark] def checkpointFile[T: ClassManifest](
-      path: String
-    ): RDD[T] = {
-    new CheckpointRDD[T](this, path)
-  }
-
-  /** Build the union of a list of RDDs. */
-  def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
-
-  /** Build the union of a list of RDDs passed as variable-length arguments. */
-  def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
-    new UnionRDD(this, Seq(first) ++ rest)
-
-  // Methods for creating shared variables
-
-  /**
-   * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
-   * to using the `+=` method. Only the driver can access the accumulator's `value`.
-   */
-  def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
-    new Accumulator(initialValue, param)
-
-  /**
-   * Create an [[spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
-   * Only the driver can access the accumulable's `value`.
-   * @tparam T accumulator type
-   * @tparam R type that can be added to the accumulator
-   */
-  def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
-    new Accumulable(initialValue, param)
-
-  /**
-   * Create an accumulator from a "mutable collection" type.
-   *
-   * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
-   * standard mutable collections. So you can use this with mutable Map, Set, etc.
-   */
-  def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = {
-    val param = new GrowableAccumulableParam[R,T]
-    new Accumulable(initialValue, param)
-  }
-
-  /**
-   * Broadcast a read-only variable to the cluster, returning a [[spark.broadcast.Broadcast]] object for
-   * reading it in distributed functions. The variable will be sent to each node only once.
-   */
-  def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
-
-  /**
-   * Add a file to be downloaded with this Spark job on every node.
-   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
-   * filesystems), or an HTTP, HTTPS or FTP URI.  To access the file in Spark jobs,
-   * use `SparkFiles.get(path)` to find its download location.
-   */
-  def addFile(path: String) {
-    val uri = new URI(path)
-    val key = uri.getScheme match {
-      case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
-      case _ => path
-    }
-    addedFiles(key) = System.currentTimeMillis
-
-    // Fetch the file locally in case a job is executed locally.
-    // Jobs that run through LocalScheduler will already fetch the required dependencies,
-    // but jobs run in DAGScheduler.runLocally() will not, so we must fetch the files here.
-    Utils.fetchFile(path, new File(SparkFiles.getRootDirectory))
-
-    logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
-  }
-
-  def addSparkListener(listener: SparkListener) {
-    dagScheduler.addSparkListener(listener)
-  }
-
-  /**
-   * Return a map from each slave to a pair of the maximum memory available for caching and
-   * the remaining memory available for caching.
-   */
-  def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
-    env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
-      (blockManagerId.host + ":" + blockManagerId.port, mem)
-    }
-  }
-
-  /**
-   * Return information about what RDDs are cached, if they are in mem or on disk, how much space
-   * they take, etc.
-   */
-  def getRDDStorageInfo: Array[RDDInfo] = {
-    StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
-  }
-
-  /**
-   * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
-   * Note that this does not necessarily mean the caching or computation was successful.
-   */
-  def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
-
-  def getStageInfo: Map[Stage,StageInfo] = {
-    dagScheduler.stageToInfos
-  }
-
-  /**
-   * Return information about blocks stored in all of the slaves
-   */
-  def getExecutorStorageStatus: Array[StorageStatus] = {
-    env.blockManager.master.getStorageStatus
-  }
-
-  /**
-   *  Return pools for fair scheduler
-   *  TODO(xiajunluan): We should take nested pools into account
-   */
-  def getAllPools: ArrayBuffer[Schedulable] = {
-    taskScheduler.rootPool.schedulableQueue
-  }
-
-  /**
-   * Return the pool associated with the given name, if one exists
-   */
-  def getPoolForName(pool: String): Option[Schedulable] = {
-    taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
-  }
-
-  /**
-   *  Return current scheduling mode
-   */
-  def getSchedulingMode: SchedulingMode.SchedulingMode = {
-    taskScheduler.schedulingMode
-  }
-
-  /**
-   * Clear the job's list of files added by `addFile` so that they do not get downloaded to
-   * any new nodes.
-   */
-  def clearFiles() {
-    addedFiles.clear()
-  }
-
-  /**
-   * Gets the locality information associated with a partition of a particular RDD.
-   * @param rdd the RDD of interest
-   * @param partition the partition whose preferred locations should be looked up
-   * @return list of preferred locations for the partition
-   */
-  private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
-    dagScheduler.getPreferredLocs(rdd, partition)
-  }
-
-  /**
-   * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
-   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
-   * filesystems), or an HTTP, HTTPS or FTP URI.
-   */
-  def addJar(path: String) {
-    if (null == path) {
-      logWarning("null specified as parameter to addJar",
-        new SparkException("null specified as parameter to addJar"))
-    } else {
-      val env = SparkEnv.get
-      val uri = new URI(path)
-      val key = uri.getScheme match {
-        case null | "file" =>
-          if (env.hadoop.isYarnMode()) {
-            logWarning("local jar specified as parameter to addJar under Yarn mode")
-            return
-          }
-          env.httpFileServer.addJar(new File(uri.getPath))
-        case _ => path
-      }
-      addedJars(key) = System.currentTimeMillis
-      logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
-    }
-  }
-
-  /**
-   * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
-   * any new nodes.
-   */
-  def clearJars() {
-    addedJars.clear()
-  }
-
-  /** Shut down the SparkContext. */
-  def stop() {
-    ui.stop()
-    // Do this only if not already stopped; this is a best-effort check to
-    // prevent an NPE if stop() is called more than once.
-    val dagSchedulerCopy = dagScheduler
-    dagScheduler = null
-    if (dagSchedulerCopy != null) {
-      metadataCleaner.cancel()
-      dagSchedulerCopy.stop()
-      taskScheduler = null
-      // TODO: Cache.stop()?
-      env.stop()
-      // Clean up locally linked files
-      clearFiles()
-      clearJars()
-      SparkEnv.set(null)
-      ShuffleMapTask.clearCache()
-      ResultTask.clearCache()
-      logInfo("Successfully stopped SparkContext")
-    } else {
-      logInfo("SparkContext already stopped")
-    }
-  }
-
-
-  /**
-   * Get Spark's home location from either a value set through the constructor,
-   * or the spark.home Java property, or the SPARK_HOME environment variable
-   * (in that order of preference). If none of these is set, return None.
-   */
-  private[spark] def getSparkHome(): Option[String] = {
-    if (sparkHome != null) {
-      Some(sparkHome)
-    } else if (System.getProperty("spark.home") != null) {
-      Some(System.getProperty("spark.home"))
-    } else if (System.getenv("SPARK_HOME") != null) {
-      Some(System.getenv("SPARK_HOME"))
-    } else {
-      None
-    }
-  }
-
-  /**
-   * Run a function on a given set of partitions in an RDD and pass the results to the given
-   * handler function. This is the main entry point for all actions in Spark. The allowLocal
-   * flag specifies whether the scheduler can run the computation on the driver rather than
-   * shipping it out to the cluster, for short actions like first().
-   */
-  def runJob[T, U: ClassManifest](
-      rdd: RDD[T],
-      func: (TaskContext, Iterator[T]) => U,
-      partitions: Seq[Int],
-      allowLocal: Boolean,
-      resultHandler: (Int, U) => Unit) {
-    val callSite = Utils.formatSparkCallSite
-    logInfo("Starting job: " + callSite)
-    val start = System.nanoTime
-    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
-    rdd.doCheckpoint()
-    result
-  }
-
-  /**
-   * Run a function on a given set of partitions in an RDD and return the results as an array. The
-   * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
-   * than shipping it out to the cluster, for short actions like first().
-   */
-  def runJob[T, U: ClassManifest](
-      rdd: RDD[T],
-      func: (TaskContext, Iterator[T]) => U,
-      partitions: Seq[Int],
-      allowLocal: Boolean
-      ): Array[U] = {
-    val results = new Array[U](partitions.size)
-    runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
-    results
-  }
-
-  /**
-   * Run a job on a given set of partitions of an RDD, but take a function of type
-   * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
-   */
-  def runJob[T, U: ClassManifest](
-      rdd: RDD[T],
-      func: Iterator[T] => U,
-      partitions: Seq[Int],
-      allowLocal: Boolean
-      ): Array[U] = {
-    runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
-  }
-
-  /**
-   * Run a job on all partitions in an RDD and return the results in an array.
-   */
-  def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
-    runJob(rdd, func, 0 until rdd.partitions.size, false)
-  }
-
-  /**
-   * Run a job on all partitions in an RDD and return the results in an array.
-   */
-  def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
-    runJob(rdd, func, 0 until rdd.partitions.size, false)
-  }
-
-  /**
-   * Run a job on all partitions in an RDD and pass the results to a handler function.
-   */
-  def runJob[T, U: ClassManifest](
-    rdd: RDD[T],
-    processPartition: (TaskContext, Iterator[T]) => U,
-    resultHandler: (Int, U) => Unit)
-  {
-    runJob[T, U](rdd, processPartition, 0 until rdd.partitions.size, false, resultHandler)
-  }
-
-  /**
-   * Run a job on all partitions in an RDD and pass the results to a handler function.
-   */
-  def runJob[T, U: ClassManifest](
-      rdd: RDD[T],
-      processPartition: Iterator[T] => U,
-      resultHandler: (Int, U) => Unit)
-  {
-    val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
-    runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler)
-  }
-
-  /**
-   * Run a job that can return approximate results.
-   */
-  def runApproximateJob[T, U, R](
-      rdd: RDD[T],
-      func: (TaskContext, Iterator[T]) => U,
-      evaluator: ApproximateEvaluator[U, R],
-      timeout: Long): PartialResult[R] = {
-    val callSite = Utils.formatSparkCallSite
-    logInfo("Starting job: " + callSite)
-    val start = System.nanoTime
-    val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
-    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
-    result
-  }
-
-  /**
-   * Clean a closure to make it ready to be serialized and sent to tasks
-   * (removes unreferenced variables in $outer's, updates REPL variables)
-   */
-  private[spark] def clean[F <: AnyRef](f: F): F = {
-    ClosureCleaner.clean(f)
-    return f
-  }
-
-  /**
-   * Set the directory under which RDDs are going to be checkpointed. The directory must
-   * be an HDFS path if running on a cluster. If the directory does not exist, it will
-   * be created. If the directory exists and useExisting is set to true, then the
-   * existing directory will be used. Otherwise an exception will be thrown to
-   * prevent accidental overriding of checkpoint files in the existing directory.
-   */
-  def setCheckpointDir(dir: String, useExisting: Boolean = false) {
-    val env = SparkEnv.get
-    val path = new Path(dir)
-    val fs = path.getFileSystem(env.hadoop.newConfiguration())
-    if (!useExisting) {
-      if (fs.exists(path)) {
-        throw new Exception("Checkpoint directory '" + path + "' already exists.")
-      } else {
-        fs.mkdirs(path)
-      }
-    }
-    checkpointDir = Some(dir)
-  }
-
-  /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
-  def defaultParallelism: Int = taskScheduler.defaultParallelism
-
-  /** Default min number of partitions for Hadoop RDDs when not given by user */
-  def defaultMinSplits: Int = math.min(defaultParallelism, 2)
-
-  private val nextShuffleId = new AtomicInteger(0)
-
-  private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
-
-  private val nextRddId = new AtomicInteger(0)
-
-  /** Register a new RDD, returning its RDD ID */
-  private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
-
-  /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
-  private[spark] def cleanup(cleanupTime: Long) {
-    persistentRdds.clearOldValues(cleanupTime)
-  }
-}
-
-/**
- * The SparkContext object contains a number of implicit conversions and parameters for use with
- * various Spark features.
- */
-object SparkContext {
-  val SPARK_JOB_DESCRIPTION = "spark.job.description"
-
-  implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
-    def addInPlace(t1: Double, t2: Double): Double = t1 + t2
-    def zero(initialValue: Double) = 0.0
-  }
-
-  implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
-    def addInPlace(t1: Int, t2: Int): Int = t1 + t2
-    def zero(initialValue: Int) = 0
-  }
-
-  implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
-    def addInPlace(t1: Long, t2: Long) = t1 + t2
-    def zero(initialValue: Long) = 0l
-  }
-
-  implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
-    def addInPlace(t1: Float, t2: Float) = t1 + t2
-    def zero(initialValue: Float) = 0f
-  }
-
-  // TODO: Add AccumulatorParams for other types, e.g. lists and strings
-
-  implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
-    new PairRDDFunctions(rdd)
-
-  implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
-      rdd: RDD[(K, V)]) =
-    new SequenceFileRDDFunctions(rdd)
-
-  implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
-      rdd: RDD[(K, V)]) =
-    new OrderedRDDFunctions[K, V, (K, V)](rdd)
-
-  implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
-
-  implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
-    new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
-
-  // Implicit conversions to common Writable types, for saveAsSequenceFile
-
-  implicit def intToIntWritable(i: Int) = new IntWritable(i)
-
-  implicit def longToLongWritable(l: Long) = new LongWritable(l)
-
-  implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
-
-  implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
-
-  implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
-
-  implicit def bytesToBytesWritable (aob: Array[Byte]) = new BytesWritable(aob)
-
-  implicit def stringToText(s: String) = new Text(s)
-
-  private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
-    def anyToWritable[U <% Writable](u: U): Writable = u
-
-    new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
-        arr.map(x => anyToWritable(x)).toArray)
-  }
-
-  // Helper objects for converting common types to Writable
-  private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = {
-    val wClass = classManifest[W].erasure.asInstanceOf[Class[W]]
-    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
-  }
-
-  implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
-
-  implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
-
-  implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
-
-  implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
-
-  implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
-
-  implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
-
-  implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
-
-  implicit def writableWritableConverter[T <: Writable]() =
-    new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
-
-  /**
-   * Find the JAR from which a given class was loaded, to make it easy for users to pass
-   * their JARs to SparkContext
-   */
-  def jarOfClass(cls: Class[_]): Seq[String] = {
-    val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
-    if (uri != null) {
-      val uriStr = uri.toString
-      if (uriStr.startsWith("jar:file:")) {
-        // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class", so pull out the /path/foo.jar
-        List(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
-      } else {
-        Nil
-      }
-    } else {
-      Nil
-    }
-  }
-
-  /** Find the JAR that contains the class of a particular object */
-  def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
-
-  /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
-  private[spark] val executorMemoryRequested = {
-    // TODO: Might need to add some extra memory for the non-heap parts of the JVM
-    Option(System.getProperty("spark.executor.memory"))
-      .orElse(Option(System.getenv("SPARK_MEM")))
-      .map(Utils.memoryStringToMb)
-      .getOrElse(512)
-  }
-}
-
-/**
- * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
- * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
- * The getter for the writable class takes a ClassManifest[T] in case this is a generic object
- * that doesn't know the type of T when it is created. This sounds strange but is necessary to
- * support converting subclasses of Writable to themselves (writableWritableConverter).
- */
-private[spark] class WritableConverter[T](
-    val writableClass: ClassManifest[T] => Class[_ <: Writable],
-    val convert: Writable => T)
-  extends Serializable
-
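
For context, a minimal driver-program sketch against the pre-rename API above; the master URL,
input path and application name are hypothetical. Importing SparkContext._ picks up the implicit
conversions (rddToPairRDDFunctions for reduceByKey) and the implicit IntAccumulatorParam for the
accumulator.

import spark.SparkContext
import spark.SparkContext._

object WordCountSketch {
  def main(args: Array[String]) {
    // Run locally with 4 threads; jars and environment keep their default values.
    val sc = new SparkContext("local[4]", "WordCountSketch")

    val lines = sc.textFile("hdfs:///tmp/input.txt")
    val counts = lines.flatMap(_.split(" "))
                      .map(word => (word, 1))
                      .reduceByKey(_ + _)          // provided by rddToPairRDDFunctions

    val errorLines = sc.accumulator(0)             // IntAccumulatorParam supplies zero/addInPlace
    lines.foreach(line => if (line.contains("ERROR")) errorLines += 1)

    println(counts.collect().mkString(", "))
    println("error lines: " + errorLines.value)
    sc.stop()
  }
}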

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
deleted file mode 100644
index 1f66e9c..0000000
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import collection.mutable
-import serializer.Serializer
-
-import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
-import akka.remote.RemoteActorRefProvider
-
-import spark.broadcast.BroadcastManager
-import spark.metrics.MetricsSystem
-import spark.deploy.SparkHadoopUtil
-import spark.storage.BlockManager
-import spark.storage.BlockManagerMaster
-import spark.network.ConnectionManager
-import spark.serializer.{Serializer, SerializerManager}
-import spark.util.AkkaUtils
-import spark.api.python.PythonWorkerFactory
-
-
-/**
- * Holds all the runtime environment objects for a running Spark instance (either master or worker),
- * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
- * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
- * objects needs to have the right SparkEnv set. You can get the current environment with
- * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
- */
-class SparkEnv (
-    val executorId: String,
-    val actorSystem: ActorSystem,
-    val serializerManager: SerializerManager,
-    val serializer: Serializer,
-    val closureSerializer: Serializer,
-    val cacheManager: CacheManager,
-    val mapOutputTracker: MapOutputTracker,
-    val shuffleFetcher: ShuffleFetcher,
-    val broadcastManager: BroadcastManager,
-    val blockManager: BlockManager,
-    val connectionManager: ConnectionManager,
-    val httpFileServer: HttpFileServer,
-    val sparkFilesDir: String,
-    val metricsSystem: MetricsSystem) {
-
-  private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
-
-  val hadoop = {
-    val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
-    if(yarnMode) {
-      try {
-        Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
-      } catch {
-        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
-      }
-    } else {
-      new SparkHadoopUtil
-    }
-  }
-
-  def stop() {
-    pythonWorkers.foreach { case(key, worker) => worker.stop() }
-    httpFileServer.stop()
-    mapOutputTracker.stop()
-    shuffleFetcher.stop()
-    broadcastManager.stop()
-    blockManager.stop()
-    blockManager.master.stop()
-    metricsSystem.stop()
-    actorSystem.shutdown()
-    // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
-    // down, but let's call it anyway in case it gets fixed in a later release
-    actorSystem.awaitTermination()
-  }
-
-  def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
-    synchronized {
-      val key = (pythonExec, envVars)
-      pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
-    }
-  }
-}
-
-object SparkEnv extends Logging {
-  private val env = new ThreadLocal[SparkEnv]
-  @volatile private var lastSetSparkEnv : SparkEnv = _
-
-  def set(e: SparkEnv) {
-	  lastSetSparkEnv = e
-    env.set(e)
-  }
-
-  /**
-   * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
-   * previously set in any thread.
-   */
-  def get: SparkEnv = {
-    Option(env.get()).getOrElse(lastSetSparkEnv)
-  }
-
-  /**
-   * Returns the ThreadLocal SparkEnv.
-   */
-  def getThreadLocal : SparkEnv = {
-	  env.get()
-  }
-
-  def createFromSystemProperties(
-      executorId: String,
-      hostname: String,
-      port: Int,
-      isDriver: Boolean,
-      isLocal: Boolean): SparkEnv = {
-
-    val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
-
-    // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
-    // figure out which port number Akka actually bound to and set spark.driver.port to it.
-    if (isDriver && port == 0) {
-      System.setProperty("spark.driver.port", boundPort.toString)
-    }
-
-    // Set spark.hostPort only if it has not been set already.
-    if (System.getProperty("spark.hostPort", null) == null) {
-      if (!isDriver){
-        // unexpected
-        Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
-      }
-      Utils.checkHost(hostname)
-      System.setProperty("spark.hostPort", hostname + ":" + boundPort)
-    }
-
-    val classLoader = Thread.currentThread.getContextClassLoader
-
-    // Create an instance of the class named by the given Java system property, or by
-    // defaultClassName if the property is not set, and return it as a T
-    def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
-      val name = System.getProperty(propertyName, defaultClassName)
-      Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
-    }
-
-    val serializerManager = new SerializerManager
-
-    val serializer = serializerManager.setDefault(
-      System.getProperty("spark.serializer", "spark.JavaSerializer"))
-
-    val closureSerializer = serializerManager.get(
-      System.getProperty("spark.closure.serializer", "spark.JavaSerializer"))
-
-    def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
-      if (isDriver) {
-        logInfo("Registering " + name)
-        actorSystem.actorOf(Props(newActor), name = name)
-      } else {
-        val driverHost: String = System.getProperty("spark.driver.host", "localhost")
-        val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
-        Utils.checkHost(driverHost, "Expected hostname")
-        val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
-        logInfo("Connecting to " + name + ": " + url)
-        actorSystem.actorFor(url)
-      }
-    }
-
-    val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
-      "BlockManagerMaster",
-      new spark.storage.BlockManagerMasterActor(isLocal)))
-    val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
-
-    val connectionManager = blockManager.connectionManager
-
-    val broadcastManager = new BroadcastManager(isDriver)
-
-    val cacheManager = new CacheManager(blockManager)
-
-    // Have to assign trackerActor after initialization as MapOutputTrackerActor
-    // requires the MapOutputTracker itself
-    val mapOutputTracker = new MapOutputTracker()
-    mapOutputTracker.trackerActor = registerOrLookup(
-      "MapOutputTracker",
-      new MapOutputTrackerActor(mapOutputTracker))
-
-    val shuffleFetcher = instantiateClass[ShuffleFetcher](
-      "spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher")
-
-    val httpFileServer = new HttpFileServer()
-    httpFileServer.initialize()
-    System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
-
-    val metricsSystem = if (isDriver) {
-      MetricsSystem.createMetricsSystem("driver")
-    } else {
-      MetricsSystem.createMetricsSystem("executor")
-    }
-    metricsSystem.start()
-
-    // Set the sparkFiles directory, used when downloading dependencies.  In local mode,
-    // this is a temporary directory; in distributed mode, this is the executor's current working
-    // directory.
-    val sparkFilesDir: String = if (isDriver) {
-      Utils.createTempDir().getAbsolutePath
-    } else {
-      "."
-    }
-
-    // Warn about deprecated spark.cache.class property
-    if (System.getProperty("spark.cache.class") != null) {
-      logWarning("The spark.cache.class property is no longer being used! Specify storage " +
-        "levels using the RDD.persist() method instead.")
-    }
-
-    new SparkEnv(
-      executorId,
-      actorSystem,
-      serializerManager,
-      serializer,
-      closureSerializer,
-      cacheManager,
-      mapOutputTracker,
-      shuffleFetcher,
-      broadcastManager,
-      blockManager,
-      connectionManager,
-      httpFileServer,
-      sparkFilesDir,
-      metricsSystem)
-  }
-}
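
A minimal sketch of how the set/get pair above is meant to be used when handing work to another thread; `env` and the thread body are assumed to be supplied by the caller, and EnvPropagationExample is a hypothetical name:

    import spark.SparkEnv

    object EnvPropagationExample {
      // `env` is assumed to have been created elsewhere (for example by
      // SparkEnv.createFromSystemProperties); nothing here constructs one.
      def runWithEnv(env: SparkEnv)(body: => Unit): Thread = {
        val t = new Thread {
          override def run() {
            SparkEnv.set(env)   // publish the env to this thread's ThreadLocal
            body                // code in `body` can now rely on SparkEnv.get
          }
        }
        t.start()
        t
      }
    }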

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkException.scala b/core/src/main/scala/spark/SparkException.scala
deleted file mode 100644
index b7045ee..0000000
--- a/core/src/main/scala/spark/SparkException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-class SparkException(message: String, cause: Throwable)
-  extends Exception(message, cause) {
-
-  def this(message: String) = this(message, null)  
-}
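
A minimal sketch exercising both constructors above; `loadPlugin` and SparkExceptionExample are hypothetical names:

    import spark.SparkException

    object SparkExceptionExample {
      def loadPlugin(name: String): AnyRef = {
        if (name == null) {
          throw new SparkException("Plugin name must not be null")        // message only
        }
        try {
          Class.forName(name).newInstance().asInstanceOf[AnyRef]
        } catch {
          case th: Throwable =>
            throw new SparkException("Unable to load plugin " + name, th) // with cause
        }
      }
    }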

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkFiles.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkFiles.java b/core/src/main/scala/spark/SparkFiles.java
deleted file mode 100644
index f9b3f79..0000000
--- a/core/src/main/scala/spark/SparkFiles.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark;
-
-import java.io.File;
-
-/**
- * Resolves paths to files added through `SparkContext.addFile()`.
- */
-public class SparkFiles {
-
-  private SparkFiles() {}
-
-  /**
-   * Get the absolute path of a file added through `SparkContext.addFile()`.
-   */
-  public static String get(String filename) {
-    return new File(getRootDirectory(), filename).getAbsolutePath();
-  }
-
-  /**
-   * Get the root directory that contains files added through `SparkContext.addFile()`.
-   */
-  public static String getRootDirectory() {
-    return SparkEnv.get().sparkFilesDir();
-  }
-}
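
A minimal usage sketch, assuming the driver has already called SparkContext.addFile() with a file named lookup.txt; the object and file names are placeholders:

    import scala.io.Source
    import spark.SparkFiles

    object SparkFilesExample {
      // Driver side (placeholder): sc.addFile("/local/path/lookup.txt")
      // Task side: refer to the file by name only; SparkFiles resolves it to the
      // local copy under getRootDirectory().
      def loadLookup(): List[String] = {
        val path = SparkFiles.get("lookup.txt")
        Source.fromFile(path).getLines().toList
      }
    }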

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkHadoopWriter.scala b/core/src/main/scala/spark/SparkHadoopWriter.scala
deleted file mode 100644
index 6b330ef..0000000
--- a/core/src/main/scala/spark/SparkHadoopWriter.scala
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-
-import java.text.SimpleDateFormat
-import java.text.NumberFormat
-import java.io.IOException
-import java.util.Date
-
-import spark.Logging
-import spark.SerializableWritable
-
-/**
- * Internal helper class that saves an RDD using a Hadoop OutputFormat. It lives in the
- * org.apache.hadoop.mapred package so that it can use some package-private Hadoop APIs, and
- * is public only so Spark code can reach it; it should not be used directly by users.
- *
- * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
- * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
- */
-class SparkHadoopWriter(@transient jobConf: JobConf)
-  extends Logging with SparkHadoopMapRedUtil with Serializable {
-
-  private val now = new Date()
-  private val conf = new SerializableWritable(jobConf)
-  
-  private var jobID = 0
-  private var splitID = 0
-  private var attemptID = 0
-  private var jID: SerializableWritable[JobID] = null
-  private var taID: SerializableWritable[TaskAttemptID] = null
-
-  @transient private var writer: RecordWriter[AnyRef,AnyRef] = null
-  @transient private var format: OutputFormat[AnyRef,AnyRef] = null
-  @transient private var committer: OutputCommitter = null
-  @transient private var jobContext: JobContext = null
-  @transient private var taskContext: TaskAttemptContext = null
-
-  def preSetup() {
-    setIDs(0, 0, 0)
-    setConfParams()
-    
-    val jCtxt = getJobContext() 
-    getOutputCommitter().setupJob(jCtxt)
-  }
-
-
-  def setup(jobid: Int, splitid: Int, attemptid: Int) {
-    setIDs(jobid, splitid, attemptid)
-    setConfParams() 
-  }
-
-  def open() {
-    val numfmt = NumberFormat.getInstance()
-    numfmt.setMinimumIntegerDigits(5)
-    numfmt.setGroupingUsed(false)
-    
-    val outputName = "part-"  + numfmt.format(splitID)
-    val path = FileOutputFormat.getOutputPath(conf.value)
-    val fs: FileSystem = {
-      if (path != null) {
-        path.getFileSystem(conf.value)
-      } else {
-        FileSystem.get(conf.value)
-      }
-    }
-
-    getOutputCommitter().setupTask(getTaskContext()) 
-    writer = getOutputFormat().getRecordWriter(
-        fs, conf.value, outputName, Reporter.NULL)
-  }
-
-  def write(key: AnyRef, value: AnyRef) {
-    if (writer != null) {
-      writer.write(key, value)
-    } else {
-      throw new IOException("Writer is null, open() has not been called")
-    }
-  }
-
-  def close() {
-    writer.close(Reporter.NULL)
-  }
-
-  def commit() {
-    val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter() 
-    if (cmtr.needsTaskCommit(taCtxt)) {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (taID + ": Committed")
-      } catch {
-        case e: IOException => { 
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
-        }
-      }   
-    } else {
-      logWarning ("No need to commit output of task: " + taID.value)
-    }
-  }
-
-  def commitJob() {
-    // TODO: should this always run, or only when the committer needs a task commit?
-    val cmtr = getOutputCommitter()
-    cmtr.commitJob(getJobContext())
-  }
-
-  def cleanup() {
-    getOutputCommitter().cleanupJob(getJobContext())
-  }
-
-  // ********* Private Functions *********
-
-  private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
-    if (format == null) {
-      format = conf.value.getOutputFormat()
-        .asInstanceOf[OutputFormat[AnyRef,AnyRef]]
-    }
-    return format 
-  }
-
-  private def getOutputCommitter(): OutputCommitter = {
-    if (committer == null) {
-      committer = conf.value.getOutputCommitter
-    }
-    return committer
-  }
-
-  private def getJobContext(): JobContext = {
-    if (jobContext == null) { 
-      jobContext = newJobContext(conf.value, jID.value)
-    }
-    return jobContext
-  }
-
-  private def getTaskContext(): TaskAttemptContext = {
-    if (taskContext == null) {
-      taskContext =  newTaskAttemptContext(conf.value, taID.value)
-    }
-    return taskContext
-  }
-
-  private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
-    jobID = jobid
-    splitID = splitid
-    attemptID = attemptid
-
-    jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
-    taID = new SerializableWritable[TaskAttemptID](
-        new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
-  }
-
-  private def setConfParams() {
-    conf.value.set("mapred.job.id", jID.value.toString)
-    conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
-    conf.value.set("mapred.task.id", taID.value.toString)
-    conf.value.setBoolean("mapred.task.is.map", true)
-    conf.value.setInt("mapred.task.partition", splitID)
-  }
-}
-
-object SparkHadoopWriter {
-  def createJobID(time: Date, id: Int): JobID = {
-    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
-    val jobtrackerID = formatter.format(time)
-    return new JobID(jobtrackerID, id)
-  }
-  
-  def createPathFromString(path: String, conf: JobConf): Path = {
-    if (path == null) {
-      throw new IllegalArgumentException("Output path is null")
-    }
-    var outputPath = new Path(path)
-    val fs = outputPath.getFileSystem(conf)
-    if (fs == null) {
-      throw new IllegalArgumentException("Incorrectly formatted output path")
-    }
-    outputPath = outputPath.makeQualified(fs)
-    return outputPath
-  }
-}
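
A minimal sketch of the per-partition call order implied by the methods above; the JobConf, ids, and record iterator are assumed to be supplied by the caller, and HadoopWriteExample is a hypothetical name:

    import org.apache.hadoop.mapred.{JobConf, SparkHadoopWriter}

    object HadoopWriteExample {
      // preSetup() and commitJob() are job-level and would run once on the
      // driver, before and after all partitions respectively.
      def writePartition(conf: JobConf, jobId: Int, splitId: Int, attemptId: Int,
                         records: Iterator[(AnyRef, AnyRef)]) {
        val writer = new SparkHadoopWriter(conf)
        writer.setup(jobId, splitId, attemptId)
        writer.open()                                  // creates the RecordWriter
        try {
          records.foreach { case (k, v) => writer.write(k, v) }
        } finally {
          writer.close()
        }
        writer.commit()                                // task-level commit
      }
    }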

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/TaskContext.scala b/core/src/main/scala/spark/TaskContext.scala
deleted file mode 100644
index b79f4ca..0000000
--- a/core/src/main/scala/spark/TaskContext.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import executor.TaskMetrics
-import scala.collection.mutable.ArrayBuffer
-
-class TaskContext(
-  val stageId: Int,
-  val splitId: Int,
-  val attemptId: Long,
-  val taskMetrics: TaskMetrics = TaskMetrics.empty()
-) extends Serializable {
-
-  @transient val onCompleteCallbacks = new ArrayBuffer[() => Unit]
-
-  // Add a callback function to be executed on task completion. An example use
-  // is for HadoopRDD to register a callback to close the input stream.
-  def addOnCompleteCallback(f: () => Unit) {
-    onCompleteCallbacks += f
-  }
-
-  def executeOnCompleteCallbacks() {
-    onCompleteCallbacks.foreach{_()}
-  }
-}
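
A minimal sketch of the HadoopRDD-style cleanup pattern described in the comment above; the ids and file path are placeholders, and in real code the scheduler, not the task itself, invokes executeOnCompleteCallbacks():

    import java.io.FileInputStream
    import spark.TaskContext

    object TaskCleanupExample {
      def runTask() {
        val context = new TaskContext(stageId = 0, splitId = 0, attemptId = 0L)
        val stream = new FileInputStream("/tmp/placeholder-input")
        context.addOnCompleteCallback(() => stream.close())

        // ... the task body would read from `stream` here ...

        // Runs every registered callback, closing the stream above.
        context.executeOnCompleteCallbacks()
      }
    }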

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
deleted file mode 100644
index 3ad665d..0000000
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import spark.executor.TaskMetrics
-import spark.storage.BlockManagerId
-
-/**
- * Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
- * tasks several times for "ephemeral" failures, and only report back failures that require some
- * old stages to be resubmitted, such as shuffle map fetch failures.
- */
-private[spark] sealed trait TaskEndReason
-
-private[spark] case object Success extends TaskEndReason
-
-// Task was finished earlier but we've now lost it
-private[spark] case object Resubmitted extends TaskEndReason
-
-private[spark] case class FetchFailed(
-    bmAddress: BlockManagerId,
-    shuffleId: Int,
-    mapId: Int,
-    reduceId: Int)
-  extends TaskEndReason
-
-private[spark] case class ExceptionFailure(
-    className: String,
-    description: String,
-    stackTrace: Array[StackTraceElement],
-    metrics: Option[TaskMetrics])
-  extends TaskEndReason
-
-private[spark] case class OtherFailure(message: String) extends TaskEndReason
-
-private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
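
A minimal sketch of the kind of match a scheduler component performs on these reasons; because the types are private[spark], such code must live inside the spark package, and the messages are illustrative:

    package spark

    object TaskEndReasonExample {
      def describe(reason: TaskEndReason): String = reason match {
        case Success => "task succeeded"
        case Resubmitted => "task finished earlier but was lost; resubmitting"
        case FetchFailed(bmAddress, shuffleId, mapId, _) =>
          "failed to fetch shuffle " + shuffleId + " map " + mapId + " from " + bmAddress
        case ExceptionFailure(className, description, _, _) =>
          "exception " + className + ": " + description
        case TaskResultTooBigFailure() => "task result was too large to send back"
        case OtherFailure(message) => message
      }
    }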

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/TaskState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/TaskState.scala b/core/src/main/scala/spark/TaskState.scala
deleted file mode 100644
index bf75753..0000000
--- a/core/src/main/scala/spark/TaskState.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-
-private[spark] object TaskState
-  extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED", "KILLED", "LOST") {
-
-  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
-
-  val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
-
-  type TaskState = Value
-
-  def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
-
-  def toMesos(state: TaskState): MesosTaskState = state match {
-    case LAUNCHING => MesosTaskState.TASK_STARTING
-    case RUNNING => MesosTaskState.TASK_RUNNING
-    case FINISHED => MesosTaskState.TASK_FINISHED
-    case FAILED => MesosTaskState.TASK_FAILED
-    case KILLED => MesosTaskState.TASK_KILLED
-    case LOST => MesosTaskState.TASK_LOST
-  }
-
-  def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
-    case MesosTaskState.TASK_STAGING => LAUNCHING
-    case MesosTaskState.TASK_STARTING => LAUNCHING
-    case MesosTaskState.TASK_RUNNING => RUNNING
-    case MesosTaskState.TASK_FINISHED => FINISHED
-    case MesosTaskState.TASK_FAILED => FAILED
-    case MesosTaskState.TASK_KILLED => KILLED
-    case MesosTaskState.TASK_LOST => LOST
-  }
-}
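
A minimal sketch of handling a Mesos status update with the conversions above; like TaskEndReason, TaskState is private[spark], so this would live inside the spark package, and println stands in for real logging:

    package spark

    import org.apache.mesos.Protos.{TaskState => MesosTaskState}

    object TaskStateExample {
      def handleStatusUpdate(mesosState: MesosTaskState) {
        val state = TaskState.fromMesos(mesosState)
        if (TaskState.isFinished(state)) {
          println("task reached terminal state: " + state)
        } else {
          println("task still in flight: " + state)
        }
      }
    }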