Posted to commits@spark.apache.org by do...@apache.org on 2021/11/13 18:45:48 UTC

[spark] branch master updated: [SPARK-37315][ML][TEST] Mitigate ConcurrentModificationException thrown from a test in MLEventSuite

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6426dcf  [SPARK-37315][ML][TEST] Mitigate ConcurrentModificationException thrown from a test in MLEventSuite
6426dcf is described below

commit 6426dcf230b6f69c93de9192f751a24dc0c5d750
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Sat Nov 13 10:44:58 2021 -0800

    [SPARK-37315][ML][TEST] Mitigate ConcurrentModificationException thrown from a test in MLEventSuite
    
    ### What changes were proposed in this pull request?
    
    This PR mitigates a `ConcurrentModificationException` that is sometimes thrown from a test.
    Recently, I noticed that the exception is thrown from the following part of the test `pipeline read/write events` in `MLEventsSuite` when Scala 2.13 is used.
    ```
    events.map(JsonProtocol.sparkEventToJson).foreach { event =>
      assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
    }
    ```
    
    We can also see this issue in the scheduled build:
    https://github.com/apache/spark/runs/4196812399?check_suite_focus=true#step:9:17616
    
    I think the root cause is that the `ArrayBuffer` (`events`) is updated asynchronously by the following listener.
    ```
    private val listener: SparkListener = new SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case e: MLEvent => events.append(e)
        case _ =>
      }
    }
    ```
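    A self-contained sketch of the same kind of race, independent of Spark, is shown below; the object name and iteration counts are illustrative only, and whether a given run actually hits the exception is timing-dependent.
    ```
    import scala.collection.mutable.ArrayBuffer

    object ArrayBufferRaceSketch {
      def main(args: Array[String]): Unit = {
        val events = ArrayBuffer.empty[Int]
        events ++= (1 to 1000)

        // Stand-in for the listener-bus thread that keeps appending MLEvents.
        val writer = new Thread(() => (1 to 1000000).foreach(i => events.append(i)))
        writer.start()

        // Iterating while the writer appends corresponds to
        // events.map(JsonProtocol.sparkEventToJson) in the test.
        try {
          events.map(_ + 1)
          println("no exception on this run; the race is timing-dependent")
        } catch {
          case e: java.util.ConcurrentModificationException => println(s"caught: $e")
        }
        writer.join()
      }
    }
    ```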
    You can easily reproduce this issue by applying the following diff to the commit hash 4d29becee1ee8afa990f91684d84d717.
    ```
    diff --git a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
    index f2343b7a88..ff63639e00 100644
    --- a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
    +++ b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
    @@ -42,7 +42,9 @@ class MLEventsSuite
       private val events = mutable.ArrayBuffer.empty[MLEvent]
       private val listener: SparkListener = new SparkListener {
         override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    -      case e: MLEvent => events.append(e)
    +      case e: MLEvent =>
    +        events.append(e)
    +        Thread.sleep(500)
           case _ =>
         }
       }
    @@ -235,11 +237,13 @@ class MLEventsSuite
           }
           // Test if they can be ser/de via JSON protocol.
           assert(events.nonEmpty)
    -      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
    -        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
    -      }
    +        events.map { x =>
    +          Thread.sleep(500)
    +          JsonProtocol.sparkEventToJson(x)
    +        }.foreach { event =>
    +          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
    +        }
           sc.listenerBus.waitUntilEmpty(timeoutMillis = 10000)
    -
           events.clear()
           val pipelineReader = Pipeline.read
           assert(events.isEmpty)
    ```
    
    This is a kind of race condition, but I think we can mitigate it by retrying.
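
    As background on the mitigation: ScalaTest's `eventually` re-runs its block until it passes or the timeout expires, so an iteration that races with an append (and throws) is simply attempted again on the next interval. Below is a minimal sketch of that pattern outside Spark; the suite name and sizes are illustrative, and `eventually`/`timeout`/`interval` come from ScalaTest's `Eventually` and `SpanSugar`.
    ```
    import scala.collection.mutable.ArrayBuffer
    import org.scalatest.concurrent.Eventually._
    import org.scalatest.funsuite.AnyFunSuite
    import org.scalatest.time.SpanSugar._

    class RetryOnRaceSketch extends AnyFunSuite {
      test("iteration racing with appends passes within the retry window") {
        val events = ArrayBuffer.empty[Int]

        // Stand-in for the listener-bus thread that keeps appending events.
        new Thread(() => (1 to 1000000).foreach(i => events.append(i))).start()

        // eventually() retries the whole block on failure, so a
        // ConcurrentModificationException thrown mid-iteration only delays
        // the assertions instead of failing the test outright.
        eventually(timeout(10.seconds), interval(1.second)) {
          assert(events.nonEmpty)
          events.map(_.toString).foreach(s => assert(s.nonEmpty))
        }
      }
    }
    ```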
    
    Actually, I never saw this issue when we used Scala 2.13.5, but we recently upgraded to 2.13.7.
    Scala 2.13.7 includes an update that detects concurrent modification more precisely and reports it as `ConcurrentModificationException`.
    https://github.com/scala/scala/pull/9786
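
    For context, Scala 2.13's `ArrayBuffer` iterators fail fast when the buffer is modified during iteration, and the change linked above makes that detection more precise. A single-threaded sketch of the fail-fast behavior (expected behavior with 2.13.7; the object name is illustrative):
    ```
    import scala.collection.mutable.ArrayBuffer

    object CheckedIterationSketch {
      def main(args: Array[String]): Unit = {
        val buf = ArrayBuffer(1, 2, 3)
        val it = buf.iterator
        buf.append(4)  // mutate the buffer after the iterator was created
        it.next()      // expected to throw java.util.ConcurrentModificationException
      }
    }
    ```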
    
    ### Why are the changes needed?
    
    For test stability.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I manually modified the test code, inserting sleeps as in the diff shown above, and confirmed that no `ConcurrentModificationException` is thrown.
    
    Closes #34583 from sarutak/fix-concurrent-modifiation-exception-mlevent.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../scala/org/apache/spark/ml/MLEventsSuite.scala  | 32 ++++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
index f2343b7..1226ad9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
@@ -234,9 +234,11 @@ class MLEventsSuite
         }
       }
       // Test if they can be ser/de via JSON protocol.
-      assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+      eventually(timeout(10.seconds), interval(1.second)) {
+        assert(events.nonEmpty)
+        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+        }
       }
       sc.listenerBus.waitUntilEmpty(timeoutMillis = 10000)
 
@@ -260,9 +262,11 @@ class MLEventsSuite
         }
       }
       // Test if they can be ser/de via JSON protocol.
-      assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+      eventually(timeout(10.seconds), interval(1.second)) {
+        assert(events.nonEmpty)
+        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+        }
       }
     }
   }
@@ -293,9 +297,11 @@ class MLEventsSuite
         }
       }
       // Test if they can be ser/de via JSON protocol.
-      assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+      eventually(timeout(10.seconds), interval(1.second)) {
+        assert(events.nonEmpty)
+        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+        }
       }
       sc.listenerBus.waitUntilEmpty(timeoutMillis = 10000)
 
@@ -319,9 +325,11 @@ class MLEventsSuite
         }
       }
       // Test if they can be ser/de via JSON protocol.
-      assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+      eventually(timeout(10.seconds), interval(1.second)) {
+        assert(events.nonEmpty)
+        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+        }
       }
     }
   }
