Posted to commits@spark.apache.org by do...@apache.org on 2020/06/25 23:23:19 UTC

[spark] branch master updated: [SPARK-32100][CORE][TESTS] Add WorkerDecommissionExtendedSuite

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 594cb56  [SPARK-32100][CORE][TESTS] Add WorkerDecommissionExtendedSuite
594cb56 is described below

commit 594cb560750db1a9844386178027f9b28069525a
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Thu Jun 25 16:21:14 2020 -0700

    [SPARK-32100][CORE][TESTS] Add WorkerDecommissionExtendedSuite
    
    ### What changes were proposed in this pull request?
    
    This PR aims to add `WorkerDecommissionExtendedSuite` for various worker decommission combinations.
    
    ### Why are the changes needed?
    
    This will improve test coverage of worker decommissioning under various dynamic allocation settings.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass the Jenkins tests.
    
    Closes #28929 from dongjoon-hyun/SPARK-WD-TEST.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../WorkerDecommissionExtendedSuite.scala          | 73 ++++++++++++++++++++++
 1 file changed, 73 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
new file mode 100644
index 0000000..02c72fa
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/WorkerDecommissionExtendedSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import scala.concurrent.duration._
+
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
+
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
+import org.apache.spark.LocalSparkContext.withSpark
+import org.apache.spark.internal.config.{DYN_ALLOCATION_ENABLED, DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, DYN_ALLOCATION_INITIAL_EXECUTORS, DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED}
+import org.apache.spark.internal.config.Worker.WORKER_DECOMMISSION_ENABLED
+import org.apache.spark.launcher.SparkLauncher.{EXECUTOR_MEMORY, SPARK_MASTER}
+import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
+
+/** This test suite aims to test worker decommission with various configurations. */
+class WorkerDecommissionExtendedSuite extends SparkFunSuite with LocalSparkContext {
+  private val conf = new org.apache.spark.SparkConf()
+    .setAppName(getClass.getName)
+    .set(SPARK_MASTER, "local-cluster[20,1,512]")
+    .set(EXECUTOR_MEMORY, "512m")
+    .set(DYN_ALLOCATION_ENABLED, true)
+    .set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED, true)
+    .set(DYN_ALLOCATION_INITIAL_EXECUTORS, 20)
+    .set(WORKER_DECOMMISSION_ENABLED, true)
+
+  test("Worker decommission and executor idle timeout") {
+    sc = new SparkContext(conf.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "10s"))
+    withSpark(sc) { sc =>
+      TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
+      val rdd1 = sc.parallelize(1 to 10, 2)
+      val rdd2 = rdd1.map(x => (1, x))
+      val rdd3 = rdd2.reduceByKey(_ + _)
+      val rdd4 = rdd3.sortByKey()
+      assert(rdd4.count() === 1)
+      eventually(timeout(20.seconds), interval(1.seconds)) {
+        assert(sc.getExecutorIds().length < 5)
+      }
+    }
+  }
+
+  test("Decommission 19 executors from 20 executors in total") {
+    sc = new SparkContext(conf)
+    withSpark(sc) { sc =>
+      TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
+      val rdd1 = sc.parallelize(1 to 100000, 200)
+      val rdd2 = rdd1.map(x => (x % 100, x))
+      val rdd3 = rdd2.reduceByKey(_ + _)
+      assert(rdd3.count() === 100)
+
+      val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
+      sc.getExecutorIds().tail.foreach { id =>
+        sched.decommissionExecutor(id)
+        assert(rdd3.sortByKey().collect().length === 100)
+      }
+    }
+  }
+}

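As a rough sketch of what another combination in this suite could look like, the snippet below adds a hypothetical third test case that reuses the shared `conf`, `withSpark`, `TestUtils.waitUntilExecutorsUp`, and `StandaloneSchedulerBackend.decommissionExecutor` calls from the diff above. The specific scenario (decommissioning with shuffle tracking disabled) is an illustration only and is not part of this commit.

    // Hypothetical additional case (not in this commit): decommission one executor
    // while dynamic allocation shuffle tracking is turned off.
    test("Worker decommission without shuffle tracking") {
      sc = new SparkContext(conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING_ENABLED.key, "false"))
      withSpark(sc) { sc =>
        TestUtils.waitUntilExecutorsUp(sc, 20, 60000)
        val rdd = sc.parallelize(1 to 1000, 20).map(x => (x % 10, x)).reduceByKey(_ + _)
        assert(rdd.count() === 10)

        val sched = sc.schedulerBackend.asInstanceOf[StandaloneSchedulerBackend]
        // Decommission a single executor and verify the job still completes.
        sc.getExecutorIds().headOption.foreach(sched.decommissionExecutor)
        assert(rdd.collect().length === 10)
      }
    }

Locally, suites like this are typically run through the project's sbt wrapper, for example `build/sbt "core/testOnly *WorkerDecommissionExtendedSuite"` (the exact module name and invocation may differ depending on the build setup).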
