Posted to commits@spark.apache.org by ka...@apache.org on 2017/02/10 16:33:03 UTC

spark git commit: [SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Logging

Repository: spark
Updated Branches:
  refs/heads/master c5a66356d -> dadff5f07


[SPARK-19466][CORE][SCHEDULER] Improve Fair Scheduler Logging

## What changes were proposed in this pull request?
Fair Scheduler logging for the following cases can be useful to the user.

1. If a **valid** `spark.scheduler.allocation.file` property is set, the user can be informed which scheduler file is processed when `SparkContext` initializes (see the sketch after this list).

2. If an **invalid** `spark.scheduler.allocation.file` property is set, only the following stack trace is currently shown to the user. A more meaningful message can be logged that points out the problem at the pool-building stage of the fair scheduler, and other potential issues can be covered at the same level as **Fair Scheduler can not be built. + exception stacktrace**
```
Exception in thread "main" java.io.FileNotFoundException: INVALID_FILE (No such file or directory)
	at java.io.FileInputStream.open0(Native Method)
	at java.io.FileInputStream.open(FileInputStream.java:195)
	at java.io.FileInputStream.<init>(FileInputStream.java:138)
	at java.io.FileInputStream.<init>(FileInputStream.java:93)
	at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:76)
	at org.apache.spark.scheduler.FairSchedulableBuilder$$anonfun$buildPools$1.apply(SchedulableBuilder.scala:75)
```
3. If the `spark.scheduler.allocation.file` property is not set and the **default** fair scheduler file (**fairscheduler.xml**) is found on the classpath, it will be loaded, but currently the user is not informed that the default file is being used, so logging such as **Fair Scheduler file: fairscheduler.xml is found successfully and will be parsed.** can be useful.

4. If the **spark.scheduler.allocation.file** property is not set and the **default** fair scheduler file does not exist on the classpath, the user is currently not informed, so logging such as **No Fair Scheduler file found.** can be useful.
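
As a usage illustration for case 1 above, here is a minimal sketch of how an application would set the property that triggers the new logging; the allocation file path, the pool name `production`, and the application settings are illustrative assumptions and are not part of this patch.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FairSchedulerLoggingExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("fair-scheduler-logging-example") // assumed app name
      .setMaster("local[2]")                        // assumed master for a local run
      .set("spark.scheduler.mode", "FAIR")
      // With this property set, SparkContext initialization now logs
      // "Creating Fair Scheduler pools from /path/to/fairscheduler.xml".
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // hypothetical path

    val sc = new SparkContext(conf)

    // Jobs submitted from this thread run in the pool named below; the pool
    // name "production" is assumed to be defined in the allocation file.
    sc.setLocalProperty("spark.scheduler.pool", "production")
    sc.parallelize(1 to 100).count()

    sc.stop()
  }
}
```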

This PR is also related to https://github.com/apache/spark/pull/15237, emphasizing the fileName in warning logs when the fair scheduler file has invalid minShare, weight or schedulingMode values.

## How was this patch tested?
Added new unit tests.

Author: erenavsarogullari <er...@gmail.com>

Closes #16813 from erenavsarogullari/SPARK-19466.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dadff5f0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dadff5f0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dadff5f0

Branch: refs/heads/master
Commit: dadff5f0789cce7cf3728a8adaab42118e5dc019
Parents: c5a6635
Author: Eren Avsarogullari <er...@gmail.com>
Authored: Fri Feb 10 08:32:36 2017 -0800
Committer: Kay Ousterhout <ka...@gmail.com>
Committed: Fri Feb 10 08:32:36 2017 -0800

----------------------------------------------------------------------
 .../spark/scheduler/SchedulableBuilder.scala    | 67 ++++++++++++++------
 1 file changed, 46 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dadff5f0/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
index f8bee3e..e53c4fb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulableBuilder.scala
@@ -20,6 +20,7 @@ package org.apache.spark.scheduler
 import java.io.{FileInputStream, InputStream}
 import java.util.{NoSuchElementException, Properties}
 
+import scala.util.control.NonFatal
 import scala.xml.{Node, XML}
 
 import org.apache.spark.SparkConf
@@ -55,7 +56,8 @@ private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
 private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   extends SchedulableBuilder with Logging {
 
-  val schedulerAllocFile = conf.getOption("spark.scheduler.allocation.file")
+  val SCHEDULER_ALLOCATION_FILE_PROPERTY = "spark.scheduler.allocation.file"
+  val schedulerAllocFile = conf.getOption(SCHEDULER_ALLOCATION_FILE_PROPERTY)
   val DEFAULT_SCHEDULER_FILE = "fairscheduler.xml"
   val FAIR_SCHEDULER_PROPERTIES = "spark.scheduler.pool"
   val DEFAULT_POOL_NAME = "default"
@@ -69,19 +71,35 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   val DEFAULT_WEIGHT = 1
 
   override def buildPools() {
-    var is: Option[InputStream] = None
+    var fileData: Option[(InputStream, String)] = None
     try {
-      is = Option {
-        schedulerAllocFile.map { f =>
-          new FileInputStream(f)
-        }.getOrElse {
-          Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+      fileData = schedulerAllocFile.map { f =>
+        val fis = new FileInputStream(f)
+        logInfo(s"Creating Fair Scheduler pools from $f")
+        Some((fis, f))
+      }.getOrElse {
+        val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
+        if (is != null) {
+          logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
+          Some((is, DEFAULT_SCHEDULER_FILE))
+        } else {
+          logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
+            s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
+            s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
+          None
         }
       }
 
-      is.foreach { i => buildFairSchedulerPool(i) }
+      fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
+    } catch {
+      case NonFatal(t) =>
+        val defaultMessage = "Error while building the fair scheduler pools"
+        val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
+          .getOrElse(defaultMessage)
+        logError(message, t)
+        throw t
     } finally {
-      is.foreach(_.close())
+      fileData.foreach { case (is, fileName) => is.close() }
     }
 
     // finally create "default" pool
@@ -93,24 +111,27 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
       val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
         DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
       rootPool.addSchedulable(pool)
-      logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+      logInfo("Created default pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
         DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
     }
   }
 
-  private def buildFairSchedulerPool(is: InputStream) {
+  private def buildFairSchedulerPool(is: InputStream, fileName: String) {
     val xml = XML.load(is)
     for (poolNode <- (xml \\ POOLS_PROPERTY)) {
 
       val poolName = (poolNode \ POOL_NAME_PROPERTY).text
 
-      val schedulingMode = getSchedulingModeValue(poolNode, poolName, DEFAULT_SCHEDULING_MODE)
-      val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY, DEFAULT_MINIMUM_SHARE)
-      val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY, DEFAULT_WEIGHT)
+      val schedulingMode = getSchedulingModeValue(poolNode, poolName,
+        DEFAULT_SCHEDULING_MODE, fileName)
+      val minShare = getIntValue(poolNode, poolName, MINIMUM_SHARES_PROPERTY,
+        DEFAULT_MINIMUM_SHARE, fileName)
+      val weight = getIntValue(poolNode, poolName, WEIGHT_PROPERTY,
+        DEFAULT_WEIGHT, fileName)
 
       rootPool.addSchedulable(new Pool(poolName, schedulingMode, minShare, weight))
 
-      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+      logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
         poolName, schedulingMode, minShare, weight))
     }
   }
@@ -118,11 +139,13 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   private def getSchedulingModeValue(
       poolNode: Node,
       poolName: String,
-      defaultValue: SchedulingMode): SchedulingMode = {
+      defaultValue: SchedulingMode,
+      fileName: String): SchedulingMode = {
 
     val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text.trim.toUpperCase
-    val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode, using the default " +
-      s"schedulingMode: $defaultValue for pool: $poolName"
+    val warningMessage = s"Unsupported schedulingMode: $xmlSchedulingMode found in " +
+      s"Fair Scheduler configuration file: $fileName, using " +
+      s"the default schedulingMode: $defaultValue for pool: $poolName"
     try {
       if (SchedulingMode.withName(xmlSchedulingMode) != SchedulingMode.NONE) {
         SchedulingMode.withName(xmlSchedulingMode)
@@ -140,14 +163,16 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
   private def getIntValue(
       poolNode: Node,
       poolName: String,
-      propertyName: String, defaultValue: Int): Int = {
+      propertyName: String,
+      defaultValue: Int,
+      fileName: String): Int = {
 
     val data = (poolNode \ propertyName).text.trim
     try {
       data.toInt
     } catch {
       case e: NumberFormatException =>
-        logWarning(s"Error while loading scheduler allocation file. " +
+        logWarning(s"Error while loading fair scheduler configuration from $fileName: " +
           s"$propertyName is blank or invalid: $data, using the default $propertyName: " +
           s"$defaultValue for pool: $poolName")
         defaultValue
@@ -166,7 +191,7 @@ private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)
         parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE,
           DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
         rootPool.addSchedulable(parentPool)
-        logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
+        logInfo("Created pool: %s, schedulingMode: %s, minShare: %d, weight: %d".format(
           poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org