Posted to commits@spark.apache.org by do...@apache.org on 2021/04/16 07:19:11 UTC

[spark] branch master updated: [SPARK-35083][CORE] Support remote scheduler pool files

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 345c380  [SPARK-35083][CORE] Support remote scheduler pool files
345c380 is described below

commit 345c380778ca7ae7f57796e2f2d1744f5310e016
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Fri Apr 16 00:18:35 2021 -0700

    [SPARK-35083][CORE] Support remote scheduler pool files
    
    ### What changes were proposed in this pull request?
    
    Use the Hadoop `FileSystem` API instead of `FileInputStream` to read the scheduler allocation file.
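    
    A minimal sketch of the approach (illustrative only; the path and the bare `Configuration` are examples — the actual change threads `sc.hadoopConfiguration` through instead):
    
    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FSDataInputStream, Path}
    
    // Resolve the allocation file through the Hadoop FileSystem API, which
    // dispatches on the URI scheme (file://, hdfs://, ...) rather than
    // assuming a local file the way FileInputStream does.
    val filePath = new Path("hdfs:///tmp/fairscheduler.xml")
    val in: FSDataInputStream =
      filePath.getFileSystem(new Configuration()).open(filePath)
    ```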
    
    ### Why are the changes needed?
    
    Make `spark.scheduler.allocation.file` support remote files. When using Spark as a server (e.g. Spark Thrift Server), it is hard for users to specify a local path as the scheduler pool file.
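    
    For example (paths illustrative), a long-running server can now point the fair scheduler at a shared file on HDFS:
    
    ```scala
    import org.apache.spark.SparkConf
    
    // Keep the pool definitions on HDFS so every session reads the same file.
    val conf = new SparkConf()
      .set("spark.scheduler.allocation.file", "hdfs:///path/to/fairscheduler.xml")
    ```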
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, a minor feature.
    
    ### How was this patch tested?
    
    Passes `core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala` and a manual test:
    after adding the config `spark.scheduler.allocation.file=hdfs:///tmp/fairscheduler.xml`, the configured pool appears.
    ![pool1](https://user-images.githubusercontent.com/12025282/114810037-df065700-9ddd-11eb-8d7a-54b59a07ee7b.jpg)
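    
    For reference, a pool file like the one used above might look as follows (hypothetical contents, mirroring the pools the new `PoolSuite` test verifies):
    
    ```xml
    <?xml version="1.0"?>
    <allocations>
      <pool name="pool1">
        <schedulingMode>FIFO</schedulingMode>
        <minShare>3</minShare>
        <weight>1</weight>
      </pool>
      <pool name="pool2">
        <schedulingMode>FAIR</schedulingMode>
        <minShare>4</minShare>
        <weight>2</weight>
      </pool>
    </allocations>
    ```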
    
    Closes #32184 from ulysses-you/SPARK-35083.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/scheduler/SchedulableBuilder.scala       | 13 ++++---
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../org/apache/spark/scheduler/PoolSuite.scala     | 45 ++++++++++++++++++----
 docs/job-scheduling.md                             |  3 +-
 4 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index 8f6a221..e7c45a9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -17,13 +17,15 @@
 
 package org.apache.spark.scheduler
 
-import java.io.{FileInputStream, InputStream}
+import java.io.InputStream
 import java.util.{Locale, NoSuchElementException, Properties}
 
 import scala.util.control.NonFatal
 import scala.xml.{Node, XML}
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
@@ -54,10 +56,10 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
   }
 }
 
-private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
+private[spark] class FairSchedulableBuilder(val rootPool: Pool, sc: SparkContext)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = conf.get(SCHEDULER_ALLOCATION_FILE)
+  val schedulerAllocFile = sc.conf.get(SCHEDULER_ALLOCATION_FILE)
   val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = SparkContext.SPARK_SCHEDULER_POOL
   val DEFAULT_POOL_NAME = "default"
@@ -74,7 +76,8 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
     var fileData: Option[(InputStream, String)] = None
     try {
       fileData = schedulerAllocFile.map { f =>
-        val fis = new FileInputStream(f)
+        val filePath = new Path(f)
+        val fis = filePath.getFileSystem(sc.hadoopConfiguration).open(filePath)
         logInfo(s"Creating Fair Scheduler pools from $f")
         Some((fis, f))
       }.getOrElse {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 373aab4..ef3a558 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -205,7 +205,7 @@ private[spark] class TaskSchedulerImpl(
         case SchedulingMode.FIFO =>
           new FIFOSchedulableBuilder(rootPool)
         case SchedulingMode.FAIR =>
-          new FairSchedulableBuilder(rootPool, conf)
+          new FairSchedulableBuilder(rootPool, sc)
         case _ =>
           throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
           s"$schedulingMode")
diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
index d9de976..fa2c5ea 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala
@@ -20,10 +20,14 @@ package org.apache.spark.scheduler
 import java.io.FileNotFoundException
 import java.util.Properties
 
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.util.VersionInfo
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.internal.config.SCHEDULER_ALLOCATION_FILE
 import org.apache.spark.resource.ResourceProfile
 import org.apache.spark.scheduler.SchedulingMode._
+import org.apache.spark.util.Utils
 
 /**
  * Tests that pools and the associated scheduling algorithms for FIFO and fair scheduling work
@@ -87,7 +91,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     // Ensure that the XML file was read in correctly.
@@ -185,9 +189,10 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val xmlPath = getClass.getClassLoader.getResource("fairscheduler-with-invalid-data.xml")
       .getFile()
     val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE, xmlPath)
+    sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -239,7 +244,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     // Submit a new task set manager with pool properties set to null. This should result
@@ -267,7 +272,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     val taskScheduler = new TaskSchedulerImpl(sc)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, initMinShare = 0, initWeight = 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     assert(rootPool.getSchedulableByName(TEST_POOL) === null)
@@ -302,7 +307,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -317,7 +322,7 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     schedulableBuilder.buildPools()
 
     verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
@@ -332,12 +337,36 @@ class PoolSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(LOCAL, APP_NAME, conf)
 
     val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
-    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc.conf)
+    val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
     intercept[FileNotFoundException] {
       schedulableBuilder.buildPools()
     }
   }
 
+  test("SPARK-35083: Support remote scheduler pool file") {
+    val hadoopVersion = VersionInfo.getVersion.split("\\.")
+    // HttpFileSystem supported since hadoop 2.9
+    assume(hadoopVersion.head.toInt >= 3 ||
+      (hadoopVersion.head.toInt == 2 && hadoopVersion(1).toInt >= 9))
+
+    val xmlPath = new Path(
+      Utils.getSparkClassLoader.getResource("fairscheduler-with-valid-data.xml").getFile)
+    TestUtils.withHttpServer(xmlPath.getParent.toUri.getPath) { baseURL =>
+      val conf = new SparkConf().set(SCHEDULER_ALLOCATION_FILE,
+        baseURL + "fairscheduler-with-valid-data.xml")
+      sc = new SparkContext(LOCAL, APP_NAME, conf)
+
+      val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
+      val schedulableBuilder = new FairSchedulableBuilder(rootPool, sc)
+      schedulableBuilder.buildPools()
+
+      verifyPool(rootPool, schedulableBuilder.DEFAULT_POOL_NAME, 0, 1, FIFO)
+      verifyPool(rootPool, "pool1", 3, 1, FIFO)
+      verifyPool(rootPool, "pool2", 4, 2, FAIR)
+      verifyPool(rootPool, "pool3", 2, 3, FAIR)
+    }
+  }
+
   private def verifyPool(rootPool: Pool, poolName: String, expectedInitMinShare: Int,
                          expectedInitWeight: Int, expectedSchedulingMode: SchedulingMode): Unit = {
     val selectedPool = rootPool.getSchedulableByName(poolName)
diff --git a/docs/job-scheduling.md b/docs/job-scheduling.md
index f2b77cd..51060dd 100644
--- a/docs/job-scheduling.md
+++ b/docs/job-scheduling.md
@@ -252,10 +252,11 @@ properties:
 
 The pool properties can be set by creating an XML file, similar to `conf/fairscheduler.xml.template`,
 and either putting a file named `fairscheduler.xml` on the classpath, or setting `spark.scheduler.allocation.file` property in your
-[SparkConf](configuration.html#spark-properties).
+[SparkConf](configuration.html#spark-properties). The file path can be either a local file path or an HDFS file path.
 
 {% highlight scala %}
 conf.set("spark.scheduler.allocation.file", "/path/to/file")
+conf.set("spark.scheduler.allocation.file", "hdfs:///path/to/file")
 {% endhighlight %}
 
 The format of the XML file is simply a `<pool>` element for each pool, with different elements
