You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2021/04/16 07:19:11 UTC
[spark] branch master updated: [SPARK-35083][CORE] Support remote
scheduler pool files
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 345c380 [SPARK-35083][CORE] Support remote scheduler pool files
345c380 is described below
commit 345c380778ca7ae7f57796e2f2d1744f5310e016
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Fri Apr 16 00:18:35 2021 -0700
[SPARK-35083][CORE] Support remote scheduler pool files
### What changes were proposed in this pull request?
Use Hadoop `FileSystem` instead of `FileInputStream` to read the scheduler allocation file.
### Why are the changes needed?
Make `spark.scheduler.allocation.file` support remote files. When using Spark as a server (e.g. SparkThriftServer), it's hard for users to specify a local path for the scheduler pool file.
### Does this PR introduce _any_ user-facing change?
Yes, a minor feature.
### How was this patch tested?
Passed `core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala` and a manual test.
After adding the config `spark.scheduler.allocation.file=hdfs:///tmp/fairscheduler.xml`, the configured pools are created (see screenshot below).
![pool1](https://user-images.githubusercontent.com/12025282/114810037-df065700-9ddd-11eb-8d7a-54b59a07ee7b.jpg)
Closes #32184 from ulysses-you/SPARK-35083.
Authored-by: ulysses-you <ul...@gmail.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../spark/scheduler/SchedulableBuilder.scala | 13 ++++---
.../apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
.../org/apache/spark/scheduler/PoolSuite.scala | 45 ++++++++++++++++++----
docs/job-scheduling.md | 3 +-
4 files changed, 48 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 8f6a221..e7c45a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -17,13 +17,15 @@
package org.apache.spark.scheduler
-import java.io.{FileInputStream, InputStream}
+import java.io.InputStream
import java.util.{Locale, NoSuchElementException, Properties}
import scala.util.control.NonFatal
import scala.xml.{Node, XML}
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -54,10 +56,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
}
}
-private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
+private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
extends SchedulableBuilder with Logging {
- val schedulerAllocFile = conf.get(SCHEDULER_ALLOCATION_FILE)
+ val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE)
val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL
val DEFAULT_POOL_NAME = "default"
@@ -74,7 +76,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
var fileData: Option[(InputStream, String)] = None
try {
fileData = schedulerAllocFile.map { f =>
- val fis = new FileInputStream(f)
+ val filePath = new Path(f)
+ val fis = filePath.getFileSystem(sc.hadoopConfiguration).open(filePath)
logInfo(s"Creating Fair Scheduler pools from $f")
Some((fis, f))
}.getOrElse {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 373aab4..ef3a558 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -205,7 +205,7 @@ private[spark] class TaskSchedulerImpl(
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
- new FairSchedulableBuilder(rootPool, conf)
+ new FairSchedulableBuilder(rootPool, sc)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
s"$schedulingMode")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index d9de976..fa2c5ea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -20,10 +20,14 @@ package org.apache.spark.scheduler
import java.io.FileNotFoundException
import java.util.Properties
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
import org.apache.spark.resource.ResourceProfile
import org.apache.spark.scheduler.SchedulingMode._
+import org.apache.spark.util.Utils
/**
* Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
@@ -87,7 +91,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val taskScheduler = new TaskSchedulerImpl(sc)
val rootPool = new Pool("", FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
schedulableBuilder.buildPools()
// Ensure that the XML file was read in correctly.
@@ -185,9 +189,10 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
.getFile()
val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
+ sc = new SparkContext(LOCAL, APP_NAME, conf)
val rootPool = new Pool("", FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
schedulableBuilder.buildPools()
verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -239,7 +244,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val taskScheduler = new TaskSchedulerImpl(sc)
val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
schedulableBuilder.buildPools()
// Submit a new task set manager with pool properties set to null. This should result
@@ -267,7 +272,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
val taskScheduler = new TaskSchedulerImpl(sc)
val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
schedulableBuilder.buildPools()
assert(rootPool.getSchedulableByName(TEST_POOL) === null)
@@ -302,7 +307,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(LOCAL, APP_NAME, conf)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
schedulableBuilder.buildPools()
verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -317,7 +322,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(LOCAL, APP_NAME, conf)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
schedulableBuilder.buildPools()
verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -332,12 +337,36 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(LOCAL, APP_NAME, conf)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
- val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
intercept[FileNotFoundException] {
schedulableBuilder.buildPools()
}
}
+ test("SPARK-35083: Support remote scheduler pool file") {
+ val hadoopVersion = VersionInfo.getVersion.split("\\.")
+ // HttpFileSystem supported since hadoop 2.9
+ assume(hadoopVersion.head.toInt >= 3 ||
+ (hadoopVersion.head.toInt == 2 && hadoopVersion(1).toInt >= 9))
+
+ val xmlPath = new Path(
+ Utils.getSparkClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile)
+ TestUtils.withHttpServer(xmlPath.getParent.toUri.getPath) { baseURL =>
+ val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE,
+ baseURL + "fairscheduler-with-valid-data.xml")
+ sc = new SparkContext(LOCAL, APP_NAME, conf)
+
+ val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+ val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
+ schedulableBuilder.buildPools()
+
+ verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
+ verifyPool(rootPool, "pool1", 3, 1, FIFO)
+ verifyPool(rootPool, "pool2", 4, 2, FAIR)
+ verifyPool(rootPool, "pool3", 2, 3, FAIR)
+ }
+ }
+
private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
val selectedPool = rootPool.getSchedulableByName(poolName)
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index f2b77cd..51060dd 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -252,10 +252,11 @@ properties:
The pool properties can be set by creating an XML file, similar to `conf/fairscheduler.xml.template`,
and either putting a file named `fairscheduler.xml` on the classpath, or setting `spark.scheduler.allocation.file` property in your
-[SparkConf](configuration.html#spark-properties).
+[SparkConf](configuration.html#spark-properties). The file path can either be a local file path or HDFS file path.
{% highlight scala %}
conf.set("spark.scheduler.allocation.file", "/path/to/file")
+conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
{% endhighlight %}
The format of the XML file is simply a `<pool>` element for each pool, with different elements
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org