You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Rui Balau (Jira)" <ji...@apache.org> on 2021/04/29 17:26:00 UTC

[jira] [Commented] (SPARK-34731) ConcurrentModificationException in EventLoggingListener when redacting properties

    [ https://issues.apache.org/jira/browse/SPARK-34731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17335711#comment-17335711 ] 

Rui Balau commented on SPARK-34731:
-----------------------------------

I'd just like to add, while either 3.1.2. or 3.2.2 is not yet released, we have managed to work around this issue by disabling the Event Log as such:
{code:java}
spark.eventLog.enabled=false
{code}

> ConcurrentModificationException in EventLoggingListener when redacting properties
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-34731
>                 URL: https://issues.apache.org/jira/browse/SPARK-34731
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.1.1, 3.2.0
>            Reporter: Bruce Robbins
>            Assignee: Bruce Robbins
>            Priority: Major
>             Fix For: 3.1.2, 3.2.0
>
>
> Reproduction:
> The key elements of reproduction are enabling event logging, setting spark.executor.cores, and some bad luck:
> {noformat}
> $ bin/spark-shell --conf spark.ui.showConsoleProgress=false \
> --conf spark.executor.cores=1 --driver-memory 4g --conf \
> "spark.ui.showConsoleProgress=false" \
> --conf spark.eventLog.enabled=true \
> --conf spark.eventLog.dir=/tmp/spark-events
> ...
> scala> (0 to 500).foreach { i =>
>      |   val df = spark.range(0, 20000).toDF("a")
>      |   df.filter("a > 12").count
>      | }
> 21/03/12 18:16:44 ERROR AsyncEventQueue: Listener EventLoggingListener threw an exception
> java.util.ConcurrentModificationException
> 	at java.util.Hashtable$Enumerator.next(Hashtable.java:1387)
> 	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:424)
> 	at scala.collection.convert.Wrappers$JPropertiesWrapper$$anon$6.next(Wrappers.scala:420)
> 	at scala.collection.Iterator.foreach(Iterator.scala:941)
> 	at scala.collection.Iterator.foreach$(Iterator.scala:941)
> 	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
> 	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
> 	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
> 	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
> 	at scala.collection.mutable.MapLike.toSeq(MapLike.scala:75)
> 	at scala.collection.mutable.MapLike.toSeq$(MapLike.scala:72)
> 	at scala.collection.mutable.AbstractMap.toSeq(Map.scala:82)
> 	at org.apache.spark.scheduler.EventLoggingListener.redactProperties(EventLoggingListener.scala:290)
> 	at org.apache.spark.scheduler.EventLoggingListener.onJobStart(EventLoggingListener.scala:162)
> 	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent(SparkListenerBus.scala:37)
> 	at org.apache.spark.scheduler.SparkListenerBus.doPostEvent$(SparkListenerBus.scala:28)
> 	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
> 	at org.apache.spark.scheduler.AsyncEventQueue.doPostEvent(AsyncEventQueue.scala:37)
> 	at org.apache.spark.util.ListenerBus.postToAll(ListenerBus.scala:117)
> 	at org.apache.spark.util.ListenerBus.postToAll$(ListenerBus.scala:101)
> 	at org.apache.spark.scheduler.AsyncEventQueue.super$postToAll(AsyncEventQueue.scala:105)
> 	at org.apache.spark.scheduler.AsyncEventQueue.$anonfun$dispatch$1(AsyncEventQueue.scala:105)
> 	at scala.runtime.java8.JFunction0$mcJ$sp.apply(JFunction0$mcJ$sp.java:23)
> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
> 	at org.apache.spark.scheduler.AsyncEventQueue.org$apache$spark$scheduler$AsyncEventQueue$$dispatch(AsyncEventQueue.scala:100)
> 	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.$anonfun$run$1(AsyncEventQueue.scala:96)
> 	at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1379)
> 	at org.apache.spark.scheduler.AsyncEventQueue$$anon$2.run(AsyncEventQueue.scala:96)
> {noformat}
> Analysis from quick reading of the code:
> DAGScheduler posts a JobSubmitted event containing a clone of a properties object [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L834].
> This event is handled [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L2394].
> DAGScheduler#handleJobSubmitted stores the properties object in a [Job object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1154], which in turn is [saved in the jobIdToActiveJob map|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1163].
> DAGScheduler#handleJobSubmitted posts a SparkListenerJobStart event [here|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1169] with a reference to the same properties object that was stored indirectly in the jobIdToActiveJob map.
> When the EventLoggerListener handles the SparkListenerJobStart event, it iterates over that properties object in redactProperties.
> Meanwhile, the DAGScheduler#handleJobSubmitted method is not yet done. It calls submitStage, which calls submitMissingTasks, which [retrieves the same properties object|https://github.com/apache/spark/blob/4f1e434ec57070b52b28f98c66b53ca6ec4de7a4/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1306] from jobIdToActiveJob and calls addPySparkConfigsToProperties, which will modify the properties if spark.executor.cores is set.
> If redactProperties just happens to still be iterating over the properties object when the modification happens, HashTable throws a ConcurrentModificationException.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org