You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 06:29:41 UTC
[03/33] git commit: spark-544,
introducing SparkConf and related configuration overhaul.
spark-544, introducing SparkConf and related configuration overhaul.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2573add9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2573add9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2573add9
Branch: refs/heads/master
Commit: 2573add94cf920a88f74d80d8ea94218d812704d
Parents: 0bc57c5
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Tue Dec 24 18:30:31 2013 +0530
Committer: Prashant Sharma <sc...@gmail.com>
Committed: Wed Dec 25 00:09:36 2013 +0530
----------------------------------------------------------------------
.../org/apache/spark/MapOutputTracker.scala | 7 +-
.../scala/org/apache/spark/Partitioner.scala | 4 +-
.../main/scala/org/apache/spark/SparkConf.scala | 71 ++++++++++
.../scala/org/apache/spark/SparkContext.scala | 140 ++++++++++---------
.../main/scala/org/apache/spark/SparkEnv.scala | 44 +++---
.../org/apache/spark/api/python/PythonRDD.scala | 6 +-
.../org/apache/spark/broadcast/Broadcast.scala | 6 +-
.../spark/broadcast/BroadcastFactory.scala | 4 +-
.../apache/spark/broadcast/HttpBroadcast.scala | 14 +-
.../spark/broadcast/TorrentBroadcast.scala | 9 +-
.../spark/deploy/ApplicationDescription.scala | 2 +-
.../apache/spark/deploy/SparkHadoopUtil.scala | 3 +-
.../org/apache/spark/deploy/client/Client.scala | 13 +-
.../apache/spark/deploy/client/TestClient.scala | 7 +-
.../org/apache/spark/deploy/master/Master.scala | 31 ++--
.../spark/deploy/master/MasterArguments.scala | 7 +-
.../deploy/master/SparkZooKeeperSession.scala | 7 +-
.../master/ZooKeeperLeaderElectionAgent.scala | 9 +-
.../master/ZooKeeperPersistenceEngine.scala | 8 +-
.../spark/deploy/master/ui/MasterWebUI.scala | 2 +-
.../org/apache/spark/deploy/worker/Worker.scala | 28 ++--
.../spark/deploy/worker/ui/WorkerWebUI.scala | 6 +-
.../executor/CoarseGrainedExecutorBackend.scala | 6 +-
.../org/apache/spark/executor/Executor.scala | 10 +-
.../org/apache/spark/io/CompressionCodec.scala | 14 +-
.../apache/spark/metrics/MetricsSystem.scala | 10 +-
.../spark/network/ConnectionManager.scala | 24 ++--
.../org/apache/spark/network/ReceiverTest.scala | 4 +-
.../org/apache/spark/network/SenderTest.scala | 4 +-
.../spark/network/netty/ShuffleCopier.scala | 8 +-
.../org/apache/spark/rdd/CheckpointRDD.scala | 6 +-
.../main/scala/org/apache/spark/rdd/RDD.scala | 1 +
.../org/apache/spark/scheduler/JobLogger.scala | 2 +-
.../spark/scheduler/SchedulableBuilder.scala | 6 +-
.../scheduler/cluster/ClusterScheduler.scala | 11 +-
.../cluster/ClusterTaskSetManager.scala | 19 +--
.../cluster/CoarseGrainedSchedulerBackend.scala | 12 +-
.../cluster/SimrSchedulerBackend.scala | 4 +-
.../cluster/SparkDeploySchedulerBackend.scala | 6 +-
.../scheduler/cluster/TaskResultGetter.scala | 3 +-
.../mesos/CoarseMesosSchedulerBackend.scala | 10 +-
.../cluster/mesos/MesosSchedulerBackend.scala | 4 +-
.../spark/scheduler/local/LocalScheduler.scala | 5 +-
.../spark/serializer/KryoSerializer.scala | 10 +-
.../spark/storage/BlockFetcherIterator.scala | 4 +-
.../org/apache/spark/storage/BlockManager.scala | 38 ++---
.../spark/storage/BlockManagerMaster.scala | 11 +-
.../spark/storage/BlockManagerMasterActor.scala | 10 +-
.../spark/storage/BlockObjectWriter.scala | 5 +-
.../apache/spark/storage/DiskBlockManager.scala | 2 +-
.../spark/storage/ShuffleBlockManager.scala | 7 +-
.../apache/spark/storage/ThreadingTest.scala | 6 +-
.../scala/org/apache/spark/ui/SparkUI.scala | 2 +-
.../apache/spark/ui/UIWorkloadGenerator.scala | 4 +-
.../org/apache/spark/ui/env/EnvironmentUI.scala | 2 +-
.../spark/ui/jobs/JobProgressListener.scala | 2 +-
.../scala/org/apache/spark/util/AkkaUtils.scala | 25 ++--
.../org/apache/spark/util/MetadataCleaner.scala | 12 +-
.../org/apache/spark/util/SizeEstimator.scala | 7 +-
.../scala/org/apache/spark/util/Utils.scala | 7 +-
.../apache/spark/MapOutputTrackerSuite.scala | 16 +--
.../spark/metrics/MetricsSystemSuite.scala | 8 +-
.../spark/scheduler/DAGSchedulerSuite.scala | 23 ++-
.../apache/spark/scheduler/JobLoggerSuite.scala | 2 +-
.../cluster/ClusterSchedulerSuite.scala | 2 +-
.../cluster/ClusterTaskSetManagerSuite.scala | 4 +-
.../cluster/TaskResultGetterSuite.scala | 2 +-
.../spark/storage/BlockManagerSuite.scala | 95 +++++++------
.../spark/storage/DiskBlockManagerSuite.scala | 12 +-
.../apache/spark/util/SizeEstimatorSuite.scala | 4 +-
.../examples/bagel/WikipediaPageRank.scala | 10 +-
.../bagel/WikipediaPageRankStandalone.scala | 8 +-
.../streaming/examples/ActorWordCount.scala | 3 +-
.../apache/spark/mllib/recommendation/ALS.scala | 13 +-
.../spark/deploy/yarn/ApplicationMaster.scala | 16 +--
.../org/apache/spark/deploy/yarn/Client.scala | 4 +-
.../spark/deploy/yarn/ClientArguments.scala | 2 +-
.../spark/deploy/yarn/WorkerLauncher.scala | 4 +-
.../deploy/yarn/YarnAllocationHandler.scala | 4 +-
.../cluster/YarnClientSchedulerBackend.scala | 4 +-
project/SparkBuild.scala | 3 +-
.../org/apache/spark/repl/SparkILoop.scala | 7 +-
.../org/apache/spark/repl/SparkIMain.scala | 7 +-
.../org/apache/spark/streaming/Checkpoint.scala | 3 +-
.../org/apache/spark/streaming/Scheduler.scala | 6 +-
.../spark/streaming/StreamingContext.scala | 2 +-
.../streaming/dstream/NetworkInputDStream.scala | 6 +-
.../spark/streaming/CheckpointSuite.scala | 6 +-
.../spark/streaming/InputStreamsSuite.scala | 18 +--
.../apache/spark/streaming/TestSuiteBase.scala | 11 +-
.../spark/deploy/yarn/ApplicationMaster.scala | 16 +--
.../org/apache/spark/deploy/yarn/Client.scala | 6 +-
.../spark/deploy/yarn/ClientArguments.scala | 2 +-
.../spark/deploy/yarn/WorkerLauncher.scala | 4 +-
.../deploy/yarn/YarnAllocationHandler.scala | 2 +-
.../cluster/YarnClientSchedulerBackend.scala | 4 +-
96 files changed, 612 insertions(+), 478 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
index ccffcc3..4520edb 100644
--- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
+++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala
@@ -50,9 +50,9 @@ private[spark] class MapOutputTrackerMasterActor(tracker: MapOutputTrackerMaster
}
}
-private[spark] class MapOutputTracker extends Logging {
+private[spark] class MapOutputTracker(conf: SparkConf) extends Logging {
- private val timeout = AkkaUtils.askTimeout
+ private val timeout = AkkaUtils.askTimeout(conf)
// Set to the MapOutputTrackerActor living on the driver
var trackerActor: Either[ActorRef, ActorSelection] = _
@@ -192,7 +192,8 @@ private[spark] class MapOutputTracker extends Logging {
}
}
-private[spark] class MapOutputTrackerMaster extends MapOutputTracker {
+private[spark] class MapOutputTrackerMaster(conf: SparkConf)
+ extends MapOutputTracker(conf) {
// Cache a serialized version of the output statuses for each shuffle to send them out faster
private var cacheEpoch = epoch
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index bcec41c..04c1eed 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -32,6 +32,8 @@ abstract class Partitioner extends Serializable {
}
object Partitioner {
+
+ import SparkContext.{globalConf => conf}
/**
* Choose a partitioner to use for a cogroup-like operation between a number of RDDs.
*
@@ -52,7 +54,7 @@ object Partitioner {
for (r <- bySize if r.partitioner != None) {
return r.partitioner.get
}
- if (System.getProperty("spark.default.parallelism") != null) {
+ if (conf.getOrElse("spark.default.parallelism", null) != null) {
return new HashPartitioner(rdd.context.defaultParallelism)
} else {
return new HashPartitioner(bySize.head.partitions.size)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/SparkConf.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
new file mode 100644
index 0000000..9a4eefa
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -0,0 +1,71 @@
+package org.apache.spark
+
+import scala.collection.JavaConversions._
+import scala.collection.concurrent.TrieMap
+
+import com.typesafe.config.ConfigFactory
+
+/**
+ * Key/value configuration for a Spark application. When `loadClasspathRes` is true the map is
+ * seeded from JVM system properties, falling back to the `spark.conf` classpath resource.
+ */
+private[spark] class SparkConf(loadClasspathRes: Boolean = true) extends Serializable {
+  @transient lazy val config = ConfigFactory.systemProperties()
+    .withFallback(ConfigFactory.parseResources("spark.conf"))
+  // TODO this should actually be synchronized
+  private val configMap = TrieMap[String, String]()
+
+  if (loadClasspathRes) {
+    for (e <- config.entrySet()) configMap += ((e.getKey, e.getValue.unwrapped().toString))
+  }
+
+  /** Sets `spark.master`; a null `master` is ignored. */
+  def setMasterUrl(master: String): SparkConf = {
+    if (master != null) configMap += (("spark.master", master))
+    this
+  }
+
+  /** Sets `spark.appName`; a null `name` is ignored. */
+  def setAppName(name: String): SparkConf = {
+    if (name != null) configMap += (("spark.appName", name))
+    this
+  }
+
+  /** Sets `spark.jars` to the comma-joined `jars`; a null or empty list is ignored. */
+  def setJars(jars: Seq[String]): SparkConf = {
+    // Null-guarded like the other setters so a null `jars` is a no-op instead of an NPE.
+    if (jars != null && jars.nonEmpty) configMap += (("spark.jars", jars.mkString(",")))
+    this
+  }
+
+  /** Sets a single key, overwriting any existing value. */
+  def set(k: String, value: String): SparkConf = {
+    configMap += ((k, value))
+    this
+  }
+
+  /** Sets `spark.home`; a null `home` is ignored. */
+  def setSparkHome(home: String): SparkConf = {
+    if (home != null) configMap += (("spark.home", home))
+    this
+  }
+
+  /** Adds every pair in `map`, overwriting existing keys; null or empty input is ignored. */
+  def set(map: Seq[(String, String)]): SparkConf = {
+    if (map != null && map.nonEmpty) configMap ++= map
+    this
+  }
+
+  /** Returns the value for `k`; throws NoSuchElementException when `k` is not set. */
+  def get(k: String): String = configMap(k)
+
+  /** Iterates over a copy of the current entries. */
+  def getAllConfiguration = configMap.clone.entrySet().iterator
+
+  /** Returns the value for `k`, or `defaultValue` when `k` is not set. */
+  def getOrElse(k: String, defaultValue: String): String = configMap.getOrElse(k, defaultValue)
+
+  /** Returns a new SparkConf holding the current entries, without reloading defaults. */
+  override def clone: SparkConf = new SparkConf(false).set(configMap.toSeq)
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index a0f794e..4300b07 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -22,91 +22,99 @@ import java.net.URI
import java.util.Properties
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.Map
+import scala.collection.{Map, immutable}
+import scala.collection.JavaConversions._
import scala.collection.generic.Growable
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}
+import scala.util.Try
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.ArrayWritable
-import org.apache.hadoop.io.BooleanWritable
-import org.apache.hadoop.io.BytesWritable
-import org.apache.hadoop.io.DoubleWritable
-import org.apache.hadoop.io.FloatWritable
-import org.apache.hadoop.io.IntWritable
-import org.apache.hadoop.io.LongWritable
-import org.apache.hadoop.io.NullWritable
-import org.apache.hadoop.io.Text
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred.FileInputFormat
-import org.apache.hadoop.mapred.InputFormat
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapred.SequenceFileInputFormat
-import org.apache.hadoop.mapred.TextInputFormat
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
+import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
+FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
+import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
+TextInputFormat}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-
import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
- SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
-import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
+import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend,
+SimrSchedulerBackend, SparkDeploySchedulerBackend}
+import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend,
+MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalScheduler
-import org.apache.spark.scheduler.StageInfo
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
- TimeStampedHashMap, Utils}
+import org.apache.spark.util._
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
- * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param appName A name for your application, to display on the cluster web UI.
- * @param sparkHome Location where Spark is installed on cluster nodes.
- * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
- * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param conf a Spark Config object describing the context configuration. Any settings in this
+ * config override the default configs as well as system properties.
+ *
* @param environment Environment variables to set on worker nodes.
*/
class SparkContext(
- val master: String,
- val appName: String,
- val sparkHome: String = null,
- val jars: Seq[String] = Nil,
+ val conf: SparkConf,
val environment: Map[String, String] = Map(),
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos, etc)
// too. This is typically generated from InputFormatInfo.computePreferredLocations .. host, set
// of data-local splits on host
- val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
- scala.collection.immutable.Map())
+ val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = immutable.Map())
extends Logging {
- // Ensure logging is initialized before we spawn any threads
- initLogging()
+ /**
+ * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
+ * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
+ *
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param appName A name for your application, to display on the cluster web UI.
+ * @param sparkHome Location where Spark is installed on cluster nodes.
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes.
+ */
+ def this(master: String, appName: String, sparkHome: String = null,
+ jars: Seq[String] = Nil, environment: Map[String, String] = Map(),
+ preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
+ immutable.Map()) =
+ this(new SparkConf(false).setAppName(appName).setMasterUrl(master)
+ .setJars(jars).set(environment.toSeq).setSparkHome(sparkHome),
+ environment, preferredNodeLocationData)
// Set Spark driver host and port system properties
- if (System.getProperty("spark.driver.host") == null) {
- System.setProperty("spark.driver.host", Utils.localHostName())
- }
- if (System.getProperty("spark.driver.port") == null) {
- System.setProperty("spark.driver.port", "0")
- }
+ Try(conf.get("spark.driver.host"))
+ .getOrElse(conf.set("spark.driver.host", Utils.localHostName()))
+
+ Try(conf.get("spark.driver.port"))
+ .getOrElse(conf.set("spark.driver.port", "0"))
+
+ val jars: Seq[String] = if (conf.getOrElse("spark.jars", null) != null) {
+ conf.get("spark.jars").split(",")
+ } else null
+
+ val master = conf.get("spark.master")
+ val appName = conf.get("spark.appName")
val isLocal = (master == "local" || master.startsWith("local["))
+ // Ensure logging is initialized before we spawn any threads
+ initLogging()
+
// Create the Spark execution environment (cache, map output tracker, etc)
private[spark] val env = SparkEnv.createFromSystemProperties(
"<driver>",
- System.getProperty("spark.driver.host"),
- System.getProperty("spark.driver.port").toInt,
+ conf.get("spark.driver.host"),
+ conf.get("spark.driver.port").toInt,
+ conf,
true,
isLocal)
SparkEnv.set(env)
@@ -165,24 +173,24 @@ class SparkContext(
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val env = SparkEnv.get
- val conf = SparkHadoopUtil.get.newConfiguration()
+ val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
- conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
- conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
- conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+ hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+ hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
}
// Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
Utils.getSystemProperties.foreach { case (key, value) =>
if (key.startsWith("spark.hadoop.")) {
- conf.set(key.substring("spark.hadoop.".length), value)
+ hadoopConf.set(key.substring("spark.hadoop.".length), value)
}
}
- val bufferSize = System.getProperty("spark.buffer.size", "65536")
- conf.set("io.file.buffer.size", bufferSize)
- conf
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536")
+ hadoopConf.set("io.file.buffer.size", bufferSize)
+ hadoopConf
}
private[spark] var checkpointDir: Option[String] = None
@@ -695,10 +703,8 @@ class SparkContext(
* (in that order of preference). If neither of these is set, return None.
*/
private[spark] def getSparkHome(): Option[String] = {
- if (sparkHome != null) {
- Some(sparkHome)
- } else if (System.getProperty("spark.home") != null) {
- Some(System.getProperty("spark.home"))
+ if (conf.getOrElse("spark.home", null) != null) {
+ Some(conf.get("spark.home"))
} else if (System.getenv("SPARK_HOME") != null) {
Some(System.getenv("SPARK_HOME"))
} else {
@@ -909,6 +915,14 @@ object SparkContext {
private[spark] val SPARK_UNKNOWN_USER = "<unknown>"
+ private lazy val conf = new SparkConf()
+
+ private[spark] def globalConf = {
+ if (SparkEnv.get != null) {
+ SparkEnv.get.conf
+ } else conf
+ }
+
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
@@ -1020,7 +1034,7 @@ object SparkContext {
/** Get the amount of memory per executor requested through system properties or SPARK_MEM */
private[spark] val executorMemoryRequested = {
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
- Option(System.getProperty("spark.executor.memory"))
+ Try(globalConf.get("spark.executor.memory")).toOption
.orElse(Option(System.getenv("SPARK_MEM")))
.map(Utils.memoryStringToMb)
.getOrElse(512)
@@ -1123,7 +1137,7 @@ object SparkContext {
case mesosUrl @ MESOS_REGEX(_) =>
MesosNativeLibrary.load()
val scheduler = new ClusterScheduler(sc)
- val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean
+ val coarseGrained = globalConf.getOrElse("spark.mesos.coarse", "false").toBoolean
val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs
val backend = if (coarseGrained) {
new CoarseMesosSchedulerBackend(scheduler, sc, url, appName)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 826f5c2..78e4ae2 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -54,7 +54,8 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
- val metricsSystem: MetricsSystem) {
+ val metricsSystem: MetricsSystem,
+ val conf: SparkConf) {
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
@@ -114,25 +115,27 @@ object SparkEnv extends Logging {
executorId: String,
hostname: String,
port: Int,
+ conf: SparkConf,
isDriver: Boolean,
isLocal: Boolean): SparkEnv = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port,
+ conf = conf)
// Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
// figure out which port number Akka actually bound to and set spark.driver.port to it.
if (isDriver && port == 0) {
- System.setProperty("spark.driver.port", boundPort.toString)
+ conf.set("spark.driver.port", boundPort.toString)
}
// set only if unset until now.
- if (System.getProperty("spark.hostPort", null) == null) {
+ if (conf.getOrElse("spark.hostPort", null) == null) {
if (!isDriver){
// unexpected
Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
}
Utils.checkHost(hostname)
- System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+ conf.set("spark.hostPort", hostname + ":" + boundPort)
}
val classLoader = Thread.currentThread.getContextClassLoader
@@ -140,25 +143,25 @@ object SparkEnv extends Logging {
// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
- val name = System.getProperty(propertyName, defaultClassName)
+ val name = conf.getOrElse(propertyName, defaultClassName)
Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
}
val serializerManager = new SerializerManager
val serializer = serializerManager.setDefault(
- System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.getOrElse("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
val closureSerializer = serializerManager.get(
- System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.getOrElse("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
if (isDriver) {
logInfo("Registering " + name)
Left(actorSystem.actorOf(Props(newActor), name = name))
} else {
- val driverHost: String = System.getProperty("spark.driver.host", "localhost")
- val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
+ val driverHost: String = conf.getOrElse("spark.driver.host", "localhost")
+ val driverPort: Int = conf.getOrElse("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
@@ -168,21 +171,21 @@ object SparkEnv extends Logging {
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
- new BlockManagerMasterActor(isLocal)))
- val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
+ new BlockManagerMasterActor(isLocal, conf)), conf)
+ val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
val connectionManager = blockManager.connectionManager
- val broadcastManager = new BroadcastManager(isDriver)
+ val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)
// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
val mapOutputTracker = if (isDriver) {
- new MapOutputTrackerMaster()
+ new MapOutputTrackerMaster(conf)
} else {
- new MapOutputTracker()
+ new MapOutputTracker(conf)
}
mapOutputTracker.trackerActor = registerOrLookup(
"MapOutputTracker",
@@ -193,12 +196,12 @@ object SparkEnv extends Logging {
val httpFileServer = new HttpFileServer()
httpFileServer.initialize()
- System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+ conf.set("spark.fileserver.uri", httpFileServer.serverUri)
val metricsSystem = if (isDriver) {
- MetricsSystem.createMetricsSystem("driver")
+ MetricsSystem.createMetricsSystem("driver", conf)
} else {
- MetricsSystem.createMetricsSystem("executor")
+ MetricsSystem.createMetricsSystem("executor", conf)
}
metricsSystem.start()
@@ -212,7 +215,7 @@ object SparkEnv extends Logging {
}
// Warn about deprecated spark.cache.class property
- if (System.getProperty("spark.cache.class") != null) {
+ if (conf.getOrElse("spark.cache.class", null) != null) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
"levels using the RDD.persist() method instead.")
}
@@ -231,6 +234,7 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
- metricsSystem)
+ metricsSystem,
+ conf)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index ca42c76..d6eacfe 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag](
accumulator: Accumulator[JList[Array[Byte]]])
extends RDD[Array[Byte]](parent) {
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
override def getPartitions = parent.partitions
@@ -247,10 +247,10 @@ private class BytesToString extends org.apache.spark.api.java.function.Function[
*/
private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {
-
+ import SparkContext.{globalConf => conf}
Utils.checkHost(serverHost, "Expected hostname")
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
index 43c1829..be99d22 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/Broadcast.scala
@@ -32,7 +32,7 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
}
private[spark]
-class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable {
+class BroadcastManager(val _isDriver: Boolean, conf: SparkConf) extends Logging with Serializable {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
@@ -43,14 +43,14 @@ class BroadcastManager(val _isDriver: Boolean) extends Logging with Serializable
private def initialize() {
synchronized {
if (!initialized) {
- val broadcastFactoryClass = System.getProperty(
+ val broadcastFactoryClass = conf.getOrElse(
"spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
broadcastFactory =
Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
- broadcastFactory.initialize(isDriver)
+ broadcastFactory.initialize(isDriver, conf)
initialized = true
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
index 68bff75..fb161ce 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/BroadcastFactory.scala
@@ -17,6 +17,8 @@
package org.apache.spark.broadcast
+import org.apache.spark.SparkConf
+
/**
* An interface for all the broadcast implementations in Spark (to allow
* multiple broadcast implementations). SparkContext uses a user-specified
@@ -24,7 +26,7 @@ package org.apache.spark.broadcast
* entire Spark job.
*/
private[spark] trait BroadcastFactory {
- def initialize(isDriver: Boolean): Unit
+ def initialize(isDriver: Boolean, conf: SparkConf): Unit
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
index 47db720..cecb8c2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
-import org.apache.spark.{HttpServer, Logging, SparkEnv}
+import org.apache.spark.{SparkConf, HttpServer, Logging, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashSet, Utils}
@@ -64,7 +64,7 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
}
private[spark] class HttpBroadcastFactory extends BroadcastFactory {
- def initialize(isDriver: Boolean) { HttpBroadcast.initialize(isDriver) }
+ def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new HttpBroadcast[T](value_, isLocal, id)
@@ -88,15 +88,16 @@ private object HttpBroadcast extends Logging {
private lazy val compressionCodec = CompressionCodec.createCodec()
- def initialize(isDriver: Boolean) {
+ def initialize(isDriver: Boolean, conf: SparkConf) {
synchronized {
if (!initialized) {
- bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
- compress = System.getProperty("spark.broadcast.compress", "true").toBoolean
+ bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
+ compress = conf.getOrElse("spark.broadcast.compress", "true").toBoolean
if (isDriver) {
createServer()
+ conf.set("spark.httpBroadcast.uri", serverUri)
}
- serverUri = System.getProperty("spark.httpBroadcast.uri")
+ serverUri = conf.get("spark.httpBroadcast.uri")
initialized = true
}
}
@@ -118,7 +119,6 @@ private object HttpBroadcast extends Logging {
server = new HttpServer(broadcastDir)
server.start()
serverUri = server.uri
- System.setProperty("spark.httpBroadcast.uri", serverUri)
logInfo("Broadcast server started at " + serverUri)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index 073a0a5..4a3801d 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -166,8 +166,9 @@ private object TorrentBroadcast
extends Logging {
private var initialized = false
-
- def initialize(_isDriver: Boolean) {
+ private var conf: SparkConf = null
+ def initialize(_isDriver: Boolean, conf: SparkConf) {
+ TorrentBroadcast.conf = conf //TODO: we might have to fix it in tests
synchronized {
if (!initialized) {
initialized = true
@@ -179,7 +180,7 @@ extends Logging {
initialized = false
}
- val BLOCK_SIZE = System.getProperty("spark.broadcast.blockSize", "4096").toInt * 1024
+ lazy val BLOCK_SIZE = conf.getOrElse("spark.broadcast.blockSize", "4096").toInt * 1024
def blockifyObject[T](obj: T): TorrentInfo = {
val byteArray = Utils.serialize[T](obj)
@@ -238,7 +239,7 @@ private[spark] case class TorrentInfo(
private[spark] class TorrentBroadcastFactory
extends BroadcastFactory {
- def initialize(isDriver: Boolean) { TorrentBroadcast.initialize(isDriver) }
+ def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }
def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
new TorrentBroadcast[T](value_, isLocal, id)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index 19d393a..dda43dc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -26,7 +26,7 @@ private[spark] class ApplicationDescription(
val appUiUrl: String)
extends Serializable {
- val user = System.getProperty("user.name", "<unknown>")
+ val user = System.getProperty("user.name", "<unknown>")
override def toString: String = "ApplicationDescription(" + name + ")"
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index fc1537f..1c979ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -67,8 +67,9 @@ class SparkHadoopUtil {
}
object SparkHadoopUtil {
+ import SparkContext.{globalConf => conf}
private val hadoop = {
- val yarnMode = java.lang.Boolean.valueOf(System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+ val yarnMode = java.lang.Boolean.valueOf(conf.getOrElse("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 953755e..9bbd635 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -19,20 +19,18 @@ package org.apache.spark.deploy.client
import java.util.concurrent.TimeoutException
-import scala.concurrent.duration._
import scala.concurrent.Await
+import scala.concurrent.duration._
import akka.actor._
import akka.pattern.ask
-import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent}
-
-import org.apache.spark.{SparkException, Logging}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.util.AkkaUtils
-
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
@@ -43,7 +41,8 @@ private[spark] class Client(
actorSystem: ActorSystem,
masterUrls: Array[String],
appDescription: ApplicationDescription,
- listener: ClientListener)
+ listener: ClientListener,
+ conf: SparkConf)
extends Logging {
val REGISTRATION_TIMEOUT = 20.seconds
@@ -178,7 +177,7 @@ private[spark] class Client(
def stop() {
if (actor != null) {
try {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(conf)
val future = actor.ask(StopClient)(timeout)
Await.result(future, timeout)
} catch {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index 5b62d3b..426cf52 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -18,7 +18,7 @@
package org.apache.spark.deploy.client
import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.{Logging}
+import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.deploy.{Command, ApplicationDescription}
private[spark] object TestClient {
@@ -45,11 +45,12 @@ private[spark] object TestClient {
def main(args: Array[String]) {
val url = args(0)
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0)
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
+ conf = SparkContext.globalConf)
val desc = new ApplicationDescription(
"TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
val listener = new TestListener
- val client = new Client(actorSystem, Array(url), desc, listener)
+ val client = new Client(actorSystem, Array(url), desc, listener, SparkContext.globalConf)
client.start()
actorSystem.awaitTermination()
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index eebd079..2c162c4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -29,7 +29,7 @@ import akka.pattern.ask
import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
import akka.serialization.SerializationExtension
-import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.{SparkContext, Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.MasterMessages._
@@ -39,13 +39,13 @@ import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
-
+ val conf = SparkContext.globalConf
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
- val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
- val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
- val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
- val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
- val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
+ val WORKER_TIMEOUT = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000
+ val RETAINED_APPLICATIONS = conf.getOrElse("spark.deploy.retainedApplications", "200").toInt
+ val REAPER_ITERATIONS = conf.getOrElse("spark.dead.worker.persistence", "15").toInt
+ val RECOVERY_DIR = conf.getOrElse("spark.deploy.recoveryDirectory", "")
+ val RECOVERY_MODE = conf.getOrElse("spark.deploy.recoveryMode", "NONE")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -63,8 +63,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname")
- val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
- val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
+ val masterMetricsSystem = MetricsSystem.createMetricsSystem("master", conf)
+ val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications", conf)
val masterSource = new MasterSource(this)
val webUi = new MasterWebUI(this, webUiPort)
@@ -86,7 +86,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
- val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
+ val spreadOutApps = conf.getOrElse("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
logInfo("Starting Spark master at " + masterUrl)
@@ -103,7 +103,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
persistenceEngine = RECOVERY_MODE match {
case "ZOOKEEPER" =>
logInfo("Persisting recovery state to ZooKeeper")
- new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+ new ZooKeeperPersistenceEngine(SerializationExtension(context.system), conf)
case "FILESYSTEM" =>
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
@@ -113,7 +113,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
leaderElectionAgent = RECOVERY_MODE match {
case "ZOOKEEPER" =>
- context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
+ context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl, conf))
case _ =>
context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
}
@@ -507,7 +507,7 @@ private[spark] object Master {
val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
- val args = new MasterArguments(argStrings)
+ val args = new MasterArguments(argStrings, SparkContext.globalConf)
val (actorSystem, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort)
actorSystem.awaitTermination()
}
@@ -523,9 +523,10 @@ private[spark] object Master {
}
def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
+ conf = SparkContext.globalConf)
val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), actorName)
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(SparkContext.globalConf)
val respFuture = actor.ask(RequestWebUIPort)(timeout)
val resp = Await.result(respFuture, timeout).asInstanceOf[WebUIPortResponse]
(actorSystem, boundPort, resp.webUIBoundPort)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
index 9d89b45..7ce83f9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterArguments.scala
@@ -18,11 +18,12 @@
package org.apache.spark.deploy.master
import org.apache.spark.util.{Utils, IntParam}
+import org.apache.spark.SparkConf
/**
* Command-line parser for the master.
*/
-private[spark] class MasterArguments(args: Array[String]) {
+private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
@@ -37,8 +38,8 @@ private[spark] class MasterArguments(args: Array[String]) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}
- if (System.getProperty("master.ui.port") != null) {
- webUiPort = System.getProperty("master.ui.port").toInt
+ if (conf.get("master.ui.port") != null) {
+ webUiPort = conf.get("master.ui.port").toInt
}
parse(args.toList)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index 6cc7fd2..79d95b1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -23,7 +23,7 @@ import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.data.Stat
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
/**
* Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
@@ -35,8 +35,9 @@ import org.apache.spark.Logging
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
* times or a semantic exception is thrown (e.g., "node already exists").
*/
-private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
- val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
+private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher,
+ conf: SparkConf) extends Logging {
+ val ZK_URL = conf.getOrElse("spark.deploy.zookeeper.url", "")
val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
val ZK_TIMEOUT_MILLIS = 30000
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 7d535b0..df5bb36 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -21,16 +21,17 @@ import akka.actor.ActorRef
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.master.MasterMessages._
-private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef,
+ masterUrl: String, conf: SparkConf)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
- val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+ val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
private val watcher = new ZooKeeperWatcher()
- private val zk = new SparkZooKeeperSession(this)
+ private val zk = new SparkZooKeeperSession(this, conf)
private var status = LeadershipStatus.NOT_LEADER
private var myLeaderFile: String = _
private var leaderUrl: String = _
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
index 825344b..c55b720 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -17,19 +17,19 @@
package org.apache.spark.deploy.master
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.zookeeper._
import akka.serialization.Serialization
-class ZooKeeperPersistenceEngine(serialization: Serialization)
+class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
extends PersistenceEngine
with SparkZooKeeperWatcher
with Logging
{
- val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+ val WORKING_DIR = conf.getOrElse("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
- val zk = new SparkZooKeeperSession(this)
+ val zk = new SparkZooKeeperSession(this, conf)
zk.connect()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index 9ab594b..ead3566 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(master.conf)
val host = Utils.localHostName()
val port = requestedPort
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 87531b6..75a6e75 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -25,23 +25,14 @@ import scala.collection.mutable.HashMap
import scala.concurrent.duration._
import akka.actor._
-import akka.remote.{ DisassociatedEvent, RemotingLifecycleEvent}
-
-import org.apache.spark.{SparkException, Logging}
+import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent}
+import org.apache.spark.{Logging, SparkConf, SparkException}
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
-import org.apache.spark.deploy.DeployMessages.WorkerStateResponse
-import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
-import org.apache.spark.deploy.DeployMessages.KillExecutor
-import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
-import org.apache.spark.deploy.DeployMessages.Heartbeat
-import org.apache.spark.deploy.DeployMessages.RegisteredWorker
-import org.apache.spark.deploy.DeployMessages.LaunchExecutor
-import org.apache.spark.deploy.DeployMessages.RegisterWorker
+import org.apache.spark.util.{AkkaUtils, Utils}
/**
* @param masterUrls Each url should look like spark://host:port.
@@ -53,7 +44,8 @@ private[spark] class Worker(
cores: Int,
memory: Int,
masterUrls: Array[String],
- workDirPath: String = null)
+ workDirPath: String = null,
+ val conf: SparkConf)
extends Actor with Logging {
import context.dispatcher
@@ -63,7 +55,7 @@ private[spark] class Worker(
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
- val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+ val HEARTBEAT_MILLIS = conf.getOrElse("spark.worker.timeout", "60").toLong * 1000 / 4
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
@@ -92,7 +84,7 @@ private[spark] class Worker(
var coresUsed = 0
var memoryUsed = 0
- val metricsSystem = MetricsSystem.createMetricsSystem("worker")
+ val metricsSystem = MetricsSystem.createMetricsSystem("worker", conf)
val workerSource = new WorkerSource(this)
def coresFree: Int = cores - coresUsed
@@ -275,6 +267,7 @@ private[spark] class Worker(
}
private[spark] object Worker {
+ import org.apache.spark.SparkContext.globalConf
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
@@ -287,9 +280,10 @@ private[spark] object Worker {
: (ActorSystem, Int) = {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
+ conf = globalConf)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterUrls, workDir), name = "Worker")
+ masterUrls, workDir, globalConf), name = "Worker")
(actorSystem, boundPort)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 40d6bdb..ec47ba1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -22,7 +22,7 @@ import java.io.File
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{JettyUtils, UIUtils}
import org.apache.spark.ui.JettyUtils._
@@ -34,10 +34,10 @@ import org.apache.spark.util.{AkkaUtils, Utils}
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
- val timeout = AkkaUtils.askTimeout
+ val timeout = AkkaUtils.askTimeout(worker.conf)
val host = Utils.localHostName()
val port = requestedPort.getOrElse(
- System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
+ worker.conf.getOrElse("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
var server: Option[Server] = None
var boundPort: Option[Int] = None
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index debbdd4..c8319f6 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import akka.actor._
import akka.remote._
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.util.{Utils, AkkaUtils}
@@ -98,10 +98,10 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
- indestructible = true)
+ indestructible = true, conf = SparkContext.globalConf)
// set it
val sparkHostPort = hostname + ":" + boundPort
- System.setProperty("spark.hostPort", sparkHostPort)
+// conf.set("spark.hostPort", sparkHostPort)
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores),
name = "Executor")
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 0f19d7a..70fc30e 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -57,17 +57,17 @@ private[spark] class Executor(
// Make sure the local hostname we report matches the cluster scheduler's name for this host
Utils.setCustomHostname(slaveHostname)
-
+ val conf = new SparkConf(false)
// Set spark.* system properties from executor arg
for ((key, value) <- properties) {
- System.setProperty(key, value)
+ conf.set(key, value)
}
// If we are in yarn mode, systems can have different disk layouts so we must set it
// to what Yarn on this system said was available. This will be used later when SparkEnv
// created.
if (java.lang.Boolean.valueOf(System.getenv("SPARK_YARN_MODE"))) {
- System.setProperty("spark.local.dir", getYarnLocalDirs())
+ conf.set("spark.local.dir", getYarnLocalDirs())
}
// Create our ClassLoader and set it on this thread
@@ -108,7 +108,7 @@ private[spark] class Executor(
// Initialize Spark environment (using system properties read above)
private val env = {
if (!isLocal) {
- val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0,
+ val _env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, conf,
isDriver = false, isLocal = false)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
@@ -303,7 +303,7 @@ private[spark] class Executor(
* new classes defined by the REPL as the user types code
*/
private def addReplClassLoaderIfNeeded(parent: ClassLoader): ClassLoader = {
- val classUri = System.getProperty("spark.repl.class.uri")
+ val classUri = conf.getOrElse("spark.repl.class.uri", null)
if (classUri != null) {
logInfo("Using REPL class URI: " + classUri)
try {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
index 570a979..8ef5019 100644
--- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
+++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala
@@ -22,6 +22,7 @@ import java.io.{InputStream, OutputStream}
import com.ning.compress.lzf.{LZFInputStream, LZFOutputStream}
import org.xerial.snappy.{SnappyInputStream, SnappyOutputStream}
+import org.apache.spark.SparkConf
/**
@@ -37,15 +38,16 @@ trait CompressionCodec {
private[spark] object CompressionCodec {
-
+ import org.apache.spark.SparkContext.globalConf
def createCodec(): CompressionCodec = {
createCodec(System.getProperty(
"spark.io.compression.codec", classOf[LZFCompressionCodec].getName))
}
def createCodec(codecName: String): CompressionCodec = {
- Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
- .newInstance().asInstanceOf[CompressionCodec]
+ val ctor = Class.forName(codecName, true, Thread.currentThread.getContextClassLoader)
+ .getConstructor(classOf[SparkConf])
+ ctor.newInstance(globalConf).asInstanceOf[CompressionCodec]
}
}
@@ -53,7 +55,7 @@ private[spark] object CompressionCodec {
/**
* LZF implementation of [[org.apache.spark.io.CompressionCodec]].
*/
-class LZFCompressionCodec extends CompressionCodec {
+class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
new LZFOutputStream(s).setFinishBlockOnFlush(true)
@@ -67,10 +69,10 @@ class LZFCompressionCodec extends CompressionCodec {
* Snappy implementation of [[org.apache.spark.io.CompressionCodec]].
* Block size can be configured by spark.io.compression.snappy.block.size.
*/
-class SnappyCompressionCodec extends CompressionCodec {
+class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec {
override def compressedOutputStream(s: OutputStream): OutputStream = {
- val blockSize = System.getProperty("spark.io.compression.snappy.block.size", "32768").toInt
+ val blockSize = conf.getOrElse("spark.io.compression.snappy.block.size", "32768").toInt
new SnappyOutputStream(s, blockSize)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
index bec0c83..ac29816 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import scala.collection.mutable
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.metrics.sink.{MetricsServlet, Sink}
import org.apache.spark.metrics.source.Source
@@ -62,10 +62,11 @@ import org.apache.spark.metrics.source.Source
*
* [options] is the specific property of this source or sink.
*/
-private[spark] class MetricsSystem private (val instance: String) extends Logging {
+private[spark] class MetricsSystem private (val instance: String,
+ conf: SparkConf) extends Logging {
initLogging()
- val confFile = System.getProperty("spark.metrics.conf")
+ val confFile = conf.getOrElse("spark.metrics.conf", null)
val metricsConfig = new MetricsConfig(Option(confFile))
val sinks = new mutable.ArrayBuffer[Sink]
@@ -159,5 +160,6 @@ private[spark] object MetricsSystem {
}
}
- def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
+ def createMetricsSystem(instance: String, conf: SparkConf): MetricsSystem =
+ new MetricsSystem(instance, conf)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
index 703bc6a..3e902f8 100644
--- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala
@@ -37,7 +37,7 @@ import scala.concurrent.duration._
import org.apache.spark.util.Utils
-private[spark] class ConnectionManager(port: Int) extends Logging {
+private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Logging {
class MessageStatus(
val message: Message,
@@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private val selector = SelectorProvider.provider.openSelector()
private val handleMessageExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.handler.threads.min","20").toInt,
- System.getProperty("spark.core.connection.handler.threads.max","60").toInt,
- System.getProperty("spark.core.connection.handler.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+ conf.getOrElse("spark.core.connection.handler.threads.min", "20").toInt,
+ conf.getOrElse("spark.core.connection.handler.threads.max", "60").toInt,
+ conf.getOrElse("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
private val handleReadWriteExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.io.threads.min","4").toInt,
- System.getProperty("spark.core.connection.io.threads.max","32").toInt,
- System.getProperty("spark.core.connection.io.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+ conf.getOrElse("spark.core.connection.io.threads.min", "4").toInt,
+ conf.getOrElse("spark.core.connection.io.threads.max", "32").toInt,
+ conf.getOrElse("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
// Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap
private val handleConnectExecutor = new ThreadPoolExecutor(
- System.getProperty("spark.core.connection.connect.threads.min","1").toInt,
- System.getProperty("spark.core.connection.connect.threads.max","8").toInt,
- System.getProperty("spark.core.connection.connect.threads.keepalive","60").toInt, TimeUnit.SECONDS,
+ conf.getOrElse("spark.core.connection.connect.threads.min", "1").toInt,
+ conf.getOrElse("spark.core.connection.connect.threads.max", "8").toInt,
+ conf.getOrElse("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS,
new LinkedBlockingDeque[Runnable]())
private val serverChannel = ServerSocketChannel.open()
@@ -593,8 +593,10 @@ private[spark] class ConnectionManager(port: Int) extends Logging {
private[spark] object ConnectionManager {
+ import SparkContext.globalConf
+
def main(args: Array[String]) {
- val manager = new ConnectionManager(9999)
+ val manager = new ConnectionManager(9999, globalConf)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
println("Received [" + msg + "] from [" + id + "]")
None
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
index 7817151..4ca3cd3 100644
--- a/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/ReceiverTest.scala
@@ -21,9 +21,9 @@ import java.nio.ByteBuffer
import java.net.InetAddress
private[spark] object ReceiverTest {
-
+ import org.apache.spark.SparkContext.globalConf
def main(args: Array[String]) {
- val manager = new ConnectionManager(9999)
+ val manager = new ConnectionManager(9999, globalConf)
println("Started connection manager with id = " + manager.id)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/network/SenderTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/SenderTest.scala b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
index 7775749..11c21fc 100644
--- a/core/src/main/scala/org/apache/spark/network/SenderTest.scala
+++ b/core/src/main/scala/org/apache/spark/network/SenderTest.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import java.net.InetAddress
private[spark] object SenderTest {
-
+ import org.apache.spark.SparkContext.globalConf
def main(args: Array[String]) {
if (args.length < 2) {
@@ -33,7 +33,7 @@ private[spark] object SenderTest {
val targetPort = args(1).toInt
val targetConnectionManagerId = new ConnectionManagerId(targetHost, targetPort)
- val manager = new ConnectionManager(0)
+ val manager = new ConnectionManager(0, globalConf)
println("Started connection manager with id = " + manager.id)
manager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
index b1e1576..81b3104 100644
--- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
+++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala
@@ -23,20 +23,20 @@ import io.netty.buffer.ByteBuf
import io.netty.channel.ChannelHandlerContext
import io.netty.util.CharsetUtil
-import org.apache.spark.Logging
+import org.apache.spark.{SparkContext, SparkConf, Logging}
import org.apache.spark.network.ConnectionManagerId
import scala.collection.JavaConverters._
import org.apache.spark.storage.BlockId
-private[spark] class ShuffleCopier extends Logging {
+private[spark] class ShuffleCopier(conf: SparkConf) extends Logging {
def getBlock(host: String, port: Int, blockId: BlockId,
resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) {
val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback)
- val connectTimeout = System.getProperty("spark.shuffle.netty.connect.timeout", "60000").toInt
+ val connectTimeout = conf.getOrElse("spark.shuffle.netty.connect.timeout", "60000").toInt
val fc = new FileClient(handler, connectTimeout)
try {
@@ -107,7 +107,7 @@ private[spark] object ShuffleCopier extends Logging {
val tasks = (for (i <- Range(0, threads)) yield {
Executors.callable(new Runnable() {
def run() {
- val copier = new ShuffleCopier()
+ val copier = new ShuffleCopier(SparkContext.globalConf)
copier.getBlock(host, port, blockId, echoResultCollectCallBack)
}
})
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
index a712ef1..9fbe002 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala
@@ -75,6 +75,8 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
private[spark] object CheckpointRDD extends Logging {
+ import SparkContext.{globalConf => conf}
+
def splitIdToFile(splitId: Int): String = {
"part-%05d".format(splitId)
}
@@ -92,7 +94,7 @@ private[spark] object CheckpointRDD extends Logging {
throw new IOException("Checkpoint failed: temporary path " +
tempOutputPath + " already exists")
}
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
val fileOutputStream = if (blockSize < 0) {
fs.create(tempOutputPath, false, bufferSize)
@@ -122,7 +124,7 @@ private[spark] object CheckpointRDD extends Logging {
def readFromFile[T](path: Path, context: TaskContext): Iterator[T] = {
val env = SparkEnv.get
val fs = path.getFileSystem(SparkHadoopUtil.get.newConfiguration())
- val bufferSize = System.getProperty("spark.buffer.size", "65536").toInt
+ val bufferSize = conf.getOrElse("spark.buffer.size", "65536").toInt
val fileInputStream = fs.open(path, bufferSize)
val serializer = env.serializer.newInstance()
val deserializeStream = serializer.deserializeStream(fileInputStream)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index ea45566..f8b1a69 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -81,6 +81,7 @@ abstract class RDD[T: ClassTag](
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))
+ private[spark] def conf = sc.conf
// =======================================================================
// Methods that should be implemented by subclasses of RDD
// =======================================================================
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
index 6092783..3f55cd5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/JobLogger.scala
@@ -41,7 +41,7 @@ import org.apache.spark.storage.StorageLevel
class JobLogger(val user: String, val logDirName: String)
extends SparkListener with Logging {
- def this() = this(System.getProperty("user.name", "<unknown>"),
+ def this() = this(System.getProperty("user.name", "<unknown>"),
String.valueOf(System.currentTimeMillis()))
private val logDir =
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 356fe56..9002d33 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,7 +20,7 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import scala.xml.XML
@@ -49,10 +49,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
}
}
-private[spark] class FairSchedulableBuilder(val rootPool: Pool)
+private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = Option(System.getProperty("spark.scheduler.allocation.file"))
+ val schedulerAllocFile = Option(conf.get("spark.scheduler.allocation.file"))
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
val DEFAULT_POOL_NAME = "default"
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
index 66ab8ea..7e231ec 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterScheduler.scala
@@ -49,11 +49,12 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
extends TaskScheduler
with Logging
{
+ val conf = sc.conf
// How often to check for speculative tasks
- val SPECULATION_INTERVAL = System.getProperty("spark.speculation.interval", "100").toLong
+ val SPECULATION_INTERVAL = conf.getOrElse("spark.speculation.interval", "100").toLong
// Threshold above which we warn user initial TaskSet may be starved
- val STARVATION_TIMEOUT = System.getProperty("spark.starvation.timeout", "15000").toLong
+ val STARVATION_TIMEOUT = conf.getOrElse("spark.starvation.timeout", "15000").toLong
// ClusterTaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
@@ -90,7 +91,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
var rootPool: Pool = null
// default scheduler is FIFO
val schedulingMode: SchedulingMode = SchedulingMode.withName(
- System.getProperty("spark.scheduler.mode", "FIFO"))
+ conf.getOrElse("spark.scheduler.mode", "FIFO"))
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
@@ -108,7 +109,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
- new FairSchedulableBuilder(rootPool)
+ new FairSchedulableBuilder(rootPool, conf)
}
}
schedulableBuilder.buildPools()
@@ -119,7 +120,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
override def start() {
backend.start()
- if (System.getProperty("spark.speculation", "false").toBoolean) {
+ if (conf.getOrElse("spark.speculation", "false").toBoolean) {
logInfo("Starting speculative execution thread")
import sc.env.actorSystem.dispatcher
sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2573add9/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
index bf494aa..398b0ce 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -50,15 +50,16 @@ private[spark] class ClusterTaskSetManager(
extends TaskSetManager
with Logging
{
+ val conf = sched.sc.conf
// CPUs to request per task
- val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+ val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
// Maximum times a task is allowed to fail before failing the job
- val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
+ val MAX_TASK_FAILURES = conf.getOrElse("spark.task.maxFailures", "4").toInt
// Quantile of tasks at which to start speculation
- val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
- val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+ val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
+ val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val env = SparkEnv.get
@@ -117,7 +118,7 @@ private[spark] class ClusterTaskSetManager(
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
- System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+ conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
// Map of recent exceptions (identified by string representation and top stack frame) to
// duplicate count (how many times the same exception has appeared) and time the full exception
@@ -677,14 +678,14 @@ private[spark] class ClusterTaskSetManager(
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
- val defaultWait = System.getProperty("spark.locality.wait", "3000")
+ val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
- System.getProperty("spark.locality.wait.process", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
- System.getProperty("spark.locality.wait.node", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
- System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
}