You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/09/29 20:53:32 UTC

spark git commit: [SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamicAllocationSuite

Repository: spark
Updated Branches:
  refs/heads/master 9b9fe5f7b -> dba95ea03


[SPARK-10825] [CORE] [TESTS] Fix race conditions in StandaloneDynamicAllocationSuite

Fix the following issues in StandaloneDynamicAllocationSuite:

1. It should not assume master and workers start in order
2. It should not assume master and workers get ready at once
3. It should not assume the application is already registered with master after creating SparkContext
4. It should not access Master.apps and idToApp which are not thread safe

The changes include:
* Use `eventually` to wait until master and workers are ready to fix 1 and 2
* Use `eventually` to wait until the application is registered with master to fix 3
* Use `askWithRetry[MasterStateResponse](RequestMasterState)` to get the application info to fix 4

Author: zsxwing <zs...@gmail.com>

Closes #8914 from zsxwing/fix-StandaloneDynamicAllocationSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba95ea0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba95ea0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba95ea0

Branch: refs/heads/master
Commit: dba95ea03216e6b8e623db4a36e1018c6ed95538
Parents: 9b9fe5f
Author: zsxwing <zs...@gmail.com>
Authored: Tue Sep 29 11:53:28 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Sep 29 11:53:28 2015 -0700

----------------------------------------------------------------------
 .../StandaloneDynamicAllocationSuite.scala      | 305 ++++++++++++-------
 1 file changed, 192 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dba95ea0/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 1f2a0f0..2e2fa22 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.deploy
 
+import scala.concurrent.duration._
+
 import org.mockito.Mockito.{mock, when}
 import org.scalatest.BeforeAndAfterAll
+import org.scalatest.concurrent.Eventually._
 
 import org.apache.spark._
+import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
+import org.apache.spark.deploy.master.ApplicationInfo
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.deploy.worker.Worker
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
@@ -56,6 +61,10 @@ class StandaloneDynamicAllocationSuite
     }
     master = makeMaster()
     workers = makeWorkers(10, 2048)
+    // Wait until all workers register with master successfully
+    eventually(timeout(60.seconds), interval(10.millis)) {
+      assert(getMasterState.workers.size === numWorkers)
+    }
   }
 
   override def afterAll(): Unit = {
@@ -73,167 +82,208 @@ class StandaloneDynamicAllocationSuite
   test("dynamic allocation default behavior") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 2 = 1 executor
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with max cores <= cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "8"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 8)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 8)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more; this one won't go through because we're already at max cores.
     // This highlights a limitation of using dynamic allocation with max cores WITHOUT
     // setting cores per executor: once an application scales down and then scales back
     // up, its executors may not be spread out anymore!
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one also won't go through for the same reason
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 1 = 2 executor
     // Note: we scheduled these executors together, so their cores should be evenly distributed
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(apps.head.getExecutorLimit === 2)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(4, 4))
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with max cores > cores per worker") {
     sc = new SparkContext(appConf.set("spark.cores.max", "16"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 10)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 10)
+    assert(apps.head.getExecutorLimit === 1)
     // request 1 more
     // Note: the cores are not evenly distributed because we scheduled these executors 1 by 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toSet === Set(10, 6))
+    assert(apps.head.getExecutorLimit === 2)
     // request 1 more; this one won't go through
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 3)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 3)
     // kill all existing executors; we should end up with 3 - 2 = 1 executor
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.executors.values.head.cores === 10)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.executors.values.head.cores === 10)
+    assert(apps.head.getExecutorLimit === 1)
     // kill all executors again; this time we'll have 1 - 1 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.executors.values.map(_.cores).toArray === Array(8, 8))
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with cores per executor") {
     sc = new SparkContext(appConf.set("spark.executor.cores", "2"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 10) // 20 cores total
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 10) // 20 cores total
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 3 more
     assert(sc.requestExecutors(3))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 4)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 4)
     // request 10 more; only 6 will go through
     assert(sc.requestExecutors(10))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 14)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 14)
     // kill 2 executors; we should get 2 back immediately
     assert(killNExecutors(sc, 2))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 12)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 12)
     // kill 4 executors; we should end up with 12 - 4 = 8 executors
     assert(killNExecutors(sc, 4))
