You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ir...@apache.org on 2017/02/06 14:24:36 UTC
spark git commit: [SPARK-17663][CORE] SchedulableBuilder should
handle invalid data access via scheduler.allocation.file
Repository: spark
Updated Branches:
refs/heads/master 7730426cb -> 7beb227cc
[SPARK-17663][CORE] SchedulableBuilder should handle invalid data access via scheduler.allocation.file
## What changes were proposed in this pull request?
If the file referenced by `spark.scheduler.allocation.file` contains invalid `minShare` and/or `weight` values, this causes the following problems:
- A `NumberFormatException` is thrown by the `toInt` call.
- `SparkContext` cannot be initialized.
- No meaningful error message is shown to the user.
In short, this functionality can be made more robust with one of the following approaches:
**1-** Currently, if `schedulingMode` has an invalid value, a warning message is logged and the default value `FIFO` is used. The same pattern can be applied to `minShare` (default: 0) and `weight` (default: 1).
**2-** A meaningful error message can be shown to the user for every invalid case.
This PR:
- Extends `schedulingMode` handling, which currently covers only empty values. Surrounding **whitespace** is trimmed, and **non-uppercase** values (`fair`, `FaIr`, etc.) are upper-cased before lookup. `SchedulingMode.NONE`, blank, and unrecognized values fall back to the default (`FIFO`).
- Extends `minShare` and `weight` handling, which currently covers only empty values. Surrounding whitespace is trimmed, and blank or **non-integer** values fall back to the defaults.
- Refactors `PoolSuite`.
- Some refactoring of `PoolSuite`.
**Code to reproduce:**
```
val conf = new SparkConf().setAppName("spark-fairscheduler").setMaster("local")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.scheduler.allocation.file", "src/main/resources/fairscheduler-invalid-data.xml")
val sc = new SparkContext(conf)
```
**fairscheduler-invalid-data.xml:**
```
<allocations>
<pool name="production">
<schedulingMode>FIFO</schedulingMode>
<weight>invalid_weight</weight>
<minShare>2</minShare>
</pool>
</allocations>
```
**Stack trace:**
```
Exception in thread "main" java.lang.NumberFormatException: For input string: "invalid_weight"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:272)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:29)
at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:127)
at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$org$apache$spark$scheduler$FairSchedulableBuilder$$buildFairSchedulerPool$1.apply(SchedulableBuilder.scala:102)
```
## How was this patch tested?
Added a unit test case.
Author: erenavsarogullari <er...@gmail.com>
Closes #15237 from erenavsarogullari/SPARK-17663.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7beb227c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7beb227c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7beb227c
Branch: refs/heads/master
Commit: 7beb227cc8a4674e24cb1aaa278287ecc8194e5d
Parents: 7730426
Author: erenavsarogullari <er...@gmail.com>
Authored: Mon Feb 6 08:24:17 2017 -0600
Committer: Imran Rashid <ir...@cloudera.com>
Committed: Mon Feb 6 08:24:17 2017 -0600
----------------------------------------------------------------------
.../spark/scheduler/SchedulableBuilder.scala | 70 +++++++++++------
.../fairscheduler-with-invalid-data.xml | 80 +++++++++++++++++++
.../org/apache/spark/scheduler/PoolSuite.scala | 83 ++++++++++++++------
3 files changed, 182 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7beb227c/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 96325a0..f8bee3e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,10 +20,11 @@ package org.apache.spark.scheduler
import java.io.{FileInputStream, InputStream}
import java.util.{NoSuchElementException, Properties}
-import scala.xml.XML
+import scala.xml.{Node, XML}
import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.util.Utils
/**
@@ -102,38 +103,57 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
- var schedulingMode = DEFAULT_SCHEDULING_MODE
- var minShare = DEFAULT_MINIMUM_SHARE
- var weight = DEFAULT_WEIGHT
-
- val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
- if (xmlSchedulingMode != "") {
- try {
- schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
- } catch {
- case e: NoSuchElementException =>
- logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
- s"using the default schedulingMode: $schedulingMode")
- }
- }
- val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
- if (xmlMinShare != "") {
- minShare = xmlMinShare.toInt
- }
+ val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
+ val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
+ val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
- val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
- if (xmlWeight != "") {
- weight = xmlWeight.toInt
- }
+ rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
- val pool = new Pool(poolName, schedulingMode, minShare, weight)
- rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}
+ private def getSchedulingModeValue(
+ poolNode: Node,
+ poolName: String,
+ defaultValue: SchedulingMode): SchedulingMode = {
+
+ val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
+ val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
+ s"schedulingMode: $defaultValue for pool: $poolName"
+ try {
+ if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
+ SchedulingMode.withName(xmlSchedulingMode)
+ } else {
+ logWarning(warningMessage)
+ defaultValue
+ }
+ } catch {
+ case e: NoSuchElementException =>
+ logWarning(warningMessage)
+ defaultValue
+ }
+ }
+
+ private def getIntValue(
+ poolNode: Node,
+ poolName: String,
+ propertyName: String, defaultValue: Int): Int = {
+
+ val data = (poolNode \ propertyName).text.trim
+ try {
+ data.toInt
+ } catch {
+ case e: NumberFormatException =>
+ logWarning(s"Error while loading scheduler allocation file. " +
+ s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
+ s"$defaultValue for pool: $poolName")
+ defaultValue
+ }
+ }
+
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
var poolName = DEFAULT_POOL_NAME
var parentPool = rootPool.getSchedulableByName(poolName)
http://git-wip-us.apache.org/repos/asf/spark/blob/7beb227c/core/src/test/resources/fairscheduler-with-invalid-data.xml
----------------------------------------------------------------------
diff --git a/core/src/test/resources/fairscheduler-with-invalid-data.xml b/core/src/test/resources/fairscheduler-with-invalid-data.xml
new file mode 100644
index 0000000..a4d8d07
--- /dev/null
+++ b/core/src/test/resources/fairscheduler-with-invalid-data.xml
@@ -0,0 +1,80 @@
+<?xml version="1.0"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<allocations>
+ <pool name="pool_with_invalid_min_share">
+ <minShare>INVALID_MIN_SHARE</minShare>
+ <weight>2</weight>
+ <schedulingMode>FAIR</schedulingMode>
+ </pool>
+ <pool name="pool_with_invalid_weight">
+ <minShare>1</minShare>
+ <weight>INVALID_WEIGHT</weight>
+ <schedulingMode>FAIR</schedulingMode>
+ </pool>
+ <pool name="pool_with_invalid_scheduling_mode">
+ <minShare>3</minShare>
+ <weight>2</weight>
+ <schedulingMode>INVALID_SCHEDULING_MODE</schedulingMode>
+ </pool>
+ <pool name="pool_with_non_uppercase_scheduling_mode">
+ <minShare>2</minShare>
+ <weight>1</weight>
+ <schedulingMode>fair</schedulingMode>
+ </pool>
+ <pool name="pool_with_NONE_scheduling_mode">
+ <minShare>1</minShare>
+ <weight>2</weight>
+ <schedulingMode>NONE</schedulingMode>
+ </pool>
+ <pool name="pool_with_whitespace_min_share">
+ <minShare> </minShare>
+ <weight>2</weight>
+ <schedulingMode>FAIR</schedulingMode>
+ </pool>
+ <pool name="pool_with_whitespace_weight">
+ <minShare>1</minShare>
+ <weight> </weight>
+ <schedulingMode>FAIR</schedulingMode>
+ </pool>
+ <pool name="pool_with_whitespace_scheduling_mode">
+ <minShare>3</minShare>
+ <weight>2</weight>
+ <schedulingMode> </schedulingMode>
+ </pool>
+ <pool name="pool_with_empty_min_share">
+ <minShare></minShare>
+ <weight>3</weight>
+ <schedulingMode>FAIR</schedulingMode>
+ </pool>
+ <pool name="pool_with_empty_weight">
+ <minShare>2</minShare>
+ <weight></weight>
+ <schedulingMode>FAIR</schedulingMode>
+ </pool>
+ <pool name="pool_with_empty_scheduling_mode">
+ <minShare>2</minShare>
+ <weight>2</weight>
+ <schedulingMode></schedulingMode>
+ </pool>
+ <pool name="pool_with_surrounded_whitespace">
+ <minShare> 3 </minShare>
+ <weight> 2 </weight>
+ <schedulingMode> FAIR </schedulingMode>
+ </pool>
+</allocations>
http://git-wip-us.apache.org/repos/asf/spark/blob/7beb227c/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index 00e1c44..520736a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
import java.util.Properties
import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.spark.scheduler.SchedulingMode._
/**
* Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
@@ -27,6 +28,10 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui
*/
class PoolSuite extends SparkFunSuite with LocalSparkContext {
+ val LOCAL = "local"
+ val APP_NAME = "PoolSuite"
+ val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
+
def createTaskSetManager(stageId: Int, numTasks: Int, taskScheduler: TaskSchedulerImpl)
: TaskSetManager = {
val tasks = Array.tabulate[Task[_]](numTasks) { i =>
@@ -45,12 +50,11 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
}
test("FIFO Scheduler Test") {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ sc = new SparkContext(LOCAL, APP_NAME)
val taskScheduler = new TaskSchedulerImpl(sc)
- val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
+ val rootPool = new Pool("", FIFO, 0, 0)
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
- schedulableBuilder.buildPools()
val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
@@ -74,30 +78,24 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
*/
test("Fair Scheduler Test") {
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
- val conf = new SparkConf().set("spark.scheduler.allocation.file", xmlPath)
- sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
+ val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
+ sc = new SparkContext(LOCAL, APP_NAME, conf)
val taskScheduler = new TaskSchedulerImpl(sc)
- val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+ val rootPool = new Pool("", FAIR, 0, 0)
val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
schedulableBuilder.buildPools()
// Ensure that the XML file was read in correctly.
- assert(rootPool.getSchedulableByName("default") != null)
- assert(rootPool.getSchedulableByName("1") != null)
- assert(rootPool.getSchedulableByName("2") != null)
- assert(rootPool.getSchedulableByName("3") != null)
- assert(rootPool.getSchedulableByName("1").minShare === 2)
- assert(rootPool.getSchedulableByName("1").weight === 1)
- assert(rootPool.getSchedulableByName("2").minShare === 3)
- assert(rootPool.getSchedulableByName("2").weight === 1)
- assert(rootPool.getSchedulableByName("3").minShare === 0)
- assert(rootPool.getSchedulableByName("3").weight === 1)
+ verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
+ verifyPool(rootPool, "1", 2, 1, FIFO)
+ verifyPool(rootPool, "2", 3, 1, FIFO)
+ verifyPool(rootPool, "3", 0, 1, FIFO)
val properties1 = new Properties()
- properties1.setProperty("spark.scheduler.pool", "1")
+ properties1.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "1")
val properties2 = new Properties()
- properties2.setProperty("spark.scheduler.pool", "2")
+ properties2.setProperty(schedulableBuilder.FAIR_SCHEDULER_PROPERTIES, "2")
val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler)
val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler)
@@ -134,22 +132,22 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
}
test("Nested Pool Test") {
- sc = new SparkContext("local", "TaskSchedulerImplSuite")
+ sc = new SparkContext(LOCAL, APP_NAME)
val taskScheduler = new TaskSchedulerImpl(sc)
- val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
- val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
+ val rootPool = new Pool("", FAIR, 0, 0)
+ val pool0 = new Pool("0", FAIR, 3, 1)
+ val pool1 = new Pool("1", FAIR, 4, 1)
rootPool.addSchedulable(pool0)
rootPool.addSchedulable(pool1)
- val pool00 = new Pool("00", SchedulingMode.FAIR, 2, 2)
- val pool01 = new Pool("01", SchedulingMode.FAIR, 1, 1)
+ val pool00 = new Pool("00", FAIR, 2, 2)
+ val pool01 = new Pool("01", FAIR, 1, 1)
pool0.addSchedulable(pool00)
pool0.addSchedulable(pool01)
- val pool10 = new Pool("10", SchedulingMode.FAIR, 2, 2)
- val pool11 = new Pool("11", SchedulingMode.FAIR, 2, 1)
+ val pool10 = new Pool("10", FAIR, 2, 2)
+ val pool11 = new Pool("11", FAIR, 2, 1)
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)
@@ -178,4 +176,37 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
scheduleTaskAndVerifyId(2, rootPool, 6)
scheduleTaskAndVerifyId(3, rootPool, 2)
}
+
+ test("SPARK-17663: FairSchedulableBuilder sets default values for blank or invalid datas") {
+ val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
+ .getFile()
+ val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE_PROPERTY, xmlPath)
+
+ val rootPool = new Pool("", FAIR, 0, 0)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
+ schedulableBuilder.buildPools()
+
+ verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
+ verifyPool(rootPool, "pool_with_invalid_min_share", 0, 2, FAIR)
+ verifyPool(rootPool, "pool_with_invalid_weight", 1, 1, FAIR)
+ verifyPool(rootPool, "pool_with_invalid_scheduling_mode", 3, 2, FIFO)
+ verifyPool(rootPool, "pool_with_non_uppercase_scheduling_mode", 2, 1, FAIR)
+ verifyPool(rootPool, "pool_with_NONE_scheduling_mode", 1, 2, FIFO)
+ verifyPool(rootPool, "pool_with_whitespace_min_share", 0, 2, FAIR)
+ verifyPool(rootPool, "pool_with_whitespace_weight", 1, 1, FAIR)
+ verifyPool(rootPool, "pool_with_whitespace_scheduling_mode", 3, 2, FIFO)
+ verifyPool(rootPool, "pool_with_empty_min_share", 0, 3, FAIR)
+ verifyPool(rootPool, "pool_with_empty_weight", 2, 1, FAIR)
+ verifyPool(rootPool, "pool_with_empty_scheduling_mode", 2, 2, FIFO)
+ verifyPool(rootPool, "pool_with_surrounded_whitespace", 3, 2, FAIR)
+ }
+
+ private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
+ expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
+ assert(rootPool.getSchedulableByName(poolName) != null)
+ assert(rootPool.getSchedulableByName(poolName).minShare === expectedInitMinShare)
+ assert(rootPool.getSchedulableByName(poolName).weight === expectedInitWeight)
+ assert(rootPool.getSchedulableByName(poolName).schedulingMode === expectedSchedulingMode)
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org