You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Yukun Guo <gy...@gmail.com> on 2016/10/14 06:32:49 UTC

ConcurrentModificationException when using histogram accumulators

This happens when the TaskManager is serializing an
org.apache.flink.api.common.accumulators.Histogram by iterating through the
underlying TreeMap while a MapFunction for updating the accumulator
attempts to modify the TreeMap concurrently. How could I fix it?


The call stack:

WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry     -
Failed to serialize accumulators for task.
java.util.ConcurrentModificationException
        at
java.util.TreeMap$PrivateEntryIterator.nextEntry(TreeMap.java:1211)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
        at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
        at java.util.TreeMap.writeObject(TreeMap.java:2436)
        at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
        at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
        at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
        at java.util.HashMap.writeObject(HashMap.java:1362)
        at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
        at
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
        at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
        at
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
        at
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
        at
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
        at
org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:52)
        at
org.apache.flink.runtime.accumulators.AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
        at
org.apache.flink.runtime.accumulators.AccumulatorRegistry.getSnapshot(AccumulatorRegistry.java:75)
        at
org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
        ...

Re: ConcurrentModificationException when using histogram accumulators

Posted by Till Rohrmann <tr...@apache.org>.
Hi Yukun,

I think you've found a bug in the code. The accumulators don't seem to be
really thread safe. I've created an issue to fix this issue [1]. Thanks for
reporting the problem :-)

[1] https://issues.apache.org/jira/browse/FLINK-4829

Cheers,
Till

On Fri, Oct 14, 2016 at 8:32 AM, Yukun Guo <gy...@gmail.com> wrote:

> This happens when the TaskManager is serializing an
> org.apache.flink.api.common.accumulators.Histogram by iterating through
> the underlying TreeMap while a MapFunction for updating the accumulator
> attempts to modify the TreeMap concurrently. How could I fix it?
>
>
> The call stack:
>
> WARN  org.apache.flink.runtime.accumulators.AccumulatorRegistry     -
> Failed to serialize accumulators for task.
> java.util.ConcurrentModificationException
>         at java.util.TreeMap$PrivateEntryIterator.
> nextEntry(TreeMap.java:1211)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1247)
>         at java.util.TreeMap$EntryIterator.next(TreeMap.java:1242)
>         at java.util.TreeMap.writeObject(TreeMap.java:2436)
>         at sun.reflect.GeneratedMethodAccessor491.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(
> ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(
> ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.defaultWriteFields(
> ObjectOutputStream.java:1548)
>         at java.io.ObjectOutputStream.writeSerialData(
> ObjectOutputStream.java:1509)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(
> ObjectOutputStream.java:348)
>         at java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>         at java.util.HashMap.writeObject(HashMap.java:1362)
>         at sun.reflect.GeneratedMethodAccessor189.invoke(Unknown Source)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(
> DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:498)
>         at java.io.ObjectStreamClass.invokeWriteObject(
> ObjectStreamClass.java:1028)
>         at java.io.ObjectOutputStream.writeSerialData(
> ObjectOutputStream.java:1496)
>         at java.io.ObjectOutputStream.writeOrdinaryObject(
> ObjectOutputStream.java:1432)
>         at java.io.ObjectOutputStream.writeObject0(
> ObjectOutputStream.java:1178)
>         at java.io.ObjectOutputStream.writeObject(
> ObjectOutputStream.java:348)
>         at org.apache.flink.util.InstantiationUtil.serializeObject(
> InstantiationUtil.java:301)
>         at org.apache.flink.util.SerializedValue.<init>(
> SerializedValue.java:52)
>         at org.apache.flink.runtime.accumulators.
> AccumulatorSnapshot.<init>(AccumulatorSnapshot.java:58)
>         at org.apache.flink.runtime.accumulators.AccumulatorRegistry.
> getSnapshot(AccumulatorRegistry.java:75)
>         at org.apache.flink.runtime.taskmanager.TaskManager$$anonfun$
> sendHeartbeatToJobManager$2.apply(TaskManager.scala:1286)
>         ...
>