Posted to commits@spark.apache.org by do...@apache.org on 2020/12/17 17:40:42 UTC

[spark] branch branch-3.1 updated: [SPARK-33173][CORE][TESTS][FOLLOWUP] Use `local[2]` and AtomicInteger

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new d5265ee  [SPARK-33173][CORE][TESTS][FOLLOWUP] Use `local[2]` and AtomicInteger
d5265ee is described below

commit d5265ee94995a7048667e4d64246a886eb29ade2
Author: yi.wu <yi...@databricks.com>
AuthorDate: Thu Dec 17 09:28:17 2020 -0800

    [SPARK-33173][CORE][TESTS][FOLLOWUP] Use `local[2]` and AtomicInteger
    
    ### What changes were proposed in this pull request?
    
    Use `local[2]` so that the two tasks can launch at the same time, and change the counters (`numOnTaskXXX`) to `AtomicInteger` to ensure thread safety.
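    
    A minimal sketch of why the counter change matters (illustrative, not part of the patch): a plain `var` counter incremented from two concurrently running task threads can lose updates, because `+= 1` is a non-atomic read-modify-write, while `AtomicInteger` makes each increment atomic.
    
    ```scala
    import java.util.concurrent.atomic.AtomicInteger
    
    // Thread-safe counter: incrementAndGet() is an atomic read-modify-write.
    // A plain `var n = 0; n += 1` from two threads can drop an increment.
    val numOnTaskStart = new AtomicInteger(0)
    
    // Each onTaskStart callback bumps the counter atomically.
    numOnTaskStart.incrementAndGet()
    numOnTaskStart.incrementAndGet()
    
    assert(numOnTaskStart.get() == 2)  // safe to read from the test thread
    ```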
    
    ### Why are the changes needed?
    
    The test is still flaky after the fix in https://github.com/apache/spark/pull/30072; see https://github.com/apache/spark/pull/30728/checks?check_run_id=1557987642.
    
    It is also easy to reproduce locally by running the test multiple times (e.g. 100 runs).
    
    The test sets up a stage with 2 tasks to run on an executor with 1 core, so the 2 tasks have to launch one by one: task-2 launches only after task-1 fails. However, since failed tasks are not retried in local mode (`MAX_LOCAL_TASK_FAILURES = 1`), the stage aborts right away after task-1 fails and cancels the still-running task-2 at the same time. As a result, there is a chance that task-2 gets canceled before `PluginContainer.onTaskStart` is called, which leads to the test failure.
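    
    To make the race concrete, here is a hedged sketch modeled on the test in the diff below (the app name and thrown exception are illustrative): with two local cores, both failing tasks launch concurrently, so each reaches `PluginContainer.onTaskStart` before the stage abort can cancel it.
    
    ```scala
    // Inside the suite, with the imports from the file in the diff below.
    val conf = new SparkConf()
      .setAppName("onTaskStart-race-sketch")        // illustrative name
      .set(SparkLauncher.SPARK_MASTER, "local[2]")  // 2 cores: both tasks launch together
      .set(PLUGINS, Seq(classOf[TestSparkPlugin].getName()))
    sc = new SparkContext(conf)
    
    // Two partitions whose tasks always fail. On local[1] they would run
    // serially, and the stage abort triggered by the first failure could
    // cancel the second task before it calls onTaskStart.
    intercept[SparkException] {
      sc.parallelize(1 to 10, 2).foreach { _ => throw new RuntimeException("boom") }
    }
    ```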
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Tested manually after the fix; the test is no longer flaky.
    
    Closes #30823 from Ngone51/debug-flaky-spark-33088.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 15616f499aca93c98a71732add2a80de863d3d5f)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../internal/plugin/PluginContainerSuite.scala     | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
index 15966e2..9ef81d3 100644
--- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.internal.plugin
 import java.io.File
 import java.nio.charset.StandardCharsets
 import java.util.{Map => JMap}
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
@@ -138,15 +139,15 @@ class PluginContainerSuite extends SparkFunSuite with BeforeAndAfterEach with Lo
     sc = new SparkContext(conf)
     sc.parallelize(1 to 10, 2).count()
 
-    assert(TestSparkPlugin.executorPlugin.numOnTaskStart == 2)
-    assert(TestSparkPlugin.executorPlugin.numOnTaskSucceeded == 2)
-    assert(TestSparkPlugin.executorPlugin.numOnTaskFailed == 0)
+    assert(TestSparkPlugin.executorPlugin.numOnTaskStart.get() == 2)
+    assert(TestSparkPlugin.executorPlugin.numOnTaskSucceeded.get() == 2)
+    assert(TestSparkPlugin.executorPlugin.numOnTaskFailed.get() == 0)
   }
 
   test("SPARK-33088: executor failed tasks trigger plugin calls") {
     val conf = new SparkConf()
       .setAppName(getClass().getName())
-      .set(SparkLauncher.SPARK_MASTER, "local[1]")
+      .set(SparkLauncher.SPARK_MASTER, "local[2]")
       .set(PLUGINS, Seq(classOf[TestSparkPlugin].getName()))
 
     sc = new SparkContext(conf)
@@ -157,9 +158,9 @@ class PluginContainerSuite extends SparkFunSuite with BeforeAndAfterEach with Lo
     }
 
     eventually(timeout(10.seconds), interval(100.millis)) {
-      assert(TestSparkPlugin.executorPlugin.numOnTaskStart == 2)
-      assert(TestSparkPlugin.executorPlugin.numOnTaskSucceeded == 0)
-      assert(TestSparkPlugin.executorPlugin.numOnTaskFailed == 2)
+      assert(TestSparkPlugin.executorPlugin.numOnTaskStart.get() == 2)
+      assert(TestSparkPlugin.executorPlugin.numOnTaskSucceeded.get() == 0)
+      assert(TestSparkPlugin.executorPlugin.numOnTaskFailed.get() == 2)
     }
   }
 
@@ -343,9 +344,9 @@ private class TestDriverPlugin extends DriverPlugin {
 
 private class TestExecutorPlugin extends ExecutorPlugin {
 
-  var numOnTaskStart: Int = 0
-  var numOnTaskSucceeded: Int = 0
-  var numOnTaskFailed: Int = 0
+  val numOnTaskStart = new AtomicInteger(0)
+  val numOnTaskSucceeded = new AtomicInteger(0)
+  val numOnTaskFailed = new AtomicInteger(0)
 
   override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
     ctx.metricRegistry().register("executorMetric", new Gauge[Int] {
@@ -355,15 +356,15 @@ private class TestExecutorPlugin extends ExecutorPlugin {
   }
 
   override def onTaskStart(): Unit = {
-    numOnTaskStart += 1
+    numOnTaskStart.incrementAndGet()
   }
 
   override def onTaskSucceeded(): Unit = {
-    numOnTaskSucceeded += 1
+    numOnTaskSucceeded.incrementAndGet()
   }
 
   override def onTaskFailed(failureReason: TaskFailedReason): Unit = {
-    numOnTaskFailed += 1
+    numOnTaskFailed.incrementAndGet()
   }
 }
 

