Posted to user@flink.apache.org by Bruce Qiu <qi...@gmail.com> on 2018/08/18 03:07:03 UTC

Error while triggering checkpoint due to Kryo exception

Hi Community,

I am using Flink 1.4.2 for stream processing. I read data from Kafka and write Parquet files to HDFS. In the previous environment we had 192 Kafka partitions and I set the source parallelism to 192; the application worked fine. Recently we increased the number of Kafka partitions to 384, so I changed the source parallelism to 384. After this change, the application throws the exception below and checkpoints always fail. I also see very high backpressure in the ColFlatMap stage. My application's DAG is shown below. Can someone help me with this exception? Thanks a lot.

DAG stage: <image001.png> (image attachment, not included in this plain-text version)

Exception stack trace:

java.lang.Exception: Error while triggering checkpoint 109 for Source: Custom Source (257/384)
      at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1210)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.run(FutureTask.java:266)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
      at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not perform checkpoint 109 for operator Source: Custom Source (257/384).
      at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:544)
      at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.triggerCheckpoint(SourceStreamTask.java:111)
      at org.apache.flink.runtime.taskmanager.Task$2.run(Task.java:1199)
      ... 5 more
Caused by: java.lang.Exception: Could not complete snapshot 109 for operator Source: Custom Source (257/384).
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:378)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1089)
      at org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1038)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:671)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:607)
      at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpoint(StreamTask.java:538)
      ... 7 more
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
      at com.esotericsoftware.kryo.util.IntArray.pop(IntArray.java:157)
      at com.esotericsoftware.kryo.Kryo.reference(Kryo.java:822)
      at com.esotericsoftware.kryo.serializers.FieldSerializer.copy(FieldSerializer.java:625)
      at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
      at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:175)
      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:104)
      at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:30)
      at org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:74)
      at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.<init>(DefaultOperatorStateBackend.java:448)
      at org.apache.flink.runtime.state.DefaultOperatorStateBackend$PartitionableListState.deepCopy(DefaultOperatorStateBackend.java:460)
      at org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:220)
      at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:363)
      ... 12 more

Regards,

Bruce


Re: Error while triggering checkpoint due to Kryo exception

Posted by Stefan Richter <s....@data-artisans.com>.
Hi,

this problem is fixed in Flink >= 1.4.3, see https://issues.apache.org/jira/browse/FLINK-8836.

Best,
Stefan
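
A note on the bottom frames of the trace: the ArrayIndexOutOfBoundsException: -1 is thrown from IntArray.pop inside Kryo.copy, which is what popping an already empty bookkeeping stack looks like. Kryo's documentation states that Kryo instances are not thread safe, so one plausible reading (an assumption, not confirmed in this thread) is that serializer state was shared between threads during the snapshot. The Java sketch below is not Flink or Kryo code: DepthTrackingCopier, its methods, and PerThreadCopierSketch are hypothetical names made up only to illustrate the general remedy of giving each thread its own copier instance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class PerThreadCopierSketch {

    /** Hypothetical stand-in for a stateful serializer that is not thread safe. */
    static final class DepthTrackingCopier {
        private final int[] stack = new int[16];
        private int size = 0;

        private void push(int v) {
            stack[size++] = v;
        }

        // If another thread had already emptied the stack, this would read
        // index -1 and throw ArrayIndexOutOfBoundsException.
        private int pop() {
            return stack[--size];
        }

        /** Copies the value while tracking bookkeeping state on the stack. */
        int[] copy(int[] value) {
            push(value.length);
            int[] result = value.clone();
            int recorded = pop();
            if (recorded != value.length) {
                throw new IllegalStateException("bookkeeping state was corrupted");
            }
            return result;
        }
    }

    // One copier per thread, so no bookkeeping state is ever shared.
    private static final ThreadLocal<DepthTrackingCopier> COPIER =
            ThreadLocal.withInitial(DepthTrackingCopier::new);

    /** Runs copies on several threads and returns how many succeeded. */
    static int runConcurrentCopies(int threads, int copiesPerThread) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int length = t + 1;
                futures.add(pool.submit(() -> {
                    int ok = 0;
                    int[] value = new int[length];
                    for (int i = 0; i < copiesPerThread; i++) {
                        int[] copied = COPIER.get().copy(value);
                        if (copied.length == value.length) {
                            ok++;
                        }
                    }
                    return ok;
                }));
            }
            int total = 0;
            for (Future<Integer> f : futures) {
                total += f.get(); // rethrows any failure from a worker thread
            }
            return total;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runConcurrentCopies(4, 10_000));
    }
}
```

Because every thread gets its own DepthTrackingCopier through the ThreadLocal, all copies complete regardless of how the threads interleave. Sharing a single instance across the worker threads instead could fail intermittently with the kind of index error shown in the trace.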
