Posted to dev@beam.apache.org by "cmach@godaddy.com" <cm...@godaddy.com> on 2019/05/19 23:16:15 UTC

FlinkRunner CheckPoint Failed - Couldn't materialize/TypeSerialization

Hello Beam Dev,

I am having a hard time getting checkpointing to work with FlinkRunner. I set up two simple pipelines: one reads from an unbounded Kinesis source, and the other reads from a text file (tested both with and without the `--streaming=true` option). Both fail to save a checkpoint. Checkpoints are configured to be saved to the file system. Am I missing something? My pipelines and the stack trace are below for reference.

I'd appreciate any pointers!

Pipeline #1:

pipeline.apply("ReadingFromKinesis123", KinesisIO.read()
    .withStreamName("test-stream")
    .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
    .withAWSClientsProvider("foo", "bar", Regions.US_WEST_2, "http://localhost:30002"));

Pipeline #2:

Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
p.run().waitUntilFinish();

One of my commands:

bin/flink run -c org.apache.beam.examples.WordCount ../../word-count-beam/target/word-count-beam-bundled-0.1.jar --runner=FlinkRunner --checkpointingInterval=5000 --externalizedCheckpointsEnabled=true --streamName=test-stream --retainExternalizedCheckpointsOnCancellation=true --awsRegion=us-west-2

flink-conf.yaml:

state.backend: filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoint/flink_app/
state.savepoints.dir: file:///tmp/flink-checkpoint/flink_app/savepoints/
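A quick way to confirm whether any checkpoint was written at all is to look under `state.checkpoints.dir`. The sketch below assumes Flink's usual layout of `<state.checkpoints.dir>/<job-id>/chk-<n>/`, which should be verified against the Flink version in use; the class name `CheckpointDirLister` is hypothetical, and the default path is copied from the configuration above.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical helper: lists chk-* directories under a Flink checkpoint root,
 * assuming the layout <root>/<job-id>/chk-<n>/.
 */
public class CheckpointDirLister {

    /** Returns "<job-id>/chk-<n>" for every chk-* directory found, sorted. */
    static List<String> listCheckpoints(Path root) throws IOException {
        List<String> found = new ArrayList<>();
        if (!Files.isDirectory(root)) {
            return found; // nothing written yet, or the path is wrong
        }
        try (DirectoryStream<Path> jobs = Files.newDirectoryStream(root)) {
            for (Path job : jobs) {
                if (!Files.isDirectory(job)) {
                    continue;
                }
                // Only entries matching the chk-* glob count as checkpoints.
                try (DirectoryStream<Path> chks = Files.newDirectoryStream(job, "chk-*")) {
                    for (Path chk : chks) {
                        if (Files.isDirectory(chk)) {
                            found.add(job.getFileName() + "/" + chk.getFileName());
                        }
                    }
                }
            }
        }
        Collections.sort(found);
        return found;
    }

    public static void main(String[] args) throws IOException {
        // Default taken from state.checkpoints.dir in the flink-conf.yaml above.
        Path root = Paths.get(args.length > 0 ? args[0] : "/tmp/flink-checkpoint/flink_app/");
        List<String> checkpoints = listCheckpoints(root);
        if (checkpoints.isEmpty()) {
            System.out.println("No chk-* directories under " + root);
        } else {
            checkpoints.forEach(System.out::println);
        }
    }
}
```

An empty result after several checkpoint intervals would be consistent with the materialization failure reported in this thread.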

Stack trace:

AsynchronousException{java.lang.Exception: Could not materialize checkpoint 7 for operator Source: ReadLines/Read -> DropInputs/ParMultiDo(NoOp) (1/1).}
       at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
       at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
       at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 7 for operator Source: ReadLines/Read -> DropInputs/ParMultiDo(NoOp) (1/1).
       at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
       ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.AbstractMethodError: org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()Lorg/apache/flink/api/common/typeutils/TypeSerializerSnapshot;
       at java.util.concurrent.FutureTask.report(FutureTask.java:122)
       at java.util.concurrent.FutureTask.get(FutureTask.java:192)
       at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
       at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
       at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
       ... 5 more
Caused by: java.lang.AbstractMethodError: org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()Lorg/apache/flink/api/common/typeutils/TypeSerializerSnapshot;
       at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.computeSnapshot(RegisteredOperatorStateBackendMetaInfo.java:170)
       at org.apache.flink.runtime.state.RegisteredOperatorStateBackendMetaInfo.snapshot(RegisteredOperatorStateBackendMetaInfo.java:103)
       at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:123)
       at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
       at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
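Java prints the outermost exception first and each wrapped cause as a `Caused by:` section, so the innermost cause is the last one listed; in this trace that is the `AbstractMethodError`. When such failures are caught in code, the same cause can be reached programmatically. A minimal sketch follows; `RootCause` and `rootCause` are our own illustrative names, not a Flink or Beam API, and the exceptions in `main` are stand-ins shaped like the chain above.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.concurrent.ExecutionException;

/** Finds the innermost cause, i.e. the last "Caused by:" section of a trace. */
public class RootCause {

    static Throwable rootCause(Throwable t) {
        // Identity-based set so a (rare) cause cycle cannot loop forever.
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = t;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in exceptions shaped like the chain in the trace above.
        Throwable inner = new AbstractMethodError(
                "org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()");
        Throwable middle = new ExecutionException(inner);
        Throwable outer = new Exception("Could not materialize checkpoint 7", middle);

        Throwable root = rootCause(outer);
        // prints: java.lang.AbstractMethodError: org.apache.flink.api.common.typeutils.TypeSerializer.snapshotConfiguration()
        System.out.println(root.getClass().getName() + ": " + root.getMessage());
    }
}
```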


Re: FlinkRunner CheckPoint Failed - Couldn't materialize/TypeSerialization

Posted by Maximilian Michels <mx...@apache.org>.
Hi,

Since you are getting an AbstractMethodError, there is likely a version mismatch
between Beam and Flink.
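The descriptor in the error, `snapshotConfiguration()Lorg/apache/flink/api/common/typeutils/TypeSerializerSnapshot;`, names the exact return type the Flink runtime expects. An `AbstractMethodError` is raised when the JVM resolves the call to the abstract declaration but the runtime class has no concrete override with that exact name and descriptor, which typically happens when the serializer was compiled against a Flink API whose version of the method returned a different type. The sketch below uses plain reflection to list every zero-argument method of a given name along a class's superclass chain, with its return type; the class name `MethodSignatureCheck` is hypothetical, and which serializer to inspect (possibly Beam's `org.apache.beam.runners.flink.translation.types.CoderTypeSerializer`) is our guess, since the trace does not name it.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical diagnostic: lists zero-argument methods named methodName
 * declared along the superclass chain (interfaces are not walked).
 */
public class MethodSignatureCheck {

    static List<String> describe(Class<?> cls, String methodName) {
        List<String> out = new ArrayList<>();
        for (Class<?> c = cls; c != null; c = c.getSuperclass()) {
            for (Method m : c.getDeclaredMethods()) {
                if (m.getName().equals(methodName) && m.getParameterCount() == 0) {
                    // Bridge methods appear here too, each with its own return type.
                    String kind = Modifier.isAbstract(m.getModifiers()) ? "abstract " : "";
                    out.add(c.getName() + ": " + kind + m.getReturnType().getName());
                }
            }
        }
        return out;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        if (args.length == 0) {
            System.out.println("usage: MethodSignatureCheck <class-name> [method-name]");
            return;
        }
        String methodName = args.length > 1 ? args[1] : "snapshotConfiguration";
        for (String line : describe(Class.forName(args[0]), methodName)) {
            System.out.println(line);
        }
    }
}
```

Run with the job's Flink and Beam jars on the classpath. If the only concrete entry returns a type other than `org.apache.flink.api.common.typeutils.TypeSerializerSnapshot`, that would be consistent with the mismatch described above.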

Could you please provide:

- Beam version used
- Flink Runner artifact used
- Flink version used
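To answer these questions from the environment the job actually runs in, one option is to ask the JVM where the relevant classes were loaded from. The sketch below uses only standard `Class`, `CodeSource`, and `Package` calls; the class name `ClasspathVersionReport` and the default class names it inspects are assumptions, and a jar's `Implementation-Version` manifest entry may simply be absent. For the output to be meaningful it should run with the same classpath as the job on the TaskManager, since a bundled jar may carry its own copies of classes.

```java
import java.security.CodeSource;

/** Hypothetical helper: reports the jar and manifest version behind a class. */
public class ClasspathVersionReport {

    /** Describes where a class was loaded from and its package's manifest version, if any. */
    static String describe(Class<?> cls) {
        CodeSource src = cls.getProtectionDomain().getCodeSource();
        // JDK classes from the bootstrap loader have no CodeSource.
        String location = (src == null || src.getLocation() == null)
                ? "<bootstrap or unknown>"
                : src.getLocation().toString();
        Package pkg = cls.getPackage();
        String version = (pkg == null || pkg.getImplementationVersion() == null)
                ? "<no Implementation-Version>"
                : pkg.getImplementationVersion();
        return cls.getName() + " from " + location + " (version " + version + ")";
    }

    public static void main(String[] args) {
        // Default class names are illustrative; pass your own as arguments.
        String[] names = args.length > 0 ? args : new String[] {
            "org.apache.flink.api.common.typeutils.TypeSerializer",
            "org.apache.beam.runners.flink.FlinkRunner"
        };
        for (String name : names) {
            try {
                System.out.println(describe(Class.forName(name)));
            } catch (ClassNotFoundException e) {
                System.out.println(name + " not on classpath");
            }
        }
    }
}
```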

Thanks,
Max
