Posted to user@flink.apache.org by PedroMrChaves <pe...@gmail.com> on 2018/03/20 14:02:09 UTC

Error while reporting metrics - ConcurrentModificationException

Hello,

I get the following error while trying to report metrics to InfluxDB using
the DropwizardReporter.

2018-03-20 13:51:00,288 WARN  org.apache.flink.runtime.metrics.MetricRegistryImpl           - Error while reporting metrics
java.util.ConcurrentModificationException
	at java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
	at java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:742)
	at java.util.AbstractCollection.addAll(AbstractCollection.java:343)
	at java.util.HashSet.<init>(HashSet.java:120)
	at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:906)
	at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:61)
	at org.apache.kafka.common.metrics.KafkaMetric.value(KafkaMetric.java:52)
	at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:35)
	at org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper.getValue(KafkaMetricWrapper.java:26)
	at org.apache.flink.dropwizard.metrics.FlinkGaugeWrapper.getValue(FlinkGaugeWrapper.java:36)
	at metrics_influxdb.measurements.MeasurementReporter.fromGauge(MeasurementReporter.java:163)
	at metrics_influxdb.measurements.MeasurementReporter.report(MeasurementReporter.java:55)
	at org.apache.flink.dropwizard.ScheduledDropwizardReporter.report(ScheduledDropwizardReporter.java:231)
	at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:417)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Any ideas on what might be the problem?




-----
Best Regards,
Pedro Chaves
--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Re: Error while reporting metrics - ConcurrentModificationException

Posted by Chesnay Schepler <ch...@apache.org>.
A wrapped Kafka metric was accessing the consumer's state while that
state was being modified.

As far as I can tell, this is a Kafka issue and there's nothing we can do.

Unless this happens frequently, it should be safe to ignore.
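
To illustrate the failure mode described above, here is a minimal sketch of the fail-fast check that appears at the top of the stack trace (LinkedHashMap's key iterator). It is not the Kafka or Flink code: the real case involves two threads (the reporter thread reading the metric and the consumer thread changing its assignment), whereas this sketch reproduces the same exception deterministically in a single thread. The class name CmeSketch, the method readWhileModified, and the "topic-N" keys are made up for the example.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

class CmeSketch {

    /**
     * Starts iterating over a LinkedHashMap's keys, structurally modifies
     * the map, then continues iterating. Returns true if the iterator
     * detected the modification and threw ConcurrentModificationException.
     */
    static boolean readWhileModified() {
        // Hypothetical stand-in for the consumer's partition state.
        Map<String, Long> partitions = new LinkedHashMap<>();
        partitions.put("topic-0", 0L);
        partitions.put("topic-1", 0L);

        // "Reader": plays the role of the metrics reporter copying the key set.
        Iterator<String> reader = partitions.keySet().iterator();
        reader.next();

        // "Writer": plays the role of the consumer changing its assignment.
        // Adding a new key is a structural modification.
        partitions.put("topic-2", 0L);

        try {
            // The iterator notices the map changed since it was created.
            reader.next();
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("CME thrown: " + readWhileModified());
    }
}
```

In the reported trace the same check fires inside `new HashSet<>(...)`, which iterates the key set while copying it. Since the reporter only reads the value, a failed read costs one missing data point for that reporting interval, which is consistent with the advice that an occasional occurrence can be ignored.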