-    assert(master.apps.head.executors.size === 8)
-    assert(master.apps.head.getExecutorLimit === 8)
+    apps = getApplications()
+    assert(apps.head.executors.size === 8)
+    assert(apps.head.getExecutorLimit === 8)
     // kill all executors; this time we'll have 8 - 8 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 10)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 10)
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("dynamic allocation with cores per executor AND max cores") {
@@ -241,55 +291,70 @@ class StandaloneDynamicAllocationSuite
       .set("spark.executor.cores", "2")
       .set("spark.cores.max", "8"))
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 4) // 8 cores total
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 4) // 8 cores total
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // kill all executors
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    var apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request 1
     assert(sc.requestExecutors(1))
-    assert(master.apps.head.executors.size === 1)
-    assert(master.apps.head.getExecutorLimit === 1)
+    apps = getApplications()
+    assert(apps.head.executors.size === 1)
+    assert(apps.head.getExecutorLimit === 1)
     // request 3 more
     assert(sc.requestExecutors(3))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 4)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 4)
     // request 10 more; none will go through
     assert(sc.requestExecutors(10))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 14)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 14)
     // kill all executors; 4 executors will be launched immediately
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 10)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 10)
     // ... and again
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 6)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 6)
     // ... and again; now we end up with 6 - 4 = 2 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === 2)
+    apps = getApplications()
+    assert(apps.head.executors.size === 2)
+    assert(apps.head.getExecutorLimit === 2)
     // ... and again; this time we have 2 - 2 = 0 executors left
     assert(killAllExecutors(sc))
-    assert(master.apps.head.executors.size === 0)
-    assert(master.apps.head.getExecutorLimit === 0)
+    apps = getApplications()
+    assert(apps.head.executors.size === 0)
+    assert(apps.head.getExecutorLimit === 0)
     // request many more; this increases the limit well beyond the cluster capacity
     assert(sc.requestExecutors(1000))
-    assert(master.apps.head.executors.size === 4)
-    assert(master.apps.head.getExecutorLimit === 1000)
+    apps = getApplications()
+    assert(apps.head.executors.size === 4)
+    assert(apps.head.getExecutorLimit === 1000)
   }
 
   test("kill the same executor twice (SPARK-9795)") {
     sc = new SparkContext(appConf)
     val appId = sc.applicationId
-    assert(master.apps.size === 1)
-    assert(master.apps.head.id === appId)
-    assert(master.apps.head.executors.size === 2)
-    assert(master.apps.head.getExecutorLimit === Int.MaxValue)
+    eventually(timeout(10.seconds), interval(10.millis)) {
+      val apps = getApplications()
+      assert(apps.size === 1)
+      assert(apps.head.id === appId)
+      assert(apps.head.executors.size === 2)
+      assert(apps.head.getExecutorLimit === Int.MaxValue)
+    }
     // sync executors between the Master and the driver, needed because
     // the driver refuses to kill executors it does not know about
     syncExecutors(sc)
@@ -298,9 +363,10 @@ class StandaloneDynamicAllocationSuite
     assert(executors.size === 2)
     assert(sc.killExecutor(executors.head))
     assert(sc.killExecutor(executors.head))
-    assert(master.apps.head.executors.size === 1)
+    val apps = getApplications()
+    assert(apps.head.executors.size === 1)
     // The limit should not be lowered twice
-    assert(master.apps.head.getExecutorLimit === 1)
+    assert(apps.head.getExecutorLimit === 1)
   }
 
   // ===============================
@@ -333,6 +399,16 @@ class StandaloneDynamicAllocationSuite
     }
   }
 
+  /** Get the Master state */
+  private def getMasterState: MasterStateResponse = {
+    master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+  }
+
+  /** Get the applications that are active from Master */
+  private def getApplications(): Seq[ApplicationInfo] = {
+    getMasterState.activeApps
+  }
+
   /** Kill all executors belonging to this application. */
   private def killAllExecutors(sc: SparkContext): Boolean = {
     killNExecutors(sc, Int.MaxValue)
@@ -352,8 +428,11 @@ class StandaloneDynamicAllocationSuite
    * don't wait for executors to register. Otherwise the tests will take much longer to run.
    */
   private def getExecutorIds(sc: SparkContext): Seq[String] = {
-    assert(master.idToApp.contains(sc.applicationId))
-    master.idToApp(sc.applicationId).executors.keys.map(_.toString).toSeq
+    val app = getApplications().find(_.id == sc.applicationId)
+    assert(app.isDefined)
+    // Although executors is transient, master is in the same process so the message won't be
+    // serialized and it's safe here.
+    app.get.executors.keys.map(_.toString).toSeq
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org