Posted to commits@spark.apache.org by ma...@apache.org on 2013/09/01 23:59:11 UTC
[27/69] [abbrv] [partial] Initial work to rename package to org.apache.spark
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SizeEstimator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SizeEstimator.scala b/core/src/main/scala/spark/SizeEstimator.scala
deleted file mode 100644
index 6cc5756..0000000
--- a/core/src/main/scala/spark/SizeEstimator.scala
+++ /dev/null
@@ -1,283 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.lang.reflect.Field
-import java.lang.reflect.Modifier
-import java.lang.reflect.{Array => JArray}
-import java.util.IdentityHashMap
-import java.util.concurrent.ConcurrentHashMap
-import java.util.Random
-
-import javax.management.MBeanServer
-import java.lang.management.ManagementFactory
-
-import scala.collection.mutable.ArrayBuffer
-
-import it.unimi.dsi.fastutil.ints.IntOpenHashSet
-
-/**
- * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
- * memory-aware caches.
- *
- * Based on the following JavaWorld article:
- * http://www.javaworld.com/javaworld/javaqa/2003-12/02-qa-1226-sizeof.html
- */
-private[spark] object SizeEstimator extends Logging {
-
- // Sizes of primitive types
- private val BYTE_SIZE = 1
- private val BOOLEAN_SIZE = 1
- private val CHAR_SIZE = 2
- private val SHORT_SIZE = 2
- private val INT_SIZE = 4
- private val LONG_SIZE = 8
- private val FLOAT_SIZE = 4
- private val DOUBLE_SIZE = 8
-
- // Alignment boundary for objects
- // TODO: Is this architecture-dependent?
- private val ALIGN_SIZE = 8
-
- // A cache of ClassInfo objects for each class
- private val classInfos = new ConcurrentHashMap[Class[_], ClassInfo]
-
- // Object and pointer sizes are arch dependent
- private var is64bit = false
-
- // Size of an object reference
- // Based on https://wikis.oracle.com/display/HotSpotInternals/CompressedOops
- private var isCompressedOops = false
- private var pointerSize = 4
-
- // Minimum size of a java.lang.Object
- private var objectSize = 8
-
- initialize()
-
- // Sets object size, pointer size based on architecture and CompressedOops settings
- // from the JVM.
- private def initialize() {
- is64bit = System.getProperty("os.arch").contains("64")
- isCompressedOops = getIsCompressedOops
-
- objectSize = if (!is64bit) 8 else {
- if(!isCompressedOops) {
- 16
- } else {
- 12
- }
- }
- pointerSize = if (is64bit && !isCompressedOops) 8 else 4
- classInfos.clear()
- classInfos.put(classOf[Object], new ClassInfo(objectSize, Nil))
- }
-
- private def getIsCompressedOops : Boolean = {
- if (System.getProperty("spark.test.useCompressedOops") != null) {
- return System.getProperty("spark.test.useCompressedOops").toBoolean
- }
-
- try {
- val hotSpotMBeanName = "com.sun.management:type=HotSpotDiagnostic"
- val server = ManagementFactory.getPlatformMBeanServer()
-
- // NOTE: This should throw an exception in non-Sun JVMs
- val hotSpotMBeanClass = Class.forName("com.sun.management.HotSpotDiagnosticMXBean")
- val getVMMethod = hotSpotMBeanClass.getDeclaredMethod("getVMOption",
- Class.forName("java.lang.String"))
-
- val bean = ManagementFactory.newPlatformMXBeanProxy(server,
- hotSpotMBeanName, hotSpotMBeanClass)
- // TODO: Could we use reflection on the returned VMOption instead?
- return getVMMethod.invoke(bean, "UseCompressedOops").toString.contains("true")
- } catch {
- case e: Exception => {
- // Guess whether they've enabled UseCompressedOops based on whether maxMemory < 32 GB
- val guess = Runtime.getRuntime.maxMemory < (32L*1024*1024*1024)
- val guessInWords = if (guess) "yes" else "not"
- logWarning("Failed to check whether UseCompressedOops is set; assuming " + guessInWords)
- return guess
- }
- }
- }
-
- /**
- * The state of an ongoing size estimation. Contains a stack of objects to visit as well as an
- * IdentityHashMap of visited objects, and provides utility methods for enqueueing new objects
- * to visit.
- */
- private class SearchState(val visited: IdentityHashMap[AnyRef, AnyRef]) {
- val stack = new ArrayBuffer[AnyRef]
- var size = 0L
-
- def enqueue(obj: AnyRef) {
- if (obj != null && !visited.containsKey(obj)) {
- visited.put(obj, null)
- stack += obj
- }
- }
-
- def isFinished(): Boolean = stack.isEmpty
-
- def dequeue(): AnyRef = {
- val elem = stack.last
- stack.trimEnd(1)
- return elem
- }
- }
-
- /**
- * Cached information about each class. We remember two things: the "shell size" of the class
- * (size of all non-static fields plus the java.lang.Object size), and any fields that are
- * pointers to objects.
- */
- private class ClassInfo(
- val shellSize: Long,
- val pointerFields: List[Field]) {}
-
- def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef])
-
- private def estimate(obj: AnyRef, visited: IdentityHashMap[AnyRef, AnyRef]): Long = {
- val state = new SearchState(visited)
- state.enqueue(obj)
- while (!state.isFinished) {
- visitSingleObject(state.dequeue(), state)
- }
- return state.size
- }
-
- private def visitSingleObject(obj: AnyRef, state: SearchState) {
- val cls = obj.getClass
- if (cls.isArray) {
- visitArray(obj, cls, state)
- } else if (obj.isInstanceOf[ClassLoader] || obj.isInstanceOf[Class[_]]) {
- // Hadoop JobConfs created in the interpreter have a ClassLoader, which greatly confuses
- // the size estimator since it references the whole REPL. Do nothing in this case. In
- // general all ClassLoaders and Classes will be shared between objects anyway.
- } else {
- val classInfo = getClassInfo(cls)
- state.size += classInfo.shellSize
- for (field <- classInfo.pointerFields) {
- state.enqueue(field.get(obj))
- }
- }
- }
-
- // Estimate the size of arrays larger than ARRAY_SIZE_FOR_SAMPLING by sampling.
- private val ARRAY_SIZE_FOR_SAMPLING = 200
- private val ARRAY_SAMPLE_SIZE = 100 // should be lower than ARRAY_SIZE_FOR_SAMPLING
-
- private def visitArray(array: AnyRef, cls: Class[_], state: SearchState) {
- val length = JArray.getLength(array)
- val elementClass = cls.getComponentType
-
- // Arrays have an object header and a length field, which is an integer
- var arrSize: Long = alignSize(objectSize + INT_SIZE)
-
- if (elementClass.isPrimitive) {
- arrSize += alignSize(length * primitiveSize(elementClass))
- state.size += arrSize
- } else {
- arrSize += alignSize(length * pointerSize)
- state.size += arrSize
-
- if (length <= ARRAY_SIZE_FOR_SAMPLING) {
- for (i <- 0 until length) {
- state.enqueue(JArray.get(array, i))
- }
- } else {
- // Estimate the size of a large array by sampling elements without replacement.
- var size = 0.0
- val rand = new Random(42)
- val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
- for (i <- 0 until ARRAY_SAMPLE_SIZE) {
- var index = 0
- do {
- index = rand.nextInt(length)
- } while (drawn.contains(index))
- drawn.add(index)
- val elem = JArray.get(array, index)
- size += SizeEstimator.estimate(elem, state.visited)
- }
- state.size += ((length / (ARRAY_SAMPLE_SIZE * 1.0)) * size).toLong
- }
- }
- }
-
- private def primitiveSize(cls: Class[_]): Long = {
- if (cls == classOf[Byte])
- BYTE_SIZE
- else if (cls == classOf[Boolean])
- BOOLEAN_SIZE
- else if (cls == classOf[Char])
- CHAR_SIZE
- else if (cls == classOf[Short])
- SHORT_SIZE
- else if (cls == classOf[Int])
- INT_SIZE
- else if (cls == classOf[Long])
- LONG_SIZE
- else if (cls == classOf[Float])
- FLOAT_SIZE
- else if (cls == classOf[Double])
- DOUBLE_SIZE
- else throw new IllegalArgumentException(
- "Non-primitive class " + cls + " passed to primitiveSize()")
- }
-
- /**
- * Get or compute the ClassInfo for a given class.
- */
- private def getClassInfo(cls: Class[_]): ClassInfo = {
- // Check whether we've already cached a ClassInfo for this class
- val info = classInfos.get(cls)
- if (info != null) {
- return info
- }
-
- val parent = getClassInfo(cls.getSuperclass)
- var shellSize = parent.shellSize
- var pointerFields = parent.pointerFields
-
- for (field <- cls.getDeclaredFields) {
- if (!Modifier.isStatic(field.getModifiers)) {
- val fieldClass = field.getType
- if (fieldClass.isPrimitive) {
- shellSize += primitiveSize(fieldClass)
- } else {
- field.setAccessible(true) // Enable future get()'s on this field
- shellSize += pointerSize
- pointerFields = field :: pointerFields
- }
- }
- }
-
- shellSize = alignSize(shellSize)
-
- // Create and cache a new ClassInfo
- val newInfo = new ClassInfo(shellSize, pointerFields)
- classInfos.put(cls, newInfo)
- return newInfo
- }
-
- private def alignSize(size: Long): Long = {
- val rem = size % ALIGN_SIZE
- return if (rem == 0) size else (size + ALIGN_SIZE - rem)
- }
-}
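For orientation before this file moves to org.apache.spark, here is a minimal usage sketch of the estimator above. The object and variable names are illustrative, and the call assumes the caller is compiled inside the old `spark` package, since SizeEstimator is private[spark]:

    package spark

    object SizeEstimatorSketch {
      def main(args: Array[String]) {
        // Estimate the memory footprint of an array of byte arrays; large object
        // arrays are sampled (ARRAY_SAMPLE_SIZE elements) rather than fully walked.
        val data = Array.fill(1000)(new Array[Byte](128))
        val bytes = SizeEstimator.estimate(data)
        println("Estimated size: " + bytes + " bytes")
      }
    }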
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
deleted file mode 100644
index 7ce9505..0000000
--- a/core/src/main/scala/spark/SparkContext.scala
+++ /dev/null
@@ -1,995 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import java.io._
-import java.net.URI
-import java.util.Properties
-import java.util.concurrent.atomic.AtomicInteger
-
-import scala.collection.Map
-import scala.collection.generic.Growable
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.util.DynamicVariable
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.ArrayWritable
-import org.apache.hadoop.io.BooleanWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.DoubleWritable
-import org.apache.hadoop.io.FloatWritable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
-import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-
-import org.apache.mesos.MesosNativeLibrary
-
-import spark.deploy.LocalSparkCluster
-import spark.partial.{ApproximateEvaluator, PartialResult}
-import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
- OrderedRDDFunctions}
-import spark.scheduler._
-import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
- ClusterScheduler, Schedulable, SchedulingMode}
-import spark.scheduler.local.LocalScheduler
-import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
-import spark.ui.SparkUI
-import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import scala.Some
-import spark.scheduler.StageInfo
-import spark.storage.RDDInfo
-import spark.storage.StorageStatus
-
-/**
- * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
- * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
- *
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI.
- * @param sparkHome Location where Spark is installed on cluster nodes.
- * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
- * system or HDFS, HTTP, HTTPS, or FTP URLs.
- * @param environment Environment variables to set on worker nodes.
- */
-class SparkContext(
- val master: String,
- val appName: String,
- val sparkHome: String = null,
- val jars: Seq[String] = Nil,
- val environment: Map[String, String] = Map(),
- // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
- // This is typically generated from InputFormatInfo.computePreferredLocations: a map from host to the set of data-local splits on that host.
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
- extends Logging {
-
- // Ensure logging is initialized before we spawn any threads
- initLogging()
-
- // Set Spark driver host and port system properties
- if (System.getProperty("spark.driver.host") == null) {
- System.setProperty("spark.driver.host", Utils.localHostName())
- }
- if (System.getProperty("spark.driver.port") == null) {
- System.setProperty("spark.driver.port", "0")
- }
-
- val isLocal = (master == "local" || master.startsWith("local["))
-
- // Create the Spark execution environment (cache, map output tracker, etc)
- private[spark] val env = SparkEnv.createFromSystemProperties(
- "<driver>",
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port").toInt,
- true,
- isLocal)
- SparkEnv.set(env)
-
- // Used to store a URL for each static file/jar together with the file's local timestamp
- private[spark] val addedFiles = HashMap[String, Long]()
- private[spark] val addedJars = HashMap[String, Long]()
-
- // Keeps track of all persisted RDDs
- private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
- private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)
-
- // Initialize the Spark UI
- private[spark] val ui = new SparkUI(this)
- ui.bind()
-
- val startTime = System.currentTimeMillis()
-
- // Add each JAR given through the constructor
- if (jars != null) {
- jars.foreach { addJar(_) }
- }
-
- // Environment variables to pass to our executors
- private[spark] val executorEnvs = HashMap[String, String]()
- // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner
- for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) {
- val value = System.getenv(key)
- if (value != null) {
- executorEnvs(key) = value
- }
- }
- // Since memory can be set with a system property too, use that
- executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m"
- if (environment != null) {
- executorEnvs ++= environment
- }
-
- // Create and start the scheduler
- private var taskScheduler: TaskScheduler = {
- // Regular expression used for local[N] master format
- val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
- // Regular expression for local[N, maxRetries], used in tests with failing tasks
- val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r
- // Regular expression for simulating a Spark cluster of [N, cores, memory] locally
- val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
- // Regular expression for connecting to Spark deploy clusters
- val SPARK_REGEX = """(spark://.*)""".r
- // Regular expression for connecting to a Mesos cluster
- val MESOS_REGEX = """(mesos://.*)""".r
-
- master match {
- case "local" =>
- new LocalScheduler(1, 0, this)
-
- case LOCAL_N_REGEX(threads) =>
- new LocalScheduler(threads.toInt, 0, this)
-
- case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
- new LocalScheduler(threads.toInt, maxFailures.toInt, this)
-
- case SPARK_REGEX(sparkUrl) =>
- val scheduler = new ClusterScheduler(this)
- val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
- scheduler.initialize(backend)
- scheduler
-
- case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
- // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
- val memoryPerSlaveInt = memoryPerSlave.toInt
- if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) {
- throw new SparkException(
- "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
- memoryPerSlaveInt, SparkContext.executorMemoryRequested))
- }
-
- val scheduler = new ClusterScheduler(this)
- val localCluster = new LocalSparkCluster(
- numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
- val sparkUrl = localCluster.start()
- val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
- scheduler.initialize(backend)
- backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
- localCluster.stop()
- }
- scheduler
-
- case "yarn-standalone" =>
- val scheduler = try {
- val clazz = Class.forName("spark.scheduler.cluster.YarnClusterScheduler")
- val cons = clazz.getConstructor(classOf[SparkContext])
- cons.newInstance(this).asInstanceOf[ClusterScheduler]
- } catch {
- // TODO: Enumerate the exact reasons why it can fail
- // But irrespective of the reason, it means we cannot proceed!
- case th: Throwable => {
- throw new SparkException("YARN mode not available ?", th)
- }
- }
- val backend = new StandaloneSchedulerBackend(scheduler, this.env.actorSystem)
- scheduler.initialize(backend)
- scheduler
-
- case _ =>
- if (MESOS_REGEX.findFirstIn(master).isEmpty) {
- logWarning("Master %s does not match expected format, parsing as Mesos URL".format(master))
- }
- MesosNativeLibrary.load()
- val scheduler = new ClusterScheduler(this)
- val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
- val masterWithoutProtocol = master.replaceFirst("^mesos://", "") // Strip initial mesos://
- val backend = if (coarseGrained) {
- new CoarseMesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
- } else {
- new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName)
- }
- scheduler.initialize(backend)
- scheduler
- }
- }
- taskScheduler.start()
-
- @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
- dagScheduler.start()
-
- ui.start()
-
- /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
- val hadoopConfiguration = {
- val env = SparkEnv.get
- val conf = env.hadoop.newConfiguration()
- // Explicitly check for S3 environment variables
- if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
- conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- }
- // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
- for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) {
- conf.set(key.substring("spark.hadoop.".length), System.getProperty(key))
- }
- val bufferSize = System.getProperty("spark.buffer.size", "65536")
- conf.set("io.file.buffer.size", bufferSize)
- conf
- }
-
- private[spark] var checkpointDir: Option[String] = None
-
- // Thread Local variable that can be used by users to pass information down the stack
- private val localProperties = new DynamicVariable[Properties](null)
-
- def initLocalProperties() {
- localProperties.value = new Properties()
- }
-
- def setLocalProperty(key: String, value: String) {
- if (localProperties.value == null) {
- localProperties.value = new Properties()
- }
- if (value == null) {
- localProperties.value.remove(key)
- } else {
- localProperties.value.setProperty(key, value)
- }
- }
-
- /** Set a human-readable description of the current job. */
- def setJobDescription(value: String) {
- setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value)
- }
-
- // Post init
- taskScheduler.postStartHook()
-
- val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
- val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
-
- def initDriverMetrics() {
- SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
- SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
- }
-
- initDriverMetrics()
-
- // Methods for creating RDDs
-
- /** Distribute a local Scala collection to form an RDD. */
- def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
- new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
- }
-
- /** Distribute a local Scala collection to form an RDD. */
- def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
- parallelize(seq, numSlices)
- }
-
- /** Distribute a local Scala collection to form an RDD, with one or more
- * location preferences (hostnames of Spark nodes) for each object.
- * Create a new partition for each collection item. */
- def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
- val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
- new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
- }
-
- /**
- * Read a text file from HDFS, a local file system (available on all nodes), or any
- * Hadoop-supported file system URI, and return it as an RDD of Strings.
- */
- def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
- hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
- .map(pair => pair._2.toString)
- }
-
- /**
- * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
- * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
- * etc).
- */
- def hadoopRDD[K, V](
- conf: JobConf,
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int = defaultMinSplits
- ): RDD[(K, V)] = {
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
- }
-
- /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
- def hadoopFile[K, V](
- path: String,
- inputFormatClass: Class[_ <: InputFormat[K, V]],
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int = defaultMinSplits
- ) : RDD[(K, V)] = {
- val conf = new JobConf(hadoopConfiguration)
- FileInputFormat.setInputPaths(conf, path)
- new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
- }
-
- /**
- * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
- * values and the InputFormat so that users don't need to pass them directly. Instead, callers
- * can just write, for example,
- * {{{
- * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits)
- * }}}
- */
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
- : RDD[(K, V)] = {
- hadoopFile(path,
- fm.erasure.asInstanceOf[Class[F]],
- km.erasure.asInstanceOf[Class[K]],
- vm.erasure.asInstanceOf[Class[V]],
- minSplits)
- }
-
- /**
- * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
- * values and the InputFormat so that users don't need to pass them directly. Instead, callers
- * can just write, for example,
- * {{{
- * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path)
- * }}}
- */
- def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] =
- hadoopFile[K, V, F](path, defaultMinSplits)
-
- /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
- (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = {
- newAPIHadoopFile(
- path,
- fm.erasure.asInstanceOf[Class[F]],
- km.erasure.asInstanceOf[Class[K]],
- vm.erasure.asInstanceOf[Class[V]])
- }
-
- /**
- * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
- * and extra configuration options to pass to the input format.
- */
- def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
- path: String,
- fClass: Class[F],
- kClass: Class[K],
- vClass: Class[V],
- conf: Configuration = hadoopConfiguration): RDD[(K, V)] = {
- val job = new NewHadoopJob(conf)
- NewFileInputFormat.addInputPath(job, new Path(path))
- val updatedConf = job.getConfiguration
- new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf)
- }
-
- /**
- * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
- * and extra configuration options to pass to the input format.
- */
- def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
- conf: Configuration = hadoopConfiguration,
- fClass: Class[F],
- kClass: Class[K],
- vClass: Class[V]): RDD[(K, V)] = {
- new NewHadoopRDD(this, fClass, kClass, vClass, conf)
- }
-
- /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String,
- keyClass: Class[K],
- valueClass: Class[V],
- minSplits: Int
- ): RDD[(K, V)] = {
- val inputFormatClass = classOf[SequenceFileInputFormat[K, V]]
- hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits)
- }
-
- /** Get an RDD for a Hadoop SequenceFile with given key and value types. */
- def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] =
- sequenceFile(path, keyClass, valueClass, defaultMinSplits)
-
- /**
- * Version of sequenceFile() for types implicitly convertible to Writables through a
- * WritableConverter. For example, to access a SequenceFile where the keys are Text and the
- * values are IntWritable, you could simply write
- * {{{
- * sparkContext.sequenceFile[String, Int](path, ...)
- * }}}
- *
- * WritableConverters are provided in a somewhat strange way (by an implicit function) to support
- * both subclasses of Writable and types for which we define a converter (e.g. Int to
- * IntWritable). The most natural thing would've been to have implicit objects for the
- * converters, but then we couldn't have an object for every subclass of Writable (you can't
- * have a parameterized singleton object). We use functions instead to create a new converter
- * for the appropriate type. In addition, we pass the converter a ClassManifest of its type to
- * allow it to figure out the Writable class to use in the subclass case.
- */
- def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
- (implicit km: ClassManifest[K], vm: ClassManifest[V],
- kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
- : RDD[(K, V)] = {
- val kc = kcf()
- val vc = vcf()
- val format = classOf[SequenceFileInputFormat[Writable, Writable]]
- val writables = hadoopFile(path, format,
- kc.writableClass(km).asInstanceOf[Class[Writable]],
- vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits)
- writables.map{case (k,v) => (kc.convert(k), vc.convert(v))}
- }
-
- /**
- * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and
- * BytesWritable values that contain a serialized partition. This is still an experimental storage
- * format and may not be supported exactly as is in future Spark releases. It will also be pretty
- * slow if you use the default serializer (Java serialization), though the nice thing about it is
- * that there's very little effort required to save arbitrary objects.
- */
- def objectFile[T: ClassManifest](
- path: String,
- minSplits: Int = defaultMinSplits
- ): RDD[T] = {
- sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits)
- .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes))
- }
-
-
- protected[spark] def checkpointFile[T: ClassManifest](
- path: String
- ): RDD[T] = {
- new CheckpointRDD[T](this, path)
- }
-
- /** Build the union of a list of RDDs. */
- def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
-
- /** Build the union of a list of RDDs passed as variable-length arguments. */
- def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
- new UnionRDD(this, Seq(first) ++ rest)
-
- // Methods for creating shared variables
-
- /**
- * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values
- * to using the `+=` method. Only the driver can access the accumulator's `value`.
- */
- def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) =
- new Accumulator(initialValue, param)
-
- /**
- * Create an [[spark.Accumulable]] shared variable, to which tasks can add values with `+=`.
- * Only the driver can access the accumulable's `value`.
- * @tparam T accumulator type
- * @tparam R type that can be added to the accumulator
- */
- def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) =
- new Accumulable(initialValue, param)
-
- /**
- * Create an accumulator from a "mutable collection" type.
- *
- * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by
- * standard mutable collections. So you can use this with mutable Map, Set, etc.
- */
- def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = {
- val param = new GrowableAccumulableParam[R,T]
- new Accumulable(initialValue, param)
- }
-
- /**
- * Broadcast a read-only variable to the cluster, returning a [[spark.broadcast.Broadcast]] object for
- * reading it in distributed functions. The variable will be sent to each cluster only once.
- */
- def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
-
- /**
- * Add a file to be downloaded with this Spark job on every node.
- * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
- * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
- * use `SparkFiles.get(path)` to find its download location.
- */
- def addFile(path: String) {
- val uri = new URI(path)
- val key = uri.getScheme match {
- case null | "file" => env.httpFileServer.addFile(new File(uri.getPath))
- case _ => path
- }
- addedFiles(key) = System.currentTimeMillis
-
- // Fetch the file locally in case a job is executed locally.
- // Jobs that run through LocalScheduler will already fetch the required dependencies,
- // but jobs run in DAGScheduler.runLocally() will not, so we must fetch the files here.
- Utils.fetchFile(path, new File(SparkFiles.getRootDirectory))
-
- logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key))
- }
-
- def addSparkListener(listener: SparkListener) {
- dagScheduler.addSparkListener(listener)
- }
-
- /**
- * Return a map from each slave to the maximum memory available for caching and the
- * remaining memory available for caching.
- */
- def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
- env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
- (blockManagerId.host + ":" + blockManagerId.port, mem)
- }
- }
-
- /**
- * Return information about what RDDs are cached, if they are in mem or on disk, how much space
- * they take, etc.
- */
- def getRDDStorageInfo: Array[RDDInfo] = {
- StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this)
- }
-
- /**
- * Returns an immutable map of RDDs that have marked themselves as persistent via a cache() call.
- * Note that this does not necessarily mean the caching or computation was successful.
- */
- def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
-
- def getStageInfo: Map[Stage,StageInfo] = {
- dagScheduler.stageToInfos
- }
-
- /**
- * Return information about blocks stored in all of the slaves
- */
- def getExecutorStorageStatus: Array[StorageStatus] = {
- env.blockManager.master.getStorageStatus
- }
-
- /**
- * Return pools for fair scheduler
- * TODO(xiajunluan): We should take nested pools into account
- */
- def getAllPools: ArrayBuffer[Schedulable] = {
- taskScheduler.rootPool.schedulableQueue
- }
-
- /**
- * Return the pool associated with the given name, if one exists
- */
- def getPoolForName(pool: String): Option[Schedulable] = {
- taskScheduler.rootPool.schedulableNameToSchedulable.get(pool)
- }
-
- /**
- * Return current scheduling mode
- */
- def getSchedulingMode: SchedulingMode.SchedulingMode = {
- taskScheduler.schedulingMode
- }
-
- /**
- * Clear the job's list of files added by `addFile` so that they do not get downloaded to
- * any new nodes.
- */
- def clearFiles() {
- addedFiles.clear()
- }
-
- /**
- * Gets the locality information associated with the partition in a particular rdd
- * @param rdd of interest
- * @param partition to be looked up for locality
- * @return list of preferred locations for the partition
- */
- private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
- dagScheduler.getPreferredLocs(rdd, partition)
- }
-
- /**
- * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
- * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
- * filesystems), or an HTTP, HTTPS or FTP URI.
- */
- def addJar(path: String) {
- if (null == path) {
- logWarning("null specified as parameter to addJar",
- new SparkException("null specified as parameter to addJar"))
- } else {
- val env = SparkEnv.get
- val uri = new URI(path)
- val key = uri.getScheme match {
- case null | "file" =>
- if (env.hadoop.isYarnMode()) {
- logWarning("local jar specified as parameter to addJar under Yarn mode")
- return
- }
- env.httpFileServer.addJar(new File(uri.getPath))
- case _ => path
- }
- addedJars(key) = System.currentTimeMillis
- logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
- }
- }
-
- /**
- * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
- * any new nodes.
- */
- def clearJars() {
- addedJars.clear()
- }
-
- /** Shut down the SparkContext. */
- def stop() {
- ui.stop()
- // Do this only if not already stopped; best effort to
- // prevent an NPE if stop() is called more than once.
- val dagSchedulerCopy = dagScheduler
- dagScheduler = null
- if (dagSchedulerCopy != null) {
- metadataCleaner.cancel()
- dagSchedulerCopy.stop()
- taskScheduler = null
- // TODO: Cache.stop()?
- env.stop()
- // Clean up locally linked files
- clearFiles()
- clearJars()
- SparkEnv.set(null)
- ShuffleMapTask.clearCache()
- ResultTask.clearCache()
- logInfo("Successfully stopped SparkContext")
- } else {
- logInfo("SparkContext already stopped")
- }
- }
-
-
- /**
- * Get Spark's home location from either a value set through the constructor,
- * or the spark.home Java property, or the SPARK_HOME environment variable
- * (in that order of preference). If none of these is set, return None.
- */
- private[spark] def getSparkHome(): Option[String] = {
- if (sparkHome != null) {
- Some(sparkHome)
- } else if (System.getProperty("spark.home") != null) {
- Some(System.getProperty("spark.home"))
- } else if (System.getenv("SPARK_HOME") != null) {
- Some(System.getenv("SPARK_HOME"))
- } else {
- None
- }
- }
-
- /**
- * Run a function on a given set of partitions in an RDD and pass the results to the given
- * handler function. This is the main entry point for all actions in Spark. The allowLocal
- * flag specifies whether the scheduler can run the computation on the driver rather than
- * shipping it out to the cluster, for short actions like first().
- */
- def runJob[T, U: ClassManifest](
- rdd: RDD[T],
- func: (TaskContext, Iterator[T]) => U,
- partitions: Seq[Int],
- allowLocal: Boolean,
- resultHandler: (Int, U) => Unit) {
- val callSite = Utils.formatSparkCallSite
- logInfo("Starting job: " + callSite)
- val start = System.nanoTime
- val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value)
- logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
- rdd.doCheckpoint()
- result
- }
-
- /**
- * Run a function on a given set of partitions in an RDD and return the results as an array. The
- * allowLocal flag specifies whether the scheduler can run the computation on the driver rather
- * than shipping it out to the cluster, for short actions like first().
- */
- def runJob[T, U: ClassManifest](
- rdd: RDD[T],
- func: (TaskContext, Iterator[T]) => U,
- partitions: Seq[Int],
- allowLocal: Boolean
- ): Array[U] = {
- val results = new Array[U](partitions.size)
- runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res)
- results
- }
-
- /**
- * Run a job on a given set of partitions of an RDD, but take a function of type
- * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
- */
- def runJob[T, U: ClassManifest](
- rdd: RDD[T],
- func: Iterator[T] => U,
- partitions: Seq[Int],
- allowLocal: Boolean
- ): Array[U] = {
- runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal)
- }
-
- /**
- * Run a job on all partitions in an RDD and return the results in an array.
- */
- def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
- runJob(rdd, func, 0 until rdd.partitions.size, false)
- }
-
- /**
- * Run a job on all partitions in an RDD and return the results in an array.
- */
- def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
- runJob(rdd, func, 0 until rdd.partitions.size, false)
- }
-
- /**
- * Run a job on all partitions in an RDD and pass the results to a handler function.
- */
- def runJob[T, U: ClassManifest](
- rdd: RDD[T],
- processPartition: (TaskContext, Iterator[T]) => U,
- resultHandler: (Int, U) => Unit)
- {
- runJob[T, U](rdd, processPartition, 0 until rdd.partitions.size, false, resultHandler)
- }
-
- /**
- * Run a job on all partitions in an RDD and pass the results to a handler function.
- */
- def runJob[T, U: ClassManifest](
- rdd: RDD[T],
- processPartition: Iterator[T] => U,
- resultHandler: (Int, U) => Unit)
- {
- val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
- runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler)
- }
-
- /**
- * Run a job that can return approximate results.
- */
- def runApproximateJob[T, U, R](
- rdd: RDD[T],
- func: (TaskContext, Iterator[T]) => U,
- evaluator: ApproximateEvaluator[U, R],
- timeout: Long): PartialResult[R] = {
- val callSite = Utils.formatSparkCallSite
- logInfo("Starting job: " + callSite)
- val start = System.nanoTime
- val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value)
- logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
- result
- }
-
- /**
- * Clean a closure to make it ready to be serialized and sent to tasks
- * (removes unreferenced variables in $outer's, updates REPL variables)
- */
- private[spark] def clean[F <: AnyRef](f: F): F = {
- ClosureCleaner.clean(f)
- return f
- }
-
- /**
- * Set the directory under which RDDs are going to be checkpointed. The directory must
- * be an HDFS path if running on a cluster. If the directory does not exist, it will
- * be created. If the directory exists and useExisting is set to true, then the
- * existing directory will be used. Otherwise an exception will be thrown to
- * prevent accidental overriding of checkpoint files in the existing directory.
- */
- def setCheckpointDir(dir: String, useExisting: Boolean = false) {
- val env = SparkEnv.get
- val path = new Path(dir)
- val fs = path.getFileSystem(env.hadoop.newConfiguration())
- if (!useExisting) {
- if (fs.exists(path)) {
- throw new Exception("Checkpoint directory '" + path + "' already exists.")
- } else {
- fs.mkdirs(path)
- }
- }
- checkpointDir = Some(dir)
- }
-
- /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
- def defaultParallelism: Int = taskScheduler.defaultParallelism
-
- /** Default min number of partitions for Hadoop RDDs when not given by user */
- def defaultMinSplits: Int = math.min(defaultParallelism, 2)
-
- private val nextShuffleId = new AtomicInteger(0)
-
- private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
-
- private val nextRddId = new AtomicInteger(0)
-
- /** Register a new RDD, returning its RDD ID */
- private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
-
- /** Called by MetadataCleaner to clean up the persistentRdds map periodically */
- private[spark] def cleanup(cleanupTime: Long) {
- persistentRdds.clearOldValues(cleanupTime)
- }
-}
-
-/**
- * The SparkContext object contains a number of implicit conversions and parameters for use with
- * various Spark features.
- */
-object SparkContext {
- val SPARK_JOB_DESCRIPTION = "spark.job.description"
-
- implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
- def addInPlace(t1: Double, t2: Double): Double = t1 + t2
- def zero(initialValue: Double) = 0.0
- }
-
- implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
- def addInPlace(t1: Int, t2: Int): Int = t1 + t2
- def zero(initialValue: Int) = 0
- }
-
- implicit object LongAccumulatorParam extends AccumulatorParam[Long] {
- def addInPlace(t1: Long, t2: Long) = t1 + t2
- def zero(initialValue: Long) = 0l
- }
-
- implicit object FloatAccumulatorParam extends AccumulatorParam[Float] {
- def addInPlace(t1: Float, t2: Float) = t1 + t2
- def zero(initialValue: Float) = 0f
- }
-
- // TODO: Add AccumulatorParams for other types, e.g. lists and strings
-
- implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
- new PairRDDFunctions(rdd)
-
- implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
- rdd: RDD[(K, V)]) =
- new SequenceFileRDDFunctions(rdd)
-
- implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
- rdd: RDD[(K, V)]) =
- new OrderedRDDFunctions[K, V, (K, V)](rdd)
-
- implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
-
- implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) =
- new DoubleRDDFunctions(rdd.map(x => num.toDouble(x)))
-
- // Implicit conversions to common Writable types, for saveAsSequenceFile
-
- implicit def intToIntWritable(i: Int) = new IntWritable(i)
-
- implicit def longToLongWritable(l: Long) = new LongWritable(l)
-
- implicit def floatToFloatWritable(f: Float) = new FloatWritable(f)
-
- implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d)
-
- implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b)
-
- implicit def bytesToBytesWritable (aob: Array[Byte]) = new BytesWritable(aob)
-
- implicit def stringToText(s: String) = new Text(s)
-
- private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
- def anyToWritable[U <% Writable](u: U): Writable = u
-
- new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
- arr.map(x => anyToWritable(x)).toArray)
- }
-
- // Helper objects for converting common types to Writable
- private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = {
- val wClass = classManifest[W].erasure.asInstanceOf[Class[W]]
- new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
- }
-
- implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
-
- implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
-
- implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
-
- implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
-
- implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get)
-
- implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
-
- implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
-
- implicit def writableWritableConverter[T <: Writable]() =
- new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
-
- /**
- * Find the JAR from which a given class was loaded, to make it easy for users to pass
- * their JARs to SparkContext
- */
- def jarOfClass(cls: Class[_]): Seq[String] = {
- val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
- if (uri != null) {
- val uriStr = uri.toString
- if (uriStr.startsWith("jar:file:")) {
- // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class", so pull out the /path/foo.jar
- List(uriStr.substring("jar:file:".length, uriStr.indexOf('!')))
- } else {
- Nil
- }
- } else {
- Nil
- }
- }
-
- /** Find the JAR that contains the class of a particular object */
- def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
-
- /** Get the amount of memory per executor requested through system properties or SPARK_MEM */
- private[spark] val executorMemoryRequested = {
- // TODO: Might need to add some extra memory for the non-heap parts of the JVM
- Option(System.getProperty("spark.executor.memory"))
- .orElse(Option(System.getenv("SPARK_MEM")))
- .map(Utils.memoryStringToMb)
- .getOrElse(512)
- }
-}
-
-/**
- * A class encapsulating how to convert some type T to Writable. It stores both the Writable class
- * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
- * The getter for the writable class takes a ClassManifest[T] in case this is a generic object
- * that doesn't know the type of T when it is created. This sounds strange but is necessary to
- * support converting subclasses of Writable to themselves (writableWritableConverter).
- */
-private[spark] class WritableConverter[T](
- val writableClass: ClassManifest[T] => Class[_ <: Writable],
- val convert: Writable => T)
- extends Serializable
-
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkEnv.scala b/core/src/main/scala/spark/SparkEnv.scala
deleted file mode 100644
index 1f66e9c..0000000
--- a/core/src/main/scala/spark/SparkEnv.scala
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import collection.mutable
-import serializer.Serializer
-
-import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
-import akka.remote.RemoteActorRefProvider
-
-import spark.broadcast.BroadcastManager
-import spark.metrics.MetricsSystem
-import spark.deploy.SparkHadoopUtil
-import spark.storage.BlockManager
-import spark.storage.BlockManagerMaster
-import spark.network.ConnectionManager
-import spark.serializer.{Serializer, SerializerManager}
-import spark.util.AkkaUtils
-import spark.api.python.PythonWorkerFactory
-
-
-/**
- * Holds all the runtime environment objects for a running Spark instance (either master or worker),
- * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently
- * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these
- * objects needs to have the right SparkEnv set. You can get the current environment with
- * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
- */
-class SparkEnv (
- val executorId: String,
- val actorSystem: ActorSystem,
- val serializerManager: SerializerManager,
- val serializer: Serializer,
- val closureSerializer: Serializer,
- val cacheManager: CacheManager,
- val mapOutputTracker: MapOutputTracker,
- val shuffleFetcher: ShuffleFetcher,
- val broadcastManager: BroadcastManager,
- val blockManager: BlockManager,
- val connectionManager: ConnectionManager,
- val httpFileServer: HttpFileServer,
- val sparkFilesDir: String,
- val metricsSystem: MetricsSystem) {
-
- private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
-
- val hadoop = {
- val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
- if(yarnMode) {
- try {
- Class.forName("spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
- } catch {
- case th: Throwable => throw new SparkException("Unable to load YARN support", th)
- }
- } else {
- new SparkHadoopUtil
- }
- }
-
- def stop() {
- pythonWorkers.foreach { case(key, worker) => worker.stop() }
- httpFileServer.stop()
- mapOutputTracker.stop()
- shuffleFetcher.stop()
- broadcastManager.stop()
- blockManager.stop()
- blockManager.master.stop()
- metricsSystem.stop()
- actorSystem.shutdown()
- // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
- // down, but let's call it anyway in case it gets fixed in a later release
- actorSystem.awaitTermination()
- }
-
- def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
- synchronized {
- val key = (pythonExec, envVars)
- pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
- }
- }
-}
-
-object SparkEnv extends Logging {
- private val env = new ThreadLocal[SparkEnv]
- @volatile private var lastSetSparkEnv : SparkEnv = _
-
- def set(e: SparkEnv) {
- lastSetSparkEnv = e
- env.set(e)
- }
-
- /**
- * Returns the ThreadLocal SparkEnv, if non-null. Else returns the SparkEnv
- * previously set in any thread.
- */
- def get: SparkEnv = {
- Option(env.get()).getOrElse(lastSetSparkEnv)
- }
-
- /**
- * Returns the ThreadLocal SparkEnv.
- */
- def getThreadLocal : SparkEnv = {
- env.get()
- }
-
- def createFromSystemProperties(
- executorId: String,
- hostname: String,
- port: Int,
- isDriver: Boolean,
- isLocal: Boolean): SparkEnv = {
-
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
-
- // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
- // figure out which port number Akka actually bound to and set spark.driver.port to it.
- if (isDriver && port == 0) {
- System.setProperty("spark.driver.port", boundPort.toString)
- }
-
- // Set spark.hostPort only if it has not been set already.
- if (System.getProperty("spark.hostPort", null) == null) {
- if (!isDriver){
- // unexpected
- Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
- }
- Utils.checkHost(hostname)
- System.setProperty("spark.hostPort", hostname + ":" + boundPort)
- }
-
- val classLoader = Thread.currentThread.getContextClassLoader
-
- // Create an instance of the class named by the given Java system property, or by
- // defaultClassName if the property is not set, and return it as a T
- def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
- val name = System.getProperty(propertyName, defaultClassName)
- Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
- }
-
- val serializerManager = new SerializerManager
-
- val serializer = serializerManager.setDefault(
- System.getProperty("spark.serializer", "spark.JavaSerializer"))
-
- val closureSerializer = serializerManager.get(
- System.getProperty("spark.closure.serializer", "spark.JavaSerializer"))
-
- def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
- if (isDriver) {
- logInfo("Registering " + name)
- actorSystem.actorOf(Props(newActor), name = name)
- } else {
- val driverHost: String = System.getProperty("spark.driver.host", "localhost")
- val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
- Utils.checkHost(driverHost, "Expected hostname")
- val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
- logInfo("Connecting to " + name + ": " + url)
- actorSystem.actorFor(url)
- }
- }
-
- val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
- "BlockManagerMaster",
- new spark.storage.BlockManagerMasterActor(isLocal)))
- val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
-
- val connectionManager = blockManager.connectionManager
-
- val broadcastManager = new BroadcastManager(isDriver)
-
- val cacheManager = new CacheManager(blockManager)
-
- // Have to assign trackerActor after initialization as MapOutputTrackerActor
- // requires the MapOutputTracker itself
- val mapOutputTracker = new MapOutputTracker()
- mapOutputTracker.trackerActor = registerOrLookup(
- "MapOutputTracker",
- new MapOutputTrackerActor(mapOutputTracker))
-
- val shuffleFetcher = instantiateClass[ShuffleFetcher](
- "spark.shuffle.fetcher", "spark.BlockStoreShuffleFetcher")
-
- val httpFileServer = new HttpFileServer()
- httpFileServer.initialize()
- System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
-
- val metricsSystem = if (isDriver) {
- MetricsSystem.createMetricsSystem("driver")
- } else {
- MetricsSystem.createMetricsSystem("executor")
- }
- metricsSystem.start()
-
- // Set the sparkFiles directory, used when downloading dependencies. In local mode,
- // this is a temporary directory; in distributed mode, this is the executor's current working
- // directory.
- val sparkFilesDir: String = if (isDriver) {
- Utils.createTempDir().getAbsolutePath
- } else {
- "."
- }
-
- // Warn about deprecated spark.cache.class property
- if (System.getProperty("spark.cache.class") != null) {
- logWarning("The spark.cache.class property is no longer being used! Specify storage " +
- "levels using the RDD.persist() method instead.")
- }
-
- new SparkEnv(
- executorId,
- actorSystem,
- serializerManager,
- serializer,
- closureSerializer,
- cacheManager,
- mapOutputTracker,
- shuffleFetcher,
- broadcastManager,
- blockManager,
- connectionManager,
- httpFileServer,
- sparkFilesDir,
- metricsSystem)
- }
-}
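A small sketch of the thread-local access pattern described in the class comment above; it assumes a SparkContext has just been created on the same thread (the master URL and app name are illustrative):

    import spark.{SparkContext, SparkEnv}

    object SparkEnvSketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local", "SparkEnvSketch")  // creating the context sets the thread-local env
        val env = SparkEnv.get                                // falls back to the env last set in any thread
        println(env.executorId)                               // "<driver>" on the driver
        println(env.sparkFilesDir)                            // a temp directory in local mode
        sc.stop()
      }
    }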
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkException.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkException.scala b/core/src/main/scala/spark/SparkException.scala
deleted file mode 100644
index b7045ee..0000000
--- a/core/src/main/scala/spark/SparkException.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-class SparkException(message: String, cause: Throwable)
- extends Exception(message, cause) {
-
- def this(message: String) = this(message, null)
-}
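
Editor's note: SparkException is Spark's general-purpose exception type; the auxiliary constructor simply supplies a null cause. A small usage sketch (the error message is made up for illustration):

import spark.SparkException

object SparkExceptionExample {
  def main(args: Array[String]) {
    try {
      // The single-argument constructor delegates to (message, null).
      throw new SparkException("shuffle output for stage 3 was lost")
    } catch {
      case e: SparkException => println("caught: " + e.getMessage)
    }
  }
}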
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkFiles.java
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkFiles.java b/core/src/main/scala/spark/SparkFiles.java
deleted file mode 100644
index f9b3f79..0000000
--- a/core/src/main/scala/spark/SparkFiles.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark;
-
-import java.io.File;
-
-/**
- * Resolves paths to files added through `SparkContext.addFile()`.
- */
-public class SparkFiles {
-
- private SparkFiles() {}
-
- /**
- * Get the absolute path of a file added through `SparkContext.addFile()`.
- */
- public static String get(String filename) {
- return new File(getRootDirectory(), filename).getAbsolutePath();
- }
-
- /**
- * Get the root directory that contains files added through `SparkContext.addFile()`.
- */
- public static String getRootDirectory() {
- return SparkEnv.get().sparkFilesDir();
- }
-}
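
Editor's note: SparkFiles complements SparkContext.addFile(): the driver ships a file to every node, and code running in a task resolves it against the per-executor sparkFilesDir set up in SparkEnv above. A hedged round-trip sketch, assuming a small side file exists at the placeholder path /tmp/lookup.txt:

import scala.io.Source

import spark.{SparkContext, SparkFiles}

object SparkFilesExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparkFiles example")
    sc.addFile("/tmp/lookup.txt")          // placeholder path for a small side file
    val lineCounts = sc.parallelize(1 to 4).map { _ =>
      // Inside a task, resolve the shipped file against the executor's sparkFilesDir.
      Source.fromFile(SparkFiles.get("lookup.txt")).getLines().size
    }
    println(lineCounts.collect().toSeq)
    sc.stop()
  }
}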
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/SparkHadoopWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/SparkHadoopWriter.scala b/core/src/main/scala/spark/SparkHadoopWriter.scala
deleted file mode 100644
index 6b330ef..0000000
--- a/core/src/main/scala/spark/SparkHadoopWriter.scala
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.mapred
-
-import org.apache.hadoop.fs.FileSystem
-import org.apache.hadoop.fs.Path
-
-import java.text.SimpleDateFormat
-import java.text.NumberFormat
-import java.io.IOException
-import java.util.Date
-
-import spark.Logging
-import spark.SerializableWritable
-
-/**
- * Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
- * because we need to access this class from the `spark` package to use some package-private Hadoop
- * functions, but this class should not be used directly by users.
- *
- * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
- * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
- */
-class SparkHadoopWriter(@transient jobConf: JobConf) extends Logging with SparkHadoopMapRedUtil with Serializable {
-
- private val now = new Date()
- private val conf = new SerializableWritable(jobConf)
-
- private var jobID = 0
- private var splitID = 0
- private var attemptID = 0
- private var jID: SerializableWritable[JobID] = null
- private var taID: SerializableWritable[TaskAttemptID] = null
-
- @transient private var writer: RecordWriter[AnyRef,AnyRef] = null
- @transient private var format: OutputFormat[AnyRef,AnyRef] = null
- @transient private var committer: OutputCommitter = null
- @transient private var jobContext: JobContext = null
- @transient private var taskContext: TaskAttemptContext = null
-
- def preSetup() {
- setIDs(0, 0, 0)
- setConfParams()
-
- val jCtxt = getJobContext()
- getOutputCommitter().setupJob(jCtxt)
- }
-
-
- def setup(jobid: Int, splitid: Int, attemptid: Int) {
- setIDs(jobid, splitid, attemptid)
- setConfParams()
- }
-
- def open() {
- val numfmt = NumberFormat.getInstance()
- numfmt.setMinimumIntegerDigits(5)
- numfmt.setGroupingUsed(false)
-
- val outputName = "part-" + numfmt.format(splitID)
- val path = FileOutputFormat.getOutputPath(conf.value)
- val fs: FileSystem = {
- if (path != null) {
- path.getFileSystem(conf.value)
- } else {
- FileSystem.get(conf.value)
- }
- }
-
- getOutputCommitter().setupTask(getTaskContext())
- writer = getOutputFormat().getRecordWriter(
- fs, conf.value, outputName, Reporter.NULL)
- }
-
- def write(key: AnyRef, value: AnyRef) {
- if (writer!=null) {
- //println (">>> Writing ("+key.toString+": " + key.getClass.toString + ", " + value.toString + ": " + value.getClass.toString + ")")
- writer.write(key, value)
- } else {
- throw new IOException("Writer is null, open() has not been called")
- }
- }
-
- def close() {
- writer.close(Reporter.NULL)
- }
-
- def commit() {
- val taCtxt = getTaskContext()
- val cmtr = getOutputCommitter()
- if (cmtr.needsTaskCommit(taCtxt)) {
- try {
- cmtr.commitTask(taCtxt)
- logInfo (taID + ": Committed")
- } catch {
- case e: IOException => {
- logError("Error committing the output of task: " + taID.value, e)
- cmtr.abortTask(taCtxt)
- throw e
- }
- }
- } else {
- logWarning ("No need to commit output of task: " + taID.value)
- }
- }
-
- def commitJob() {
- // always ? Or if cmtr.needsTaskCommit ?
- val cmtr = getOutputCommitter()
- cmtr.commitJob(getJobContext())
- }
-
- def cleanup() {
- getOutputCommitter().cleanupJob(getJobContext())
- }
-
- // ********* Private Functions *********
-
- private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = {
- if (format == null) {
- format = conf.value.getOutputFormat()
- .asInstanceOf[OutputFormat[AnyRef,AnyRef]]
- }
- return format
- }
-
- private def getOutputCommitter(): OutputCommitter = {
- if (committer == null) {
- committer = conf.value.getOutputCommitter
- }
- return committer
- }
-
- private def getJobContext(): JobContext = {
- if (jobContext == null) {
- jobContext = newJobContext(conf.value, jID.value)
- }
- return jobContext
- }
-
- private def getTaskContext(): TaskAttemptContext = {
- if (taskContext == null) {
- taskContext = newTaskAttemptContext(conf.value, taID.value)
- }
- return taskContext
- }
-
- private def setIDs(jobid: Int, splitid: Int, attemptid: Int) {
- jobID = jobid
- splitID = splitid
- attemptID = attemptid
-
- jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobid))
- taID = new SerializableWritable[TaskAttemptID](
- new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
- }
-
- private def setConfParams() {
- conf.value.set("mapred.job.id", jID.value.toString)
- conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
- conf.value.set("mapred.task.id", taID.value.toString)
- conf.value.setBoolean("mapred.task.is.map", true)
- conf.value.setInt("mapred.task.partition", splitID)
- }
-}
-
-object SparkHadoopWriter {
- def createJobID(time: Date, id: Int): JobID = {
- val formatter = new SimpleDateFormat("yyyyMMddHHmm")
- val jobtrackerID = formatter.format(new Date())
- return new JobID(jobtrackerID, id)
- }
-
- def createPathFromString(path: String, conf: JobConf): Path = {
- if (path == null) {
- throw new IllegalArgumentException("Output path is null")
- }
- var outputPath = new Path(path)
- val fs = outputPath.getFileSystem(conf)
- if (outputPath == null || fs == null) {
- throw new IllegalArgumentException("Incorrectly formatted output path")
- }
- outputPath = outputPath.makeQualified(fs)
- return outputPath
- }
-}
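
Editor's note: SparkHadoopWriter is not meant to be called directly; it is driven by the RDD save operations. Roughly, preSetup runs once on the driver, each task runs setup/open/write/commit, and commitJob/cleanup run after all tasks succeed. A minimal job that exercises this machinery indirectly through saveAsTextFile; the output path is a placeholder:

import spark.SparkContext

object SparkHadoopWriterExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "SparkHadoopWriter example")
    val words = sc.parallelize(Seq("alpha", "beta", "gamma"))
    // saveAsTextFile goes through the Hadoop OutputFormat path and therefore
    // through SparkHadoopWriter: one part-NNNNN file is committed per task.
    words.saveAsTextFile("/tmp/spark-hadoop-writer-output")   // placeholder output path
    sc.stop()
  }
}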
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/TaskContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/TaskContext.scala b/core/src/main/scala/spark/TaskContext.scala
deleted file mode 100644
index b79f4ca..0000000
--- a/core/src/main/scala/spark/TaskContext.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import executor.TaskMetrics
-import scala.collection.mutable.ArrayBuffer
-
-class TaskContext(
- val stageId: Int,
- val splitId: Int,
- val attemptId: Long,
- val taskMetrics: TaskMetrics = TaskMetrics.empty()
-) extends Serializable {
-
- @transient val onCompleteCallbacks = new ArrayBuffer[() => Unit]
-
- // Add a callback function to be executed on task completion. An example use
- // is for HadoopRDD to register a callback to close the input stream.
- def addOnCompleteCallback(f: () => Unit) {
- onCompleteCallbacks += f
- }
-
- def executeOnCompleteCallbacks() {
- onCompleteCallbacks.foreach{_()}
- }
-}
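
Editor's note: TaskContext mainly exists so that code running inside a task can register cleanup work. The sketch below builds a context by hand purely to show the callback mechanism; in a real job the scheduler constructs one TaskContext per task attempt.

import spark.TaskContext

object TaskContextExample {
  def main(args: Array[String]) {
    // Built by hand only to demonstrate the callback API.
    val context = new TaskContext(stageId = 0, splitId = 0, attemptId = 0L)
    context.addOnCompleteCallback(() => println("closing input stream"))
    context.addOnCompleteCallback(() => println("flushing accumulators"))
    context.executeOnCompleteCallbacks()   // run by the executor after the task body
  }
}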
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
deleted file mode 100644
index 3ad665d..0000000
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import spark.executor.TaskMetrics
-import spark.storage.BlockManagerId
-
-/**
- * Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry
- * tasks several times for "ephemeral" failures, and only report back failures that require some
- * old stages to be resubmitted, such as shuffle map fetch failures.
- */
-private[spark] sealed trait TaskEndReason
-
-private[spark] case object Success extends TaskEndReason
-
-private[spark]
-case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
-
-private[spark] case class FetchFailed(
- bmAddress: BlockManagerId,
- shuffleId: Int,
- mapId: Int,
- reduceId: Int)
- extends TaskEndReason
-
-private[spark] case class ExceptionFailure(
- className: String,
- description: String,
- stackTrace: Array[StackTraceElement],
- metrics: Option[TaskMetrics])
- extends TaskEndReason
-
-private[spark] case class OtherFailure(message: String) extends TaskEndReason
-
-private[spark] case class TaskResultTooBigFailure() extends TaskEndReason
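
Editor's note: these case classes let the scheduler pattern match on why a task ended and decide whether to retry or resubmit stages. A sketch of such a match; because the types are private[spark], code like this would have to live inside the spark package itself.

package spark

private[spark] object TaskEndReasonExample {
  def describe(reason: TaskEndReason): String = reason match {
    case Success      => "task finished normally"
    case Resubmitted  => "task finished earlier but its output was lost"
    case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
      "failed to fetch map output " + mapId + " of shuffle " + shuffleId + " from " + bmAddress
    case ExceptionFailure(className, description, _, _) =>
      "task threw " + className + ": " + description
    case TaskResultTooBigFailure() => "task result was too large to send back"
    case OtherFailure(message)     => message
  }
}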
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/46eecd11/core/src/main/scala/spark/TaskState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/spark/TaskState.scala b/core/src/main/scala/spark/TaskState.scala
deleted file mode 100644
index bf75753..0000000
--- a/core/src/main/scala/spark/TaskState.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package spark
-
-import org.apache.mesos.Protos.{TaskState => MesosTaskState}
-
-private[spark] object TaskState
- extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED", "KILLED", "LOST") {
-
- val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value
-
- val FINISHED_STATES = Set(FINISHED, FAILED, KILLED, LOST)
-
- type TaskState = Value
-
- def isFinished(state: TaskState) = FINISHED_STATES.contains(state)
-
- def toMesos(state: TaskState): MesosTaskState = state match {
- case LAUNCHING => MesosTaskState.TASK_STARTING
- case RUNNING => MesosTaskState.TASK_RUNNING
- case FINISHED => MesosTaskState.TASK_FINISHED
- case FAILED => MesosTaskState.TASK_FAILED
- case KILLED => MesosTaskState.TASK_KILLED
- case LOST => MesosTaskState.TASK_LOST
- }
-
- def fromMesos(mesosState: MesosTaskState): TaskState = mesosState match {
- case MesosTaskState.TASK_STAGING => LAUNCHING
- case MesosTaskState.TASK_STARTING => LAUNCHING
- case MesosTaskState.TASK_RUNNING => RUNNING
- case MesosTaskState.TASK_FINISHED => FINISHED
- case MesosTaskState.TASK_FAILED => FAILED
- case MesosTaskState.TASK_KILLED => KILLED
- case MesosTaskState.TASK_LOST => LOST
- }
-}
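
Editor's note: TaskState bridges Spark's internal task states and the Mesos protobuf states. A small round-trip sketch, again assuming it is compiled inside the spark package because the object is private[spark]:

package spark

import org.apache.mesos.Protos.{TaskState => MesosTaskState}

private[spark] object TaskStateExample {
  def main(args: Array[String]) {
    val state = TaskState.FINISHED
    println(TaskState.isFinished(state))                  // true
    val mesosState: MesosTaskState = TaskState.toMesos(state)
    println(mesosState)                                   // TASK_FINISHED
    println(TaskState.fromMesos(mesosState) == state)     // true
  }
}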