Posted to commits@spark.apache.org by tg...@apache.org on 2014/07/21 20:15:41 UTC

git commit: SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler

Repository: spark
Updated Branches:
  refs/heads/master cd273a238 -> f89cf65d7


SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler

Author: Sandy Ryza <sa...@cloudera.com>

Closes #634 from sryza/sandy-spark-1707 and squashes the following commits:

2f6e358 [Sandy Ryza] Default min registered executors ratio to .8 for YARN
354c630 [Sandy Ryza] Remove outdated comments
c744ef3 [Sandy Ryza] Take out waitForInitialAllocations
2a4329b [Sandy Ryza] SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler
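
Context for the change: instead of sleeping a fixed 3 seconds after startup, the YARN scheduler backends now gate readiness on the fraction of requested executors that have registered, via spark.scheduler.minRegisteredExecutorsRatio (defaulted to 0.8 in the diff below when the user has not set it). A minimal Scala sketch of that kind of ratio-based gate; the names ReadinessGate, expectedExecutors, and maxWaitMs are illustrative assumptions, not the actual CoarseGrainedSchedulerBackend fields:

  import java.util.concurrent.atomic.AtomicInteger

  // Sketch of a ratio-based readiness gate; the real logic lives in
  // CoarseGrainedSchedulerBackend. Names here are illustrative.
  class ReadinessGate(expectedExecutors: Int,
                      minRegisteredRatio: Double = 0.8,
                      maxWaitMs: Long = 30000L) {
    private val registered = new AtomicInteger(0)
    private val createTime = System.currentTimeMillis()

    // Called each time an executor registers with the driver.
    def executorRegistered(): Unit = registered.incrementAndGet()

    // Ready once enough executors have registered, or after a bounded
    // wait, so a slow cluster cannot stall the scheduler indefinitely.
    def isReady: Boolean =
      registered.get() >= minRegisteredRatio * expectedExecutors ||
        (System.currentTimeMillis() - createTime) >= maxWaitMs
  }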


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f89cf65d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f89cf65d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f89cf65d

Branch: refs/heads/master
Commit: f89cf65d7aced0bb387c05586f9f51cb29865022
Parents: cd273a2
Author: Sandy Ryza <sa...@cloudera.com>
Authored: Mon Jul 21 13:15:46 2014 -0500
Committer: Thomas Graves <tg...@apache.org>
Committed: Mon Jul 21 13:15:46 2014 -0500

----------------------------------------------------------------------
 .../spark/deploy/yarn/ApplicationMaster.scala   | 39 ------------------
 .../cluster/YarnClientClusterScheduler.scala    | 10 -----
 .../cluster/YarnClientSchedulerBackend.scala    |  5 +++
 .../cluster/YarnClusterScheduler.scala          |  8 +---
 .../cluster/YarnClusterSchedulerBackend.scala   |  5 +++
 .../spark/deploy/yarn/ApplicationMaster.scala   | 43 --------------------
 6 files changed, 11 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 062f946..3ec3648 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -255,10 +255,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
             sparkContext.getConf)
         }
       }
-    } finally {
-      // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT :
-      // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
   }
 
@@ -277,13 +273,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         }
         yarnAllocator.allocateContainers(
           math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
-        ApplicationMaster.incrementAllocatorLoop(1)
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
       }
-    } finally {
-      // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT,
-      // so that the loop in ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT)
     }
     logInfo("All executors have launched.")
 
@@ -411,24 +402,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 }
 
 object ApplicationMaster extends Logging {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
-  def incrementAllocatorLoop(by: Int) {
-    val count = yarnAllocatorLoop.getAndAdd(by)
-    if (count >= ALLOCATOR_LOOP_WAIT_COUNT) {
-      yarnAllocatorLoop.synchronized {
-        // to wake threads off wait ...
-        yarnAllocatorLoop.notifyAll()
-      }
-    }
-  }
-
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
 
   def register(master: ApplicationMaster) {
@@ -437,7 +414,6 @@ object ApplicationMaster extends Logging {
 
   val sparkContextRef: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null /* initialValue */)
-  val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0)
 
   def sparkContextInitialized(sc: SparkContext): Boolean = {
     var modified = false
@@ -472,21 +448,6 @@ object ApplicationMaster extends Logging {
     modified
   }
 
-
-  /**
-   * Returns when we've either
-   *  1) received all the requested executors,
-   *  2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
-   *  3) hit an error that causes us to terminate trying to get containers.
-   */
-  def waitForInitialAllocations() {
-    yarnAllocatorLoop.synchronized {
-      while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) {
-        yarnAllocatorLoop.wait(1000L)
-      }
-    }
-  }
-
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     val args = new ApplicationMasterArguments(argStrings)
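
The arithmetic behind the commit title, worked from the constants removed above: the allocator loop incremented the counter once per iteration and slept ALLOCATE_HEARTBEAT_INTERVAL (100 ms) each time, so waiting for ALLOCATOR_LOOP_WAIT_COUNT (30) iterations meant up to 30 * 100 ms = 3000 ms, i.e. the "unnecessary 3 second sleep" that waitForInitialAllocations() imposed even when executors were already up.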

http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 15e8c21..3474112 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -37,14 +37,4 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur
     val retval = YarnAllocationHandler.lookupRack(conf, host)
     if (retval != null) Some(retval) else None
   }
-
-  override def postStartHook() {
-
-    super.postStartHook()
-    // The yarn application is running, but the executor might not yet ready
-    // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-    // TODO It needn't after waitBackendReady
-    Thread.sleep(2000L)
-    logInfo("YarnClientClusterScheduler.postStartHook done")
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 1b37c4b..d8266f7 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -30,6 +30,11 @@ private[spark] class YarnClientSchedulerBackend(
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
   with Logging {
 
+  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+    minRegisteredRatio = 0.8
+    ready = false
+  }
+
   var client: Client = null
   var appId: ApplicationId = null
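
The guard above applies the 0.8 default only when the user has not set the ratio themselves; an explicit setting still wins. For example, to wait for every executor before scheduling starts (subject to the backend's maximum wait time; the class and jar names below are placeholders):

  spark-submit --master yarn-client \
    --conf spark.scheduler.minRegisteredExecutorsRatio=1.0 \
    --class com.example.MyApp my-app.jar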
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 9ee53d7..9aeca4a 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -47,14 +47,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration)
   }
 
   override def postStartHook() {
-    val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc)
+    ApplicationMaster.sparkContextInitialized(sc)
     super.postStartHook()
-    if (sparkContextInitialized){
-      ApplicationMaster.waitForInitialAllocations()
-      // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt
-      // TODO It needn't after waitBackendReady
-      Thread.sleep(3000L)
-    }
     logInfo("YarnClusterScheduler.postStartHook done")
   }
 }
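
With waitForInitialAllocations() and the 3 second sleep gone, postStartHook() leans entirely on super.postStartHook(), which blocks until the scheduler backend reports ready. As a sketch of the idea only (the stand-in trait and the 100 ms poll interval are illustrative, not the exact TaskSchedulerImpl code):

  // Stand-in for the backend's readiness flag, for the sketch only.
  trait ReadyCheck { def isReady: Boolean }

  // Block the post-start hook until the backend says it is ready,
  // e.g. until enough executors have registered (see the ratio gate above).
  def waitBackendReady(backend: ReadyCheck): Unit = {
    while (!backend.isReady) {
      Thread.sleep(100)
    }
  }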

http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index a04b08f..0ad1794 100644
--- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -27,6 +27,11 @@ private[spark] class YarnClusterSchedulerBackend(
     sc: SparkContext)
   extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) {
 
+  if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) {
+    minRegisteredRatio = 0.8
+    ready = false
+  }
+
   override def start() {
     super.start()
     var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS

http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 1a24ec7..eaf594c 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -234,10 +234,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
             sparkContext.getConf)
         }
       }
-    } finally {
-      // In case of exceptions, etc - ensure that the loop in
-      // ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.doneWithInitialAllocations()
     }
   }
 
@@ -254,16 +250,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
         checkNumExecutorsFailed()
         allocateMissingExecutor()
         yarnAllocator.allocateResources()
-        if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) {
-          ApplicationMaster.doneWithInitialAllocations()
-        }
         Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL)
         iters += 1
       }
-    } finally {
-      // In case of exceptions, etc - ensure that the loop in
-      // ApplicationMaster#sparkContextInitialized() breaks.
-      ApplicationMaster.doneWithInitialAllocations()
     }
     logInfo("All executors have launched.")
   }
@@ -365,12 +354,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
 }
 
 object ApplicationMaster extends Logging {
-  // Number of times to wait for the allocator loop to complete.
-  // Each loop iteration waits for 100ms, so maximum of 3 seconds.
-  // This is to ensure that we have reasonable number of containers before we start
   // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be
   // optimal as more containers are available. Might need to handle this better.
-  private val ALLOCATOR_LOOP_WAIT_COUNT = 30
   private val ALLOCATE_HEARTBEAT_INTERVAL = 100
 
   private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]()
@@ -378,20 +363,6 @@ object ApplicationMaster extends Logging {
   val sparkContextRef: AtomicReference[SparkContext] =
     new AtomicReference[SparkContext](null)
 
-  // Variable used to notify the YarnClusterScheduler that it should stop waiting
-  // for the initial set of executors to be started and get on with its business.
-  val doneWithInitialAllocationsMonitor = new Object()
-
-  @volatile var isDoneWithInitialAllocations = false
-
-  def doneWithInitialAllocations() {
-    isDoneWithInitialAllocations = true
-    doneWithInitialAllocationsMonitor.synchronized {
-      // to wake threads off wait ...
-      doneWithInitialAllocationsMonitor.notifyAll()
-    }
-  }
-
   def register(master: ApplicationMaster) {
     applicationMasters.add(master)
   }
@@ -434,20 +405,6 @@ object ApplicationMaster extends Logging {
     modified
   }
 
-  /**
-   * Returns when we've either
-   *  1) received all the requested executors,
-   *  2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms,
-   *  3) hit an error that causes us to terminate trying to get containers.
-   */
-  def waitForInitialAllocations() {
-    doneWithInitialAllocationsMonitor.synchronized {
-      while (!isDoneWithInitialAllocations) {
-        doneWithInitialAllocationsMonitor.wait(1000L)
-      }
-    }
-  }
-
   def getApplicationAttemptId(): ApplicationAttemptId = {
     val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
     val containerId = ConverterUtils.toContainerId(containerIdString)