You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/02 06:29:56 UTC
[18/33] git commit: Fix a few settings that were being read as system properties after merge
Fix a few settings that were being read as system properties after merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0bd1900c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0bd1900c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0bd1900c
Branch: refs/heads/master
Commit: 0bd1900cbce5946999c38293852d8ccd4f838930
Parents: b4ceed4
Author: Matei Zaharia <ma...@databricks.com>
Authored: Sun Dec 29 15:38:46 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Sun Dec 29 15:38:46 2013 -0500
----------------------------------------------------------------------
.../spark/scheduler/TaskSchedulerImpl.scala | 4 +++-
.../apache/spark/scheduler/TaskSetManager.scala | 18 ++++++++++--------
.../spark/streaming/scheduler/JobScheduler.scala | 4 ++--
3 files changed, 15 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0bd1900c/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 56a038d..bffd990 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -47,10 +47,12 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
*/
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
- val maxTaskFailures: Int = System.getProperty("spark.task.maxFailures", "4").toInt,
+ val maxTaskFailures: Int,
isLocal: Boolean = false)
extends TaskScheduler with Logging
{
+ def this(sc: SparkContext) = this(sc, sc.conf.getOrElse("spark.task.maxFailures", "4").toInt)
+
val conf = sc.conf
// How often to check for speculative tasks
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0bd1900c/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 9b95e41..d752e6f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -54,12 +54,14 @@ private[spark] class TaskSetManager(
clock: Clock = SystemClock)
extends Schedulable with Logging
{
+ val conf = sched.sc.conf
+
// CPUs to request per task
- val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
+ val CPUS_PER_TASK = conf.getOrElse("spark.task.cpus", "1").toInt
// Quantile of tasks at which to start speculation
- val SPECULATION_QUANTILE = System.getProperty("spark.speculation.quantile", "0.75").toDouble
- val SPECULATION_MULTIPLIER = System.getProperty("spark.speculation.multiplier", "1.5").toDouble
+ val SPECULATION_QUANTILE = conf.getOrElse("spark.speculation.quantile", "0.75").toDouble
+ val SPECULATION_MULTIPLIER = conf.getOrElse("spark.speculation.multiplier", "1.5").toDouble
// Serializer for closures and tasks.
val env = SparkEnv.get
@@ -118,7 +120,7 @@ private[spark] class TaskSetManager(
// How frequently to reprint duplicate exceptions in full, in milliseconds
val EXCEPTION_PRINT_INTERVAL =
- System.getProperty("spark.logging.exceptionPrintInterval", "10000").toLong
+ conf.getOrElse("spark.logging.exceptionPrintInterval", "10000").toLong
// Map of recent exceptions (identified by string representation and top stack frame) to
// duplicate count (how many times the same exception has appeared) and time the full exception
@@ -682,14 +684,14 @@ private[spark] class TaskSetManager(
}
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
- val defaultWait = System.getProperty("spark.locality.wait", "3000")
+ val defaultWait = conf.getOrElse("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
- System.getProperty("spark.locality.wait.process", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
- System.getProperty("spark.locality.wait.node", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
- System.getProperty("spark.locality.wait.rack", defaultWait).toLong
+ conf.getOrElse("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0bd1900c/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 9511ccf..7fd8d41 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -25,7 +25,7 @@ import org.apache.spark.streaming._
/**
* This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
- * the jobs and runs them using a thread pool. Number of threads
+ * the jobs and runs them using a thread pool. Number of threads
*/
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {
@@ -33,7 +33,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
initLogging()
val jobSets = new ConcurrentHashMap[Time, JobSet]
- val numConcurrentJobs = System.getProperty("spark.streaming.concurrentJobs", "1").toInt
+ val numConcurrentJobs = ssc.conf.getOrElse("spark.streaming.concurrentJobs", "1").toInt
val executor = Executors.newFixedThreadPool(numConcurrentJobs)
val generator = new JobGenerator(this)
val listenerBus = new StreamingListenerBus()