Posted to user@flink.apache.org by James Yu <cy...@gmail.com> on 2018/03/29 21:09:43 UTC

Anyway to read Cassandra as DataStream/DataSet in Flink?

Hi,

I tried to treat Cassandra as the source of data in Flink with the
information provided in the following links:
- https://stackoverflow.com/questions/43067681/read-data-from-cassandra-for-processing-in-flink
- https://www.javatips.net/api/flink-master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/async/AsyncIOExample.java

I got the AsyncWaitOperator exception when I ran the task. According to the
first link, this exception occurs due to a network problem. However, the
strange thing is that I am running Cassandra on my local VM with only 10
rows of data in the target table.

@Jicaar in the first link also mentions that switching from RichAsyncFunction
to RichMapFunction can avoid the AsyncWaitOperator exception. Can someone with
similar experience share how to do it with RichMapFunction?

AsyncWaitOperator exception trace -->
02:21:00.164 [AsyncIO-Emitter-Thread (Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1))] INFO org.apache.flink.runtime.taskmanager.Task  - Source: Custom Source -> async wait operator -> (Flat Map, Sink: Unnamed) (1/1) (2809cef511194e612b2cc65510f78c64) switched from RUNNING to FAILED.
java.lang.Exception: An async function call terminated with an exception. Failing the AsyncWaitOperator.
  at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:137) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:85) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:566) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:611) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:572) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:830) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:808) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.api.operators.async.Emitter.output(Emitter.java:133) [flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 2 common frames omitted
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader)
contextClassLoader (java.lang.Thread)
threads (java.lang.ThreadGroup)
groups (java.lang.ThreadGroup)
threadGroup (io.netty.util.concurrent.DefaultThreadFactory)
val$backingThreadFactory (com.google.common.util.concurrent.ThreadFactoryBuilder$1)
threadFactory (java.util.concurrent.ThreadPoolExecutor)
delegate (com.google.common.util.concurrent.MoreExecutors$ListeningDecorator)
blockingExecutor (com.datastax.driver.core.Cluster$Manager)
manager (com.datastax.driver.core.Host)
triedHosts (com.datastax.driver.core.ExecutionInfo)
info (com.datastax.driver.core.ArrayBackedResultSet$SinglePage)
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:82) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:577) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:68) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:348) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:289) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:599) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:82) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:495) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:505) ~[kryo-2.24.0.jar:na]
  at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:182) ~[flink-core-1.4.2.jar:1.4.2]
  at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:547) ~[flink-streaming-java_2.11-1.4.2.jar:1.4.2]
  ... 10 common frames omitted
Caused by: java.util.ConcurrentModificationException: null
  at java.util.Vector$Itr.checkForComodification(Vector.java:1184) ~[na:1.8.0_60]
  at java.util.Vector$Itr.next(Vector.java:1137) ~[na:1.8.0_60]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:22) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:523) ~[kryo-2.24.0.jar:na]
  at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:61) ~[kryo-2.24.0.jar:na]
  ... 68 common frames omitted
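
Read from the bottom up, the trace above points to a serialization problem
rather than a network problem. The element being copied for the next chained
operator appears to be the driver's result object
(ArrayBackedResultSet$SinglePage). Kryo follows its fields through
Cluster$Manager, a thread pool and a ThreadGroup down to a class loader's
"classes" field, and the iterator over that java.util.Vector fails because
the Vector changed while it was being walked. The following is a minimal,
library-free sketch of that last step only. The names VectorIterationDemo and
walk are made up for illustration, and the modification happens on the same
thread to make the failure deterministic; in the job it presumably comes from
another thread loading a class.

```java
import java.util.ConcurrentModificationException;
import java.util.Vector;

final class VectorIterationDemo {

    /** Walks the vector through its iterator, as a generic collection serializer would. */
    static int walk(Vector<String> classes, boolean mutateDuringWalk) {
        int visited = 0;
        for (String name : classes) {
            visited++;
            if (mutateDuringWalk && visited == 1) {
                // Stands in for a class being loaded while the walk is in progress.
                classes.add("LoadedLater");
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        Vector<String> classes = new Vector<>();
        classes.add("A");
        classes.add("B");

        // No modification during the walk: both elements are visited.
        System.out.println("undisturbed walk visited " + walk(classes, false));

        try {
            // A structural change during the walk makes the next iterator step fail.
            walk(classes, true);
            System.out.println("no exception");
        } catch (ConcurrentModificationException e) {
            System.out.println("walk failed: " + e);
        }
    }
}
```

Vector synchronizes its individual methods, but that does not protect an
iteration as a whole, which is why the fail-fast check in Vector$Itr fires.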



-----------------------------------------------
James C.-C.Yu
+886988713275

Re: Anyway to read Cassandra as DataStream/DataSet in Flink?

Posted by Fabian Hueske <fh...@gmail.com>.
Hi James,

The answer to your question depends on your use case.
The async I/O approach (RichAsyncFunction) works if you have a DataStream that
you would like to enrich with data from a Cassandra table, but not if you would
like to create a DataStream from a Cassandra table.
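
For the enrichment case, the serialization trace in the original message
suggests one more thing to watch: the record passed downstream seems to have
been the driver's result object itself, which drags connection and thread
state along with it. A common way around this is to copy the needed column
values into plain types before emitting. The following is a library-free
sketch of that shape, not Flink or driver code: FakeResultSet, lookup and
enrich are hypothetical names, and in a real job the same conversion would
happen inside the async function before the result is handed to Flink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class EnrichmentSketch {

    /** Hypothetical stand-in for a driver result that keeps a handle to its connection. */
    static final class FakeResultSet {
        final Object connectionHandle = new Object(); // not meant to be copied or shipped
        final List<String> rows;

        FakeResultSet(List<String> rows) {
            this.rows = rows;
        }
    }

    /** Hypothetical asynchronous lookup; a real job would call the database driver here. */
    static CompletableFuture<FakeResultSet> lookup(String key) {
        return CompletableFuture.supplyAsync(
                () -> new FakeResultSet(Arrays.asList(key + "-1", key + "-2")));
    }

    /** Pairs the input key with each row value and drops the result object itself. */
    static CompletableFuture<List<String>> enrich(String key) {
        return lookup(key).thenApply(rs -> {
            List<String> out = new ArrayList<>();
            for (String row : rs.rows) {
                out.add(key + " -> " + row);
            }
            return out; // only plain Strings leave this method
        });
    }

    public static void main(String[] args) {
        System.out.println(enrich("user42").join());
    }
}
```

Only the list of Strings crosses the boundary to the next step, so a
serializer never has to look at the connection handle.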

The Flink code base contains a CassandraInputFormat [1], but I don't think
it is extensively used. You might need to adjust it to your needs.
InputFormats are the source connectors for the DataSet API but can also be
used from the DataStream API to read bounded data. However, InputFormats
do not guarantee the order of emitted records. Hence, they are not well
suited for time-based applications.

Best, Fabian

[1]
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraInputFormat.java

