Posted to commits@spark.apache.org by do...@apache.org on 2021/11/13 18:45:48 UTC
[spark] branch master updated: [SPARK-37315][ML][TEST] Mitigate ConcurrentModificationException thrown from a test in MLEventSuite
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 6426dcf [SPARK-37315][ML][TEST] Mitigate ConcurrentModificationException thrown from a test in MLEventSuite
6426dcf is described below
commit 6426dcf230b6f69c93de9192f751a24dc0c5d750
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Sat Nov 13 10:44:58 2021 -0800
[SPARK-37315][ML][TEST] Mitigate ConcurrentModificationException thrown from a test in MLEventSuite
### What changes were proposed in this pull request?
This PR mitigates a `ConcurrentModificationException` that is sometimes thrown from a test.
Recently, I noticed that the exception is thrown from the following part of the test `pipeline read/write events` in `MLEventsSuite` when Scala 2.13 is used.
```
events.map(JsonProtocol.sparkEventToJson).foreach { event =>
  assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
}
```
The issue also shows up in the scheduled build.
https://github.com/apache/spark/runs/4196812399?check_suite_focus=true#step:9:17616
I think the root cause is that the `ArrayBuffer` (`events`) is updated asynchronously by the following listener.
```
private val listener: SparkListener = new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: MLEvent => events.append(e)
    case _ =>
  }
}
```
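The failure mode can be sketched in a single thread, as a simplified stand-in for the listener thread appending while the test iterates. This is only an illustration: `MutationDuringIteration` and `detected` are hypothetical names, and whether the mutation is detected depends on the Scala version in use, so the sketch reports either outcome rather than assuming one.

```scala
import java.util.ConcurrentModificationException
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object MutationDuringIteration {
  /** Returns true if appending to the buffer mid-iteration was detected. */
  def detected(): Boolean = {
    val events = ArrayBuffer("a", "b", "c")
    val it = events.iterator
    it.next()
    // Stands in for the listener appending an event while the test iterates.
    events.append("d")
    Try(it.hasNext) match {
      case Failure(_: ConcurrentModificationException) => true
      case Failure(other) => throw other
      case Success(_) => false // this Scala version did not flag the mutation
    }
  }

  def main(args: Array[String]): Unit =
    println(s"mutation detected during iteration: ${detected()}")
}
```

In the real test the append happens on another thread, so the failure is timing-dependent rather than deterministic as it is here.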
You can easily reproduce this issue by applying the following diff on top of commit 4d29becee1ee8afa990f91684d84d717.
```
diff --git a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
index f2343b7a88..ff63639e00 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
@@ -42,7 +42,9 @@ class MLEventsSuite
   private val events = mutable.ArrayBuffer.empty[MLEvent]
   private val listener: SparkListener = new SparkListener {
     override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
-      case e: MLEvent => events.append(e)
+      case e: MLEvent =>
+        events.append(e)
+        Thread.sleep(500)
       case _ =>
     }
   }
@@ -235,11 +237,13 @@ class MLEventsSuite
       }
       // Test if they can be ser/de via JSON protocol.
       assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
-      }
+      events.map { x =>
+        Thread.sleep(500)
+        JsonProtocol.sparkEventToJson(x)
+      }.foreach { event =>
+        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+      }
       sc.listenerBus.waitUntilEmpty(timeoutMillis = 10000)
-
       events.clear()
       val pipelineReader = Pipeline.read
       assert(events.isEmpty)
```
This is a race condition, but I think we can mitigate it by retrying.
I had never seen this issue with Scala 2.13.5; it appeared after our recent upgrade to 2.13.7.
Scala 2.13.7 includes an update that detects concurrent modification more precisely, surfacing it as `ConcurrentModificationException`.
https://github.com/scala/scala/pull/9786
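The retry idea can be sketched without ScalaTest. The committed change relies on ScalaTest's `eventually`; the helper below, `retryUntilSuccess`, is a hypothetical stand-in written only for illustration and is not Spark or ScalaTest code. It re-runs a block until it stops throwing or a deadline passes, which is why an iteration that collides with a late `append` can succeed on a later attempt once the listener has gone quiet.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.control.NonFatal

object RetrySketch {
  /** Hypothetical helper: re-runs `block` until it succeeds or `timeout` elapses. */
  def retryUntilSuccess[T](timeout: FiniteDuration, interval: FiniteDuration)(block: => T): T = {
    val deadline = timeout.fromNow

    @tailrec
    def loop(): T = {
      // Evaluate the block outside the recursive call so the loop stays tail-recursive.
      val result: Either[Throwable, T] =
        try Right(block) catch { case NonFatal(e) => Left(e) }
      result match {
        case Right(value) => value
        case Left(e) if deadline.isOverdue() => throw e // give up, rethrow last failure
        case Left(_) =>
          Thread.sleep(interval.toMillis)
          loop()
      }
    }

    loop()
  }

  def main(args: Array[String]): Unit = {
    var attempts = 0
    // Simulated flakiness: the first two attempts fail, the third succeeds.
    val value = retryUntilSuccess(2.seconds, 10.millis) {
      attempts += 1
      if (attempts < 3) throw new java.util.ConcurrentModificationException("simulated")
      attempts
    }
    println(s"succeeded after $value attempts")
  }
}
```

Retrying does not remove the underlying race; it only makes the assertion tolerant of a collision, which matches the "mitigate" wording of this change.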
### Why are the changes needed?
For test stability.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
I manually modified the test code, inserting sleeps as in the diff shown above, and confirmed that no `ConcurrentModificationException` is thrown.
Closes #34583 from sarutak/fix-concurrent-modifiation-exception-mlevent.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../scala/org/apache/spark/ml/MLEventsSuite.scala | 32 ++++++++++++++--------
1 file changed, 20 insertions(+), 12 deletions(-)
diff --git a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
index f2343b7..1226ad9 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
@@ -234,9 +234,11 @@ class MLEventsSuite
         }
       }
       // Test if they can be ser/de via JSON protocol.
-      assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+      eventually(timeout(10.seconds), interval(1.second)) {
+        assert(events.nonEmpty)
+        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+        }
       }
       sc.listenerBus.waitUntilEmpty(timeoutMillis = 10000)
 
@@ -260,9 +262,11 @@ class MLEventsSuite
         }
       }
       // Test if they can be ser/de via JSON protocol.
-      assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+      eventually(timeout(10.seconds), interval(1.second)) {
+        assert(events.nonEmpty)
+        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+        }
       }
     }
   }
@@ -293,9 +297,11 @@ class MLEventsSuite
         }
       }
       // Test if they can be ser/de via JSON protocol.
-      assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+      eventually(timeout(10.seconds), interval(1.second)) {
+        assert(events.nonEmpty)
+        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+        }
       }
       sc.listenerBus.waitUntilEmpty(timeoutMillis = 10000)
 
@@ -319,9 +325,11 @@ class MLEventsSuite
         }
       }
       // Test if they can be ser/de via JSON protocol.
-      assert(events.nonEmpty)
-      events.map(JsonProtocol.sparkEventToJson).foreach { event =>
-        assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+      eventually(timeout(10.seconds), interval(1.second)) {
+        assert(events.nonEmpty)
+        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+          assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
+        }
       }
     }
   }