Posted to issues@flink.apache.org by "Chesnay Schepler (JIRA)" <ji...@apache.org> on 2018/04/25 21:11:00 UTC

[jira] [Commented] (FLINK-9258) ConcurrentModificationException in ComponentMetricGroup.getAllVariables

    [ https://issues.apache.org/jira/browse/FLINK-9258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16453095#comment-16453095 ] 

Chesnay Schepler commented on FLINK-9258:
-----------------------------------------

This should only be possible if {{getAllVariables}} is called concurrently by multiple threads. The problem is that the internal map is assigned to the field before it is populated with data, at which point it is already accessible to other threads, which can then iterate over it while it is still being filled.
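
To illustrate the idea, here is a minimal sketch of one way to avoid publishing a half-filled map: populate a local map first and assign it to the field only once it is complete. The names {{SketchMetricGroup}} and {{LazyVariablesDemo}}, and the single key/value pair per group, are hypothetical stand-ins for illustration; this is not the actual Flink class and not necessarily the fix that will be committed.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a metric group; not Flink's ComponentMetricGroup. */
class SketchMetricGroup {
    private final SketchMetricGroup parent; // null for the root group
    private final String key;
    private final String value;

    // volatile: a thread that sees a non-null reference also sees the filled map.
    private volatile Map<String, String> variables;

    SketchMetricGroup(SketchMetricGroup parent, String key, String value) {
        this.parent = parent;
        this.key = key;
        this.value = value;
    }

    Map<String, String> getAllVariables() {
        Map<String, String> result = variables;
        if (result == null) { // avoid synchronization in the common case
            synchronized (this) {
                result = variables;
                if (result == null) {
                    // Fill a local map that no other thread can reach yet.
                    Map<String, String> tmp = new HashMap<>();
                    tmp.put(key, value);
                    if (parent != null) {
                        tmp.putAll(parent.getAllVariables());
                    }
                    // Publish only after the map is fully populated.
                    result = tmp;
                    variables = result;
                }
            }
        }
        return result;
    }
}

class LazyVariablesDemo {
    public static void main(String[] args) throws InterruptedException {
        SketchMetricGroup root = new SketchMetricGroup(null, "<host>", "localhost");
        Thread[] threads = new Thread[4];
        for (int i = 0; i < threads.length; i++) {
            // Each child reads the shared parent's variables concurrently.
            SketchMetricGroup child =
                    new SketchMetricGroup(root, "<subtask_index>", String.valueOf(i));
            threads[i] = new Thread(() -> System.out.println(child.getAllVariables()));
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
    }
}
```

In this sketch a caller either sees {{null}} and waits on the lock, or sees a map that is no longer being written to, so the {{putAll}} on the parent's map in the stack trace below would never iterate over a map that is still being populated.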

> ConcurrentModificationException in ComponentMetricGroup.getAllVariables
> -----------------------------------------------------------------------
>
>                 Key: FLINK-9258
>                 URL: https://issues.apache.org/jira/browse/FLINK-9258
>             Project: Flink
>          Issue Type: Bug
>          Components: Metrics
>    Affects Versions: 1.4.2
>            Reporter: Narayanan Arunachalam
>            Assignee: Chesnay Schepler
>            Priority: Major
>             Fix For: 1.5.0, 1.4.3
>
>
> Seeing this exception at job startup time. It looks like there is a race condition when the metric variables are constructed.
> The error is intermittent.
> Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:897)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>         at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:840)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>         at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>         at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.util.ConcurrentModificationException
>         at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>         at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>         at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>         at java.util.HashMap.putMapEntries(HashMap.java:511)
>         at java.util.HashMap.putAll(HashMap.java:784)
>         at org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>         at org.apache.flink.runtime.metrics.groups.ComponentMetricGroup.getAllVariables(ComponentMetricGroup.java:63)
>         at com.netflix.spaas.metrics.MetricsReporterRegistry.getTags(MetricsReporterRegistry.java:147)
>         at com.netflix.spaas.metrics.MetricsReporterRegistry.mergeWithSourceAndSinkTags(MetricsReporterRegistry.java:170)
>         at com.netflix.spaas.metrics.MetricsReporterRegistry.addReporter(MetricsReporterRegistry.java:75)
>         at com.netflix.spaas.nfflink.connector.kafka.source.Kafka010Consumer.createFetcher(Kafka010Consumer.java:69)
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:549)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:86)
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55)
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:94)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>         at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)