Posted to issues@spark.apache.org by "Rui Balau (Jira)" <ji...@apache.org> on 2021/04/29 17:26:00 UTC
[jira] [Commented] (SPARK-34731) ConcurrentModificationException in
EventLoggingListener when redacting properties
[ https://issues.apache.org/jira/browse/SPARK-34731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17335711#comment-17335711 ]
Rui Balau commented on SPARK-34731:
-----------------------------------
I'd just like to add that, while neither 3.1.2 nor 3.2.0 has been released yet, we have managed to work around this issue by disabling the event log:
{code:java}
spark.eventLog.enabled=false
{code}
> ConcurrentModificationException in EventLoggingListener when redacting properties
> ---------------------------------------------------------------------------------
>
> Key: SPARK-34731
> URL: https://issues.apache.org/jira/browse/SPARK-34731
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.1.1, 3.2.0
> Reporter: Bruce Robbins
> Assignee: Bruce Robbins
> Priority: Major
> Fix For: 3.1.2, 3.2.0
>
>
> Reproduction:
> The key elements of reproduction are enabling event logging, setting spark.executor.cores, and some bad luck:
> {noformat}
> $ bin/spark-shell --conf spark.ui.showConsoleProgress=false \
> --conf spark.executor.cores=1 --driver-memory 4g --conf \
> "spark.ui.showConsoleProgress=false" \
> --conf spark.eventLog.enabled=true \
> --conf spark.eventLog.dir=/tmp/spark-events
> ...
> scala> (0 to 500).foreach { i =>
> | val df = spark.range(0, 20000).toDF("a")
> | df.filter("a > 12").count
> | }
> 21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
> java.util.ConcurrentModificationException
> at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
> at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
> at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
> at scala.collection.Iterator.foreach(Iterator.scala:941)
> at scala.collection.Iterator.foreach$(Iterator.scala:941)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
> at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
> at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
> at org.apache.spark.scheduler.EventLoggingListener.redactProperties(EventLoggingListener.scala:290)
> at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:162)
> at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
> at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
> at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
> at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
> at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
> at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
> at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
> at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
> at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
> at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
> at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
> at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1379)
> at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
> {noformat}
> Analysis from quick reading of the code:
> DAGScheduler posts a JobSubmitted event containing a clone of a properties object [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L834].
> This event is handled [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2394].
> DAGScheduler#handleJobSubmitted stores the properties object in a [Job object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1154], which in turn is [saved in the jobIdToActiveJob map|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1163].
> DAGScheduler#handleJobSubmitted posts a SparkListenerJobStart event [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1169] with a reference to the same properties object that was stored indirectly in the jobIdToActiveJob map.
> When the EventLoggingListener handles the SparkListenerJobStart event, it iterates over that properties object in redactProperties.
> Meanwhile, the DAGScheduler#handleJobSubmitted method is not yet done. It calls submitStage, which calls submitMissingTasks, which [retrieves the same properties object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1306] from jobIdToActiveJob and calls addPySparkConfigsToProperties, which will modify the properties if spark.executor.cores is set.
> If redactProperties happens to still be iterating over the properties object when the modification happens, Hashtable throws a ConcurrentModificationException.
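
The failure mode in the quoted analysis can be sketched in isolation. The following is a minimal, single-threaded simulation, not Spark code: the class name, method names, and most property keys are hypothetical, and the "other thread" is imitated by a put() issued in the middle of the loop so that the outcome is deterministic. It uses a plain java.util.Hashtable (the class java.util.Properties extends, and the one named in the stack trace), and it also shows that iterating over a clone taken beforehand is unaffected by later changes to the shared object. Whether the actual fix takes this approach is not stated in this thread.

```java
import java.util.ConcurrentModificationException;
import java.util.Hashtable;
import java.util.Map;

// Hypothetical sketch; none of these names come from Spark.
class PropertiesRaceSketch {

    // Stand-in for the job properties shared between scheduler and listener.
    static Hashtable<String, String> jobProperties() {
        Hashtable<String, String> props = new Hashtable<>();
        props.put("spark.app.name", "demo");
        props.put("spark.job.description", "count");
        props.put("spark.scheduler.pool", "default");
        return props;
    }

    // Iterates the shared table while a new key is added mid-iteration,
    // imitating addPySparkConfigsToProperties running during redactProperties.
    // Returns true if the fail-fast iterator detected the modification.
    static boolean sharedIterationFails() {
        Hashtable<String, String> shared = jobProperties();
        boolean mutated = false;
        try {
            for (Map.Entry<String, String> entry : shared.entrySet()) {
                if (!mutated) {
                    shared.put("spark.executor.cores", "1"); // structural change
                    mutated = true;
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    // Iterates a clone instead. Hashtable.clone() is synchronized, so the
    // copy itself cannot interleave with a put() on the shared table.
    // Returns the number of entries visited.
    @SuppressWarnings("unchecked")
    static int snapshotIteration() {
        Hashtable<String, String> shared = jobProperties();
        Hashtable<String, String> snapshot =
            (Hashtable<String, String>) shared.clone();
        int visited = 0;
        for (Map.Entry<String, String> entry : snapshot.entrySet()) {
            if (visited == 0) {
                shared.put("spark.executor.cores", "1"); // does not touch the clone
            }
            visited++;
        }
        return visited;
    }

    public static void main(String[] args) {
        System.out.println("shared iteration failed: " + sharedIterationFails());
        System.out.println("snapshot entries visited: " + snapshotIteration());
    }
}
```

In the real race the put() comes from a different thread, so the exception only appears when the timing lines up, which matches the "some bad luck" noted in the reproduction.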