Posted to dev@beam.apache.org by David Morávek <dm...@apache.org> on 2020/04/21 07:29:23 UTC

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

Hi Stephen,

nice catch and awesome report! ;) This definitely needs a proper fix. I've
created a new JIRA to track the issue and will try to resolve it soon as
this seems critical to me.

https://issues.apache.org/jira/browse/BEAM-9794

Thanks,
D.

On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <st...@gmail.com>
wrote:

> I was able to reproduce this in a unit test:
>
>   @Test
>   public void test() throws InterruptedException, ExecutionException {
>     FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     options.setCheckpointingInterval(10L);
>     options.setParallelism(1);
>     options.setStreaming(true);
>     options.setRunner(FlinkRunner.class);
>     options.setFlinkMaster("[local]");
>     options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>     Pipeline pipeline = Pipeline.create(options);
>     pipeline
>         .apply(Create.of((Void) null))
>         .apply(
>             ParDo.of(
>                 new DoFn<Void, Void>() {
>                   private static final long serialVersionUID = 1L;
>
>                   @RequiresStableInput
>                   @ProcessElement
>                   public void processElement() {}
>                 }));
>     pipeline.run();
>   }
>
> It took a while to get to checkpoint 32,767, but eventually it did, and it
> failed with the same error I listed above.
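
A rough back-of-the-envelope sketch of how long a job can run before it reaches that checkpoint count. It assumes the limit is Short.MAX_VALUE (32,767), which matches the checkpoint number reported above, and that exactly one checkpoint is triggered per interval. The class and method names are made up for illustration.

```java
import java.time.Duration;

public class CheckpointBudget {
    // Assumption: the job fails once roughly Short.MAX_VALUE (32,767) checkpoints
    // have been taken, matching the checkpoint number reported in this thread.
    static final long LIMIT = Short.MAX_VALUE;

    /** Lower bound on the run time before LIMIT checkpoints have been triggered. */
    static Duration timeToLimit(Duration checkpointInterval) {
        return checkpointInterval.multipliedBy(LIMIT);
    }

    public static void main(String[] args) {
        // 10 ms interval, as in the unit test: a little under five and a half minutes.
        System.out.println(timeToLimit(Duration.ofMillis(10)));
        // One-minute interval, as in the production job: a little under 23 days.
        System.out.println(timeToLimit(Duration.ofMinutes(1)));
    }
}
```

Raising the interval only stretches this budget linearly, so it postpones the failure rather than removing it.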
>
> On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <st...@gmail.com>
> wrote:
>
>> I have a Beam Pipeline (2.14) running on Flink (1.8.0, emr-5.26.0) that
>> uses the RequiresStableInput feature.
>>
>> Currently it's configured to checkpoint once a minute, and after around
>> 32000-33000 checkpoints, it fails with:
>>
>>> 2020-04-15 13:15:02,920 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:15:05,762 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>>> 2020-04-15 13:16:02,919 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:16:03,147 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>     ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>     ... 5 more
>>> Caused by: java.lang.IllegalArgumentException
>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>>>     ... 7 more
>>
>>
>> The exception comes from here:
>> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>>
>> In the Flink Runner code, I can see that each checkpoint will result in a
>> new OperatorState (or KeyedState if the stream is keyed):
>>
>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>>
>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>>
>> This seems to be the reason the pipeline will eventually die.
>>
>> While a workaround might be to increase the time between checkpoints, it
>> seems that any pipeline running on Flink that uses RequiresStableInput is
>> limited in how long it can run before it has to be restarted from
>> scratch.
>>
>>
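
The failure mode described above can be modelled in a few lines. This is a minimal sketch, not Flink or Beam code: the registry, the state names and the exact form of the check are assumptions, chosen so that the bound is Short.MAX_VALUE, which is consistent with the failure showing up around checkpoint 32,767.

```java
import java.util.HashMap;
import java.util.Map;

public class StatePerCheckpointModel {
    // Assumed bound on the number of registered operator states.
    static final int MAX_REGISTERED_STATES = Short.MAX_VALUE;

    // Hypothetical stand-in for the state backend's registry of named states.
    private final Map<String, StringBuilder> registeredStates = new HashMap<>();

    /** Registers a fresh state cell for the given checkpoint id. */
    void onCheckpoint(long checkpointId) {
        registeredStates.put("buffer-" + checkpointId, new StringBuilder());
    }

    /** Models the snapshot precondition; emptied cells still count as registered. */
    void snapshot() {
        if (registeredStates.size() > MAX_REGISTERED_STATES) {
            throw new IllegalArgumentException(
                "too many registered states: " + registeredStates.size());
        }
    }

    /** Returns the id of the first checkpoint whose snapshot is rejected. */
    static long firstFailingCheckpoint() {
        StatePerCheckpointModel model = new StatePerCheckpointModel();
        for (long id = 1; ; id++) {
            model.onCheckpoint(id);
            try {
                model.snapshot();
            } catch (IllegalArgumentException e) {
                return id;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(firstFailingCheckpoint());
    }
}
```

In this model the first rejected snapshot is the one taken right after the 32,767th cell was registered. In a real job the exact checkpoint number also depends on what other states the operator has registered, which may be why the production job above failed slightly earlier, at checkpoint 32702.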

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

Posted by Eleanore Jin <el...@gmail.com>.
Hi Max,

Thanks for the info!

Eleanore

On Tue, May 5, 2020 at 4:01 AM Maximilian Michels <mx...@apache.org> wrote:

> Hey Eleanore,
>
> The change will be part of the 2.21.0 release.
>
> -Max
>
> On 04.05.20 19:14, Eleanore Jin wrote:
> > Hi Max,
> >
> > Thanks for the information. I saw that this PR is already merged; has it
> > been backported to the affected versions
> > (i.e. 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0), or do I have
> > to wait for the 2.20.1 release?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels <mxm@apache.org> wrote:
> >
> >     Hi Eleanore,
> >
> >     Exactly-once is not affected but the pipeline can fail to checkpoint
> >     after the maximum number of state cells have been reached. We are
> >     working on a fix [1].
> >
> >     Cheers,
> >     Max
> >
> >     [1] https://github.com/apache/beam/pull/11478
> >
> >     On 22.04.20 07:19, Eleanore Jin wrote:
> >     > Hi Maxi,
> >     >
> >     > I assume this will impact the Exactly Once Semantics that beam provided
> >     > as in the KafkaExactlyOnceSink, the processElement method is also
> >     > annotated with @RequiresStableInput?
> >     >
> >     > Thanks a lot!
> >     > Eleanore
> >     >
> >     > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org> wrote:
> >     >
> >     >     Hi Stephen,
> >     >
> >     >     Thanks for reporting the issue! David, good catch!
> >     >
> >     >     I think we have to resort to only using a single state cell for
> >     >     buffering on checkpoints, instead of using a new one for every
> >     >     checkpoint. I was under the assumption that, if the state cell was
> >     >     cleared, it would not be checkpointed but that does not seem to be
> >     >     the case.
> >     >
> >     >     Thanks,
> >     >     Max
> >     >
>
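
Max's proposal of a single buffering cell in place of one new cell per checkpoint can be contrasted in a small model. This is only a sketch under stated assumptions: the Registry class and the cell names are hypothetical, this is not the code from the pull request, and a real implementation also has to keep track of elements that belong to checkpoints still in flight.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BufferCellStrategies {
    /** Hypothetical registry: state name to buffered elements. */
    static final class Registry {
        final Map<String, List<String>> cells = new HashMap<>();

        List<String> cell(String name) {
            return cells.computeIfAbsent(name, k -> new ArrayList<>());
        }
    }

    /** One new cell per checkpoint: the registry grows although each cell is cleared. */
    static int cellsWithNewCellPerCheckpoint(int checkpoints) {
        Registry registry = new Registry();
        for (int id = 1; id <= checkpoints; id++) {
            List<String> buffer = registry.cell("buffer-" + id);
            buffer.add("element-" + id); // buffer input until the checkpoint completes
            buffer.clear();              // emit downstream, then clear the cell
        }
        return registry.cells.size();
    }

    /** A single reused cell: the number of registered cells stays constant. */
    static int cellsWithSingleCell(int checkpoints) {
        Registry registry = new Registry();
        for (int id = 1; id <= checkpoints; id++) {
            List<String> buffer = registry.cell("buffer");
            buffer.add("element-" + id);
            buffer.clear();
        }
        return registry.cells.size();
    }

    public static void main(String[] args) {
        int checkpoints = 40_000;
        System.out.println(cellsWithNewCellPerCheckpoint(checkpoints));
        System.out.println(cellsWithSingleCell(checkpoints));
    }
}
```

The point of the comparison is that clearing a cell empties it but does not unregister it, so only the second strategy keeps the number of registered cells bounded.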

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

Posted by Maximilian Michels <mx...@apache.org>.
Hey Eleanore,

The change will be part of the 2.21.0 release.

-Max

On 04.05.20 19:14, Eleanore Jin wrote:
> Hi Max, 
> 
> Thanks for the information. I saw that this PR is already merged; has it
> been backported to the affected versions
> (i.e. 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0), or do I have
> to wait for the 2.20.1 release?
> 
> Thanks a lot!
> Eleanore
> 
> On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels <mxm@apache.org> wrote:
> 
>     Hi Eleanore,
> 
>     Exactly-once is not affected but the pipeline can fail to checkpoint
>     after the maximum number of state cells have been reached. We are
>     working on a fix [1].
> 
>     Cheers,
>     Max
> 
>     [1] https://github.com/apache/beam/pull/11478
> 
>     On 22.04.20 07:19, Eleanore Jin wrote:
>     > Hi Maxi, 
>     >
>     > I assume this will impact the exactly-once semantics that Beam
>     > provides, since the processElement method in KafkaExactlyOnceSink is
>     > also annotated with @RequiresStableInput?
>     >
>     > Thanks a lot!
>     > Eleanore
>     >
>     > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org> wrote:
>     >
>     >     Hi Stephen,
>     >
>     >     Thanks for reporting the issue! David, good catch!
>     >
>     >     I think we have to resort to only using a single state cell for
>     >     buffering on checkpoints, instead of using a new one for every
>     >     checkpoint. I was under the assumption that, if the state cell was
>     >     cleared, it would not be checkpointed but that does not seem to be
>     >     the case.
>     >
>     >     Thanks,
>     >     Max
>     >
>     >     On 21.04.20 09:29, David Morávek wrote:
>     >     > Hi Stephen,
>     >     >
>     >     > nice catch and awesome report! ;) This definitely needs a proper
>     >     > fix. I've created a new JIRA to track the issue and will try to
>     >     > resolve it soon as this seems critical to me.
>     >     >
>     >     > https://issues.apache.org/jira/browse/BEAM-9794
>     >     >
>     >     > Thanks,
>     >     > D.
>     >     >
>     >     > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com> wrote:
>     >     >
>     >     >     I was able to reproduce this in a unit test:
>     >     >
>     >     >         @Test
>     >     >         public void test() throws InterruptedException, ExecutionException {
>     >     >           FlinkPipelineOptions options =
>     >     >               PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>     >     >           options.setCheckpointingInterval(10L);
>     >     >           options.setParallelism(1);
>     >     >           options.setStreaming(true);
>     >     >           options.setRunner(FlinkRunner.class);
>     >     >           options.setFlinkMaster("[local]");
>     >     >           options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>     >     >           Pipeline pipeline = Pipeline.create(options);
>     >     >           pipeline
>     >     >               .apply(Create.of((Void) null))
>     >     >               .apply(
>     >     >                   ParDo.of(
>     >     >                       new DoFn<Void, Void>() {
>     >     >                         private static final long serialVersionUID = 1L;
>     >     >
>     >     >                         @RequiresStableInput
>     >     >                         @ProcessElement
>     >     >                         public void processElement() {}
>     >     >                       }));
>     >     >           pipeline.run();
>     >     >         }
>     >     >
>     >     >
>     >     >     It took a while to get to checkpoint 32,767, but eventually it
>     >     >     did, and it failed with the same error I listed above.
>     >     >
>     >     >     On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <stephenpatel89@gmail.com> wrote:
>     >     >
>     >     >         I have a Beam Pipeline (2.14) running on Flink (1.8.0,
>     >     >         emr-5.26.0) that uses the RequiresStableInput feature.
>     >     >
>     >     >         Currently it's configured to checkpoint once a minute, and
>     >     >         after around 32000-33000 checkpoints, it fails with:
>     >     >
>     >     >             2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>     >     >             2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>     >     >             2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>     >     >             2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>     >     >             AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>     >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>     >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>     >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>     >     >             at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >     >             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >     >             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >     >             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >     >             at java.lang.Thread.run(Thread.java:748)
>     >     >             Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>     >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>     >     >             ... 6 more
>     >     >             Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>     >     >             at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     >     >             at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     >     >             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>     >     >             at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>     >     >             ... 5 more
>     >     >             Caused by: java.lang.IllegalArgumentException
>     >     >             at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>     >     >             at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>     >     >             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>     >     >             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>     >     >             at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     >     >             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >     >             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>     >     >             ... 7 more
>     >     >
>     >     >
>     >     >         The exception comes from here:
>     >     >         https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>     >     >
>     >     >         In the Flink Runner code, I can see that each checkpoint will
>     >     >         result in a new OperatorState (or KeyedState if the stream is
>     >     >         keyed):
>     >     >         https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>     >     >         https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>     >     >
>     >     >         This seems to be the reason the pipeline will eventually die.
>     >     >
>     >     >         While a workaround might be to increase the time between
>     >     >         checkpoints, it seems like any pipeline running on flink, using
>     >     >         the RequiresStableInput is limited in the amount of time that it
>     >     >         can run without being started from scratch.
>     >     >
>     >
> 
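Editor's sketch (not part of the original thread): the quoted discussion
attributes the failure to registering a new state cell on every checkpoint,
and proposes reusing a single cell instead. The small Java model below
illustrates that reasoning only. It does not use Beam or Flink APIs; the
class name, method names, and cell names are hypothetical, and the limit of
Short.MAX_VALUE registered cells is an assumption that is consistent with
the failure near checkpoint 32,767 reported above. In a real job the exact
failing checkpoint also depends on what other state is registered.

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

/** Toy model of the state-cell growth discussed in this thread (no Beam/Flink APIs). */
public class StateCellLimitSketch {

    // Assumption: the backend rejects a snapshot once more than
    // Short.MAX_VALUE (32,767) state cells are registered.
    static final int MAX_STATE_CELLS = Short.MAX_VALUE;

    /**
     * Simulates the given number of checkpoints and returns the id of the first
     * checkpoint that would exceed the assumed limit, or -1 if none does.
     */
    static long firstFailingCheckpoint(boolean newCellPerCheckpoint, long checkpoints) {
        // Cells stay registered even after their contents are cleared,
        // which is the behaviour described in the quoted reply.
        Set<String> registeredCells = new HashSet<>();
        for (long id = 1; id <= checkpoints; id++) {
            registeredCells.add(newCellPerCheckpoint ? "buffer-" + id : "buffer");
            if (registeredCells.size() > MAX_STATE_CELLS) {
                return id;
            }
        }
        return -1;
    }

    /** Rough upper bound on runtime before the assumed limit is reached. */
    static Duration timeUntilLimit(Duration checkpointInterval) {
        return checkpointInterval.multipliedBy(MAX_STATE_CELLS);
    }

    public static void main(String[] args) {
        // One new cell per checkpoint: the limit is eventually exceeded.
        System.out.println(firstFailingCheckpoint(true, 100_000));   // 32768
        // A single reused cell: the count never grows.
        System.out.println(firstFailingCheckpoint(false, 100_000));  // -1
        // With one checkpoint per minute, the bound is roughly 22 days.
        System.out.println(timeUntilLimit(Duration.ofMinutes(1)).toDays()); // 22
    }
}
```

Under these assumptions, lengthening the checkpoint interval only postpones
the failure, whereas reusing one cell removes the growth entirely.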

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

Posted by Eleanore Jin <el...@gmail.com>.
Hi Max,

Thanks for the information. I saw that this PR is already merged; has it
been backported to the affected versions (i.e. 2.14.0, 2.15.0, 2.16.0,
2.17.0, 2.18.0, 2.19.0, 2.20.0), or do I have to wait for the 2.20.1
release?

Thanks a lot!
Eleanore

On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels <mx...@apache.org> wrote:

> Hi Eleanore,
>
> Exactly-once is not affected but the pipeline can fail to checkpoint
> after the maximum number of state cells have been reached. We are
> working on a fix [1].
>
> Cheers,
> Max
>
> [1] https://github.com/apache/beam/pull/11478
>
> On 22.04.20 07:19, Eleanore Jin wrote:
> > Hi Maxi,
> >
> > I assume this will impact the exactly-once semantics that Beam
> > provides, since the processElement method in KafkaExactlyOnceSink is
> > also annotated with @RequiresStableInput?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org> wrote:
> >
> >     Hi Stephen,
> >
> >     Thanks for reporting the issue! David, good catch!
> >
> >     I think we have to resort to only using a single state cell for
> >     buffering on checkpoints, instead of using a new one for every
> >     checkpoint. I was under the assumption that, if the state cell was
> >     cleared, it would not be checkpointed but that does not seem to be
> >     the case.
> >
> >     Thanks,
> >     Max
> >
> >     On 21.04.20 09:29, David Morávek wrote:
> >     > Hi Stephen,
> >     >
> >     > nice catch and awesome report! ;) This definitely needs a proper
> >     > fix. I've created a new JIRA to track the issue and will try to
> >     > resolve it soon as this seems critical to me.
> >     >
> >     > https://issues.apache.org/jira/browse/BEAM-9794
> >     >
> >     > Thanks,
> >     > D.
> >     >
> >     > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com> wrote:
> >     >
> >     >     I was able to reproduce this in a unit test:
> >     >
> >     >         @Test
> >     >         public void test() throws InterruptedException, ExecutionException {
> >     >           FlinkPipelineOptions options =
> >     >               PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> >     >           options.setCheckpointingInterval(10L);
> >     >           options.setParallelism(1);
> >     >           options.setStreaming(true);
> >     >           options.setRunner(FlinkRunner.class);
> >     >           options.setFlinkMaster("[local]");
> >     >           options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> >     >           Pipeline pipeline = Pipeline.create(options);
> >     >           pipeline
> >     >               .apply(Create.of((Void) null))
> >     >               .apply(
> >     >                   ParDo.of(
> >     >                       new DoFn<Void, Void>() {
> >     >                         private static final long serialVersionUID = 1L;
> >     >
> >     >                         @RequiresStableInput
> >     >                         @ProcessElement
> >     >                         public void processElement() {}
> >     >                       }));
> >     >           pipeline.run();
> >     >         }
> >     >
> >     >
> >     >     It took a while to get to checkpoint 32,767, but eventually it
> >     >     did, and it failed with the same error I listed above.
> >     >
> >     >     On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <stephenpatel89@gmail.com> wrote:
> >     >
> >     >         I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> >     >         emr-5.26.0) that uses the RequiresStableInput feature.
> >     >
> >     >         Currently it's configured to checkpoint once a minute, and
> >     >         after around 32000-33000 checkpoints, it fails with:
> >     >
> >     >             2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> >     >             2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> >     >             2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> >     >             2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> >     >             AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
> >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >     >             at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >     >             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     >             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >     >             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >     >             at java.lang.Thread.run(Thread.java:748)
> >     >             Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
> >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> >     >             ... 6 more
> >     >             Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
> >     >             at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >     >             at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >     >             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> >     >             at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> >     >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> >     >             ... 5 more
> >     >             Caused by: java.lang.IllegalArgumentException
> >     >             at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> >     >             at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> >     >             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> >     >             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> >     >             at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> >     >             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     >             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> >     >             ... 7 more
> >     >
> >     >
> >     >         The exception comes from
> >     >
> >      here:
> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> >     >
> >     >         In the Flink Runner code, I can see that each checkpoint
> will
> >     >         result in a new OperatorState (or KeyedState if the stream
> is
> >     >         keyed):
> >     >
> >
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> >     >
> >
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> >     >
> >     >         This seems to be the reason the pipeline will eventually
> >     die.
> >     >
> >     >         While a workaround might be to increase the time between
> >     >         checkpoints, it seems like any pipeline running on flink,
> >     using
> >     >         the RequiresStableInput is limited in the amount of time
> >     that it
> >     >         can run without being started from scratch.
> >     >
> >
>

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

Posted by Eleanore Jin <el...@gmail.com>.
Hi Max,

Thanks for the information. I saw that this PR is already merged; I just
wonder whether it has been backported to the affected versions (i.e. 2.14.0,
2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0), or whether I have to wait
for the 2.20.1 release?

Thanks a lot!
Eleanore

On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels <mx...@apache.org> wrote:

> Hi Eleanore,
>
> Exactly-once is not affected, but the pipeline can fail to checkpoint
> after the maximum number of state cells has been reached. We are
> working on a fix [1].
>
> Cheers,
> Max
>
> [1] https://github.com/apache/beam/pull/11478
>
> On 22.04.20 07:19, Eleanore Jin wrote:
> > Hi Maxi,
> >
> > I assume this will impact the exactly-once semantics that Beam provides,
> > since in KafkaExactlyOnceSink the processElement method is also
> > annotated with @RequiresStableInput?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org
> > <ma...@apache.org>> wrote:
> >
> >     Hi Stephen,
> >
> >     Thanks for reporting the issue! David, good catch!
> >
> >     I think we have to resort to only using a single state cell for
> >     buffering on checkpoints, instead of using a new one for every
> >     checkpoint. I was under the assumption that, if the state cell was
> >     cleared, it would not be checkpointed but that does not seem to be
> >     the case.
> >
> >     Thanks,
> >     Max
> >
> >     On 21.04.20 09:29, David Morávek wrote:
> >     > Hi Stephen,
> >     >
> >     > nice catch and awesome report! ;) This definitely needs a proper
> fix.
> >     > I've created a new JIRA to track the issue and will try to resolve
> it
> >     > soon as this seems critical to me.
> >     >
> >     > https://issues.apache.org/jira/browse/BEAM-9794
> >     >
> >     > Thanks,
> >     > D.
> >     >
> >     > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel
> >     <stephenpatel89@gmail.com <ma...@gmail.com>
> >     > <mailto:stephenpatel89@gmail.com
> >     <ma...@gmail.com>>> wrote:
> >     >
> >     >     I was able to reproduce this in a unit test:
> >     >
> >     >         @Test
> >     >
> >     >           *public* *void* test() *throws* InterruptedException,
> >     >         ExecutionException {
> >     >
> >     >             FlinkPipelineOptions options =
> >     >         PipelineOptionsFactory./as/(FlinkPipelineOptions.*class*);
> >     >
> >     >             options.setCheckpointingInterval(10L);
> >     >
> >     >             options.setParallelism(1);
> >     >
> >     >             options.setStreaming(*true*);
> >     >
> >     >             options.setRunner(FlinkRunner.*class*);
> >     >
> >     >             options.setFlinkMaster("[local]");
> >     >
> >     >             options.setStateBackend(*new*
> >     >         MemoryStateBackend(Integer.*/MAX_VALUE/*));
> >     >
> >     >             Pipeline pipeline = Pipeline./create/(options);
> >     >
> >     >             pipeline
> >     >
> >     >                 .apply(Create./of/((Void) *null*))
> >     >
> >     >                 .apply(
> >     >
> >     >                     ParDo./of/(
> >     >
> >     >                         *new* DoFn<Void, Void>() {
> >     >
> >     >
> >     >                           *private* *static* *final* *long*
> >     >         */serialVersionUID/* = 1L;
> >     >
> >     >
> >     >                           @RequiresStableInput
> >     >
> >     >                           @ProcessElement
> >     >
> >     >                           *public* *void* processElement() {}
> >     >
> >     >                         }));
> >     >
> >     >             pipeline.run();
> >     >
> >     >           }
> >     >
> >     >
> >     >     It took a while to get to checkpoint 32,767, but eventually it
> >     did,
> >     >     and it failed with the same error I listed above.
> >     >
> >     >     On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
> >     >     <stephenpatel89@gmail.com <ma...@gmail.com>
> >     <mailto:stephenpatel89@gmail.com <ma...@gmail.com>>>
> >     wrote:
> >     >
> >     >         I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> >     >         emr-5.26.0) that uses the RequiresStableInput feature.
> >     >
> >     >         Currently it's configured to checkpoint once a minute, and
> >     after
> >     >         around 32000-33000 checkpoints, it fails with:
> >     >
> >     >             2020-04-15 13:15:02,920 INFO
> >     >
> >       org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> >     >               - Triggering checkpoint 32701 @ 1586956502911 for job
> >     >             9953424f21e240112dd23ab4f8320b60.
> >     >             2020-04-15 13:15:05,762 INFO
> >     >
> >       org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> >     >               - Completed checkpoint 32701 for job
> >     >             9953424f21e240112dd23ab4f8320b60 (795385496 bytes in
> >     2667 ms).
> >     >             2020-04-15 13:16:02,919 INFO
> >     >
> >       org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> >     >               - Triggering checkpoint 32702 @ 1586956562911 for job
> >     >             9953424f21e240112dd23ab4f8320b60.
> >     >             2020-04-15 13:16:03,147 INFO
> >     >
> >       org.apache.flink.runtime.executiongraph.ExecutionGraph
> >     >                - <operator_name> (1/2)
> >     >             (f4737add01961f8b42b8eb4e791b83ba) switched from
> >     RUNNING to
> >     >             FAILED.
> >     >             AsynchronousException{java.lang.Exception: Could not
> >     >             materialize checkpoint 32702 for operator
> <operator_name>
> >     >             (1/2).}
> >     >             at
> >     >
> >
>   org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >     >             at
> >     >
> >
>   org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >     >             at
> >     >
> >
>   org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >     >             at
> >     >
> >
>   java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >     >             at
> >     java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     >             at
> >     >
> >
>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >     >             at
> >     >
> >
>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >     >             at java.lang.Thread.run(Thread.java:748)
> >     >             Caused by: java.lang.Exception: Could not materialize
> >     >             checkpoint 32702 for operator <operator_name> (1/2).
> >     >             at
> >     >
> >
>   org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> >     >             ... 6 more
> >     >             Caused by: java.util.concurrent.ExecutionException:
> >     >             java.lang.IllegalArgumentException
> >     >             at
> >     java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >     >             at
> >     java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >     >             at
> >     >
> >
>   org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> >     >             at
> >     >
> >
>   org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> >     >             at
> >     >
> >
>   org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> >     >             ... 5 more
> >     >             Caused by: java.lang.IllegalArgumentException
> >     >             at
> >     >
> >
>   org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> >     >             at
> >     >
> >
>   org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> >     >             at
> >     >
> >
>   org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> >     >             at
> >     >
> >
>   org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> >     >             at
> >     >
> >
>   org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> >     >             at
> >     java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >     >             at
> >     >
> >
>   org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> >     >             ... 7 more
> >     >
> >     >
> >     >         The exception comes from
> >     >
> >      here:
> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> >     >
> >     >         In the Flink Runner code, I can see that each checkpoint
> will
> >     >         result in a new OperatorState (or KeyedState if the stream
> is
> >     >         keyed):
> >     >
> >
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> >     >
> >
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> >     >
> >     >         This seems to be the reason the pipeline will eventually
> >     die.
> >     >
> >     >         While a workaround might be to increase the time between
> >     >         checkpoints, it seems like any pipeline running on flink,
> >     using
> >     >         the RequiresStableInput is limited in the amount of time
> >     that it
> >     >         can run without being started from scratch.
> >     >
> >
>

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

Posted by Maximilian Michels <mx...@apache.org>.
Hi Eleanore,

Exactly-once is not affected, but the pipeline can fail to checkpoint
after the maximum number of state cells has been reached. We are
working on a fix [1].

Cheers,
Max

[1] https://github.com/apache/beam/pull/11478
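
To illustrate the failure mode discussed in this thread, here is a minimal
sketch in plain Java. It is a toy model, not Flink or Beam code: the class
StateCellLimitSketch, its methods, and the cell names "buffer" and
"buffer-<id>" are hypothetical. It assumes that the snapshot-time check
rejects more than Short.MAX_VALUE (32,767) registered state cells, which is
consistent with the checkpoint numbers reported here, and that clearing a
cell does not unregister it.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of an operator state registry; these are NOT Flink's real classes. */
final class StateCellLimitSketch {

    // Assumed limit, chosen because the reported failures occur near 32,767.
    static final int MAX_STATE_CELLS = Short.MAX_VALUE;

    // Names of every cell ever registered; a cleared cell keeps its entry.
    private final Set<String> registeredCells = new HashSet<>();

    /** Registers a named cell; registering the same name twice is a no-op. */
    void register(String name) {
        registeredCells.add(name);
    }

    /** Mimics a snapshot-time precondition on the number of registered cells. */
    void snapshot() {
        if (registeredCells.size() > MAX_STATE_CELLS) {
            throw new IllegalArgumentException();
        }
    }

    /** Returns the first checkpoint id whose snapshot fails, or -1 if none does. */
    static long firstFailingCheckpoint(boolean newCellPerCheckpoint, long maxCheckpoints) {
        StateCellLimitSketch backend = new StateCellLimitSketch();
        for (long id = 1; id <= maxCheckpoints; id++) {
            // Either a fresh cell per checkpoint, or one cell that is reused.
            backend.register(newCellPerCheckpoint ? "buffer-" + id : "buffer");
            try {
                backend.snapshot();
            } catch (IllegalArgumentException e) {
                return id;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(firstFailingCheckpoint(true, 100_000));
        System.out.println(firstFailingCheckpoint(false, 100_000));
    }
}
```

In this model, a new cell per checkpoint fails at checkpoint 32,768, while a
single reused cell never hits the limit. At one checkpoint a minute, 32,768
checkpoints take roughly 22 to 23 days. A real job may fail slightly earlier
if the operator registers other state as well.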

On 22.04.20 07:19, Eleanore Jin wrote:
> Hi Maxi, 
> 
> I assume this will impact the exactly-once semantics that Beam provides,
> since in KafkaExactlyOnceSink the processElement method is also
> annotated with @RequiresStableInput?
> 
> Thanks a lot!
> Eleanore
> 
> On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org
> <ma...@apache.org>> wrote:
> 
>     Hi Stephen,
> 
>     Thanks for reporting the issue! David, good catch!
> 
>     I think we have to resort to only using a single state cell for
>     buffering on checkpoints, instead of using a new one for every
>     checkpoint. I was under the assumption that, if the state cell was
>     cleared, it would not be checkpointed but that does not seem to be
>     the case.
> 
>     Thanks,
>     Max
> 
>     On 21.04.20 09:29, David Morávek wrote:
>     > Hi Stephen,
>     >
>     > nice catch and awesome report! ;) This definitely needs a proper fix.
>     > I've created a new JIRA to track the issue and will try to resolve it
>     > soon as this seems critical to me.
>     >
>     > https://issues.apache.org/jira/browse/BEAM-9794
>     >
>     > Thanks,
>     > D.
>     >
>     > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel
>     <stephenpatel89@gmail.com <ma...@gmail.com>
>     > <mailto:stephenpatel89@gmail.com
>     <ma...@gmail.com>>> wrote:
>     >
>     >     I was able to reproduce this in a unit test:
>     >
>     >         @Test
>     >
>     >           *public* *void* test() *throws* InterruptedException,
>     >         ExecutionException {
>     >
>     >             FlinkPipelineOptions options =
>     >         PipelineOptionsFactory./as/(FlinkPipelineOptions.*class*);
>     >
>     >             options.setCheckpointingInterval(10L);
>     >
>     >             options.setParallelism(1);
>     >
>     >             options.setStreaming(*true*);
>     >
>     >             options.setRunner(FlinkRunner.*class*);
>     >
>     >             options.setFlinkMaster("[local]");
>     >
>     >             options.setStateBackend(*new*
>     >         MemoryStateBackend(Integer.*/MAX_VALUE/*));
>     >
>     >             Pipeline pipeline = Pipeline./create/(options);
>     >
>     >             pipeline
>     >
>     >                 .apply(Create./of/((Void) *null*))
>     >
>     >                 .apply(
>     >
>     >                     ParDo./of/(
>     >
>     >                         *new* DoFn<Void, Void>() {
>     >
>     >
>     >                           *private* *static* *final* *long*
>     >         */serialVersionUID/* = 1L;
>     >
>     >
>     >                           @RequiresStableInput
>     >
>     >                           @ProcessElement
>     >
>     >                           *public* *void* processElement() {}
>     >
>     >                         }));
>     >
>     >             pipeline.run();
>     >
>     >           }
>     >
>     >
>     >     It took a while to get to checkpoint 32,767, but eventually it
>     did,
>     >     and it failed with the same error I listed above.
>     >
>     >     On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
>     >     <stephenpatel89@gmail.com <ma...@gmail.com>
>     <mailto:stephenpatel89@gmail.com <ma...@gmail.com>>>
>     wrote:
>     >
>     >         I have a Beam Pipeline (2.14) running on Flink (1.8.0,
>     >         emr-5.26.0) that uses the RequiresStableInput feature.
>     >
>     >         Currently it's configured to checkpoint once a minute, and
>     after
>     >         around 32000-33000 checkpoints, it fails with: 
>     >
>     >             2020-04-15 13:15:02,920 INFO
>     >           
>       org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>     >               - Triggering checkpoint 32701 @ 1586956502911 for job
>     >             9953424f21e240112dd23ab4f8320b60.
>     >             2020-04-15 13:15:05,762 INFO
>     >           
>       org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>     >               - Completed checkpoint 32701 for job
>     >             9953424f21e240112dd23ab4f8320b60 (795385496 bytes in
>     2667 ms).
>     >             2020-04-15 13:16:02,919 INFO
>     >           
>       org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>     >               - Triggering checkpoint 32702 @ 1586956562911 for job
>     >             9953424f21e240112dd23ab4f8320b60.
>     >             2020-04-15 13:16:03,147 INFO
>     >           
>       org.apache.flink.runtime.executiongraph.ExecutionGraph    
>     >                - <operator_name> (1/2)
>     >             (f4737add01961f8b42b8eb4e791b83ba) switched from
>     RUNNING to
>     >             FAILED.
>     >             AsynchronousException{java.lang.Exception: Could not
>     >             materialize checkpoint 32702 for operator <operator_name>
>     >             (1/2).}
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>     >             at
>     >           
>      java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >             at
>     java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >             at
>     >           
>      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >             at
>     >           
>      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >             at java.lang.Thread.run(Thread.java:748)
>     >             Caused by: java.lang.Exception: Could not materialize
>     >             checkpoint 32702 for operator <operator_name> (1/2).
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>     >             ... 6 more
>     >             Caused by: java.util.concurrent.ExecutionException:
>     >             java.lang.IllegalArgumentException
>     >             at
>     java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     >             at
>     java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     >             at
>     >           
>      org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>     >             at
>     >           
>      org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>     >             ... 5 more
>     >             Caused by: java.lang.IllegalArgumentException
>     >             at
>     >           
>      org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>     >             at
>     >           
>      org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>     >             at
>     >           
>      org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>     >             at
>     >           
>      org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>     >             at
>     >           
>      org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     >             at
>     java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >             at
>     >           
>      org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>     >             ... 7 more
>     >
>     >
>     >         The exception comes from
>     >       
>      here: https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>     >
>     >         In the Flink Runner code, I can see that each checkpoint will
>     >         result in a new OperatorState (or KeyedState if the stream is
>     >         keyed):
>     >       
>      https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>     >       
>      https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>     >
>     >         This seems to be the reason the pipeline will eventually
>     die.  
>     >
>     >         While a workaround might be to increase the time between
>     >         checkpoints, it seems like any pipeline running on flink,
>     using
>     >         the RequiresStableInput is limited in the amount of time
>     that it
>     >         can run without being started from scratch.
>     >
> 

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

Posted by Maximilian Michels <mx...@apache.org>.
Hi Eleanore,

Exactly-once is not affected but the pipeline can fail to checkpoint
after the maximum number of state cells have been reached. We are
working on a fix [1].

Cheers,
Max

[1] https://github.com/apache/beam/pull/11478

On 22.04.20 07:19, Eleanore Jin wrote:
> Hi Maxi, 
> 
> I assume this will impact the Exactly Once Semantics that beam provided
> as in the KafkaExactlyOnceSink, the processElement method is also
> annotated with @RequiresStableInput?
> 
> Thanks a lot!
> Eleanore
> 
> On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org
> <ma...@apache.org>> wrote:
> 
>     Hi Stephen,
> 
>     Thanks for reporting the issue! David, good catch!
> 
>     I think we have to resort to only using a single state cell for
>     buffering on checkpoints, instead of using a new one for every
>     checkpoint. I was under the assumption that, if the state cell was
>     cleared, it would not be checkpointed but that does not seem to be
>     the case.
> 
>     Thanks,
>     Max
> 
>     On 21.04.20 09:29, David Morávek wrote:
>     > Hi Stephen,
>     >
>     > nice catch and awesome report! ;) This definitely needs a proper fix.
>     > I've created a new JIRA to track the issue and will try to resolve it
>     > soon as this seems critical to me.
>     >
>     > https://issues.apache.org/jira/browse/BEAM-9794
>     >
>     > Thanks,
>     > D.
>     >
>     > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel
>     <stephenpatel89@gmail.com <ma...@gmail.com>
>     > <mailto:stephenpatel89@gmail.com
>     <ma...@gmail.com>>> wrote:
>     >
>     >     I was able to reproduce this in a unit test:
>     >
>     >         @Test
>     >
>     >           *public* *void* test() *throws* InterruptedException,
>     >         ExecutionException {
>     >
>     >             FlinkPipelineOptions options =
>     >         PipelineOptionsFactory./as/(FlinkPipelineOptions.*class*);
>     >
>     >             options.setCheckpointingInterval(10L);
>     >
>     >             options.setParallelism(1);
>     >
>     >             options.setStreaming(*true*);
>     >
>     >             options.setRunner(FlinkRunner.*class*);
>     >
>     >             options.setFlinkMaster("[local]");
>     >
>     >             options.setStateBackend(*new*
>     >         MemoryStateBackend(Integer.*/MAX_VALUE/*));
>     >
>     >             Pipeline pipeline = Pipeline./create/(options);
>     >
>     >             pipeline
>     >
>     >                 .apply(Create./of/((Void) *null*))
>     >
>     >                 .apply(
>     >
>     >                     ParDo./of/(
>     >
>     >                         *new* DoFn<Void, Void>() {
>     >
>     >
>     >                           *private* *static* *final* *long*
>     >         */serialVersionUID/* = 1L;
>     >
>     >
>     >                           @RequiresStableInput
>     >
>     >                           @ProcessElement
>     >
>     >                           *public* *void* processElement() {}
>     >
>     >                         }));
>     >
>     >             pipeline.run();
>     >
>     >           }
>     >
>     >
>     >     It took a while to get to checkpoint 32,767, but eventually it
>     did,
>     >     and it failed with the same error I listed above.
>     >
>     >     On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
>     >     <stephenpatel89@gmail.com <ma...@gmail.com>
>     <mailto:stephenpatel89@gmail.com <ma...@gmail.com>>>
>     wrote:
>     >
>     >         I have a Beam Pipeline (2.14) running on Flink (1.8.0,
>     >         emr-5.26.0) that uses the RequiresStableInput feature.
>     >
>     >         Currently it's configured to checkpoint once a minute, and
>     after
>     >         around 32000-33000 checkpoints, it fails with: 
>     >
>     >             2020-04-15 13:15:02,920 INFO
>     >           
>       org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>     >               - Triggering checkpoint 32701 @ 1586956502911 for job
>     >             9953424f21e240112dd23ab4f8320b60.
>     >             2020-04-15 13:15:05,762 INFO
>     >           
>       org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>     >               - Completed checkpoint 32701 for job
>     >             9953424f21e240112dd23ab4f8320b60 (795385496 bytes in
>     2667 ms).
>     >             2020-04-15 13:16:02,919 INFO
>     >           
>       org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>     >               - Triggering checkpoint 32702 @ 1586956562911 for job
>     >             9953424f21e240112dd23ab4f8320b60.
>     >             2020-04-15 13:16:03,147 INFO
>     >           
>       org.apache.flink.runtime.executiongraph.ExecutionGraph    
>     >                - <operator_name> (1/2)
>     >             (f4737add01961f8b42b8eb4e791b83ba) switched from
>     RUNNING to
>     >             FAILED.
>     >             AsynchronousException{java.lang.Exception: Could not
>     >             materialize checkpoint 32702 for operator <operator_name>
>     >             (1/2).}
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>     >             at
>     >           
>      java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     >             at
>     java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >             at
>     >           
>      java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     >             at
>     >           
>      java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     >             at java.lang.Thread.run(Thread.java:748)
>     >             Caused by: java.lang.Exception: Could not materialize
>     >             checkpoint 32702 for operator <operator_name> (1/2).
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>     >             ... 6 more
>     >             Caused by: java.util.concurrent.ExecutionException:
>     >             java.lang.IllegalArgumentException
>     >             at
>     java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     >             at
>     java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     >             at
>     >           
>      org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>     >             at
>     >           
>      org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     >             at
>     >           
>      org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>     >             ... 5 more
>     >             Caused by: java.lang.IllegalArgumentException
>     >             at
>     >           
>      org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>     >             at
>     >           
>      org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>     >             at
>     >           
>      org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>     >             at
>     >           
>      org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>     >             at
>     >           
>      org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     >             at
>     java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     >             at
>     >           
>      org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>     >             ... 7 more
>     >
>     >
>     >         The exception comes from here:
>     >         https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>     >
>     >         In the Flink Runner code, I can see that each checkpoint will
>     >         result in a new OperatorState (or KeyedState if the stream is
>     >         keyed):
>     >         https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>     >         https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>     >
>     >         This seems to be the reason the pipeline will eventually die.
>     >
>     >         While a workaround might be to increase the time between
>     >         checkpoints, it seems like any pipeline running on flink, using
>     >         the RequiresStableInput is limited in the amount of time that it
>     >         can run without being started from scratch.
>     >
> 
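
To put rough numbers on the workaround discussed above, here is a minimal sketch of how long a pipeline could run before hitting the limit. It assumes that the failing precondition caps the number of registered operator states at Short.MAX_VALUE (32,767) and that exactly one new state is registered per checkpoint. Both are assumptions inferred from the report, not confirmed behaviour, and the class and method names are hypothetical.

```java
import java.time.Duration;

class CheckpointBudget {
    // Assumption: at most Short.MAX_VALUE states can be registered before
    // the snapshot precondition fails (inferred from the failure near 32,767).
    static final int MAX_STATES = Short.MAX_VALUE;

    // Assumption: one new state per checkpoint, so the budget is simply
    // interval * MAX_STATES.
    static Duration timeUntilLimit(Duration checkpointInterval) {
        return checkpointInterval.multipliedBy(MAX_STATES);
    }

    public static void main(String[] args) {
        // One checkpoint per minute, as in the original report.
        System.out.println(timeUntilLimit(Duration.ofMinutes(1)).toDays() + " days");
        // One checkpoint every 10 ms, as in the unit test.
        System.out.println(timeUntilLimit(Duration.ofMillis(10)).getSeconds() + " seconds");
    }
}
```

Under these assumptions a longer checkpoint interval only postpones the failure in proportion to the interval; it does not remove the limit.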

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

Posted by Eleanore Jin <el...@gmail.com>.
Hi Maxi,

I assume this will also impact the exactly-once semantics that Beam
provides, since the processElement method in KafkaExactlyOnceSink is also
annotated with @RequiresStableInput?

Thanks a lot!
Eleanore

On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mx...@apache.org> wrote:

> Hi Stephen,
>
> Thanks for reporting the issue! David, good catch!
>
> I think we have to resort to only using a single state cell for
> buffering on checkpoints, instead of using a new one for every
> checkpoint. I was under the assumption that, if the state cell was
> cleared, it would not be checkpointed but that does not seem to be the
> case.
>
> Thanks,
> Max
>
> On 21.04.20 09:29, David Morávek wrote:
> > Hi Stephen,
> >
> > nice catch and awesome report! ;) This definitely needs a proper fix.
> > I've created a new JIRA to track the issue and will try to resolve it
> > soon as this seems critical to me.
> >
> > https://issues.apache.org/jira/browse/BEAM-9794
> >
> > Thanks,
> > D.
> >
> > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> >     I was able to reproduce this in a unit test:
> >
> >         @Test
> >         public void test() throws InterruptedException, ExecutionException {
> >           FlinkPipelineOptions options =
> >               PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> >           options.setCheckpointingInterval(10L);
> >           options.setParallelism(1);
> >           options.setStreaming(true);
> >           options.setRunner(FlinkRunner.class);
> >           options.setFlinkMaster("[local]");
> >           options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> >           Pipeline pipeline = Pipeline.create(options);
> >           pipeline
> >               .apply(Create.of((Void) null))
> >               .apply(
> >                   ParDo.of(
> >                       new DoFn<Void, Void>() {
> >                         private static final long serialVersionUID = 1L;
> >
> >                         @RequiresStableInput
> >                         @ProcessElement
> >                         public void processElement() {}
> >                       }));
> >           pipeline.run();
> >         }
> >
> >
> >     It took a while to get to checkpoint 32,767, but eventually it did,
> >     and it failed with the same error I listed above.
> >
> >     On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
> >     <stephenpatel89@gmail.com <ma...@gmail.com>> wrote:
> >
> >         I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> >         emr-5.26.0) that uses the RequiresStableInput feature.
> >
> >         Currently it's configured to checkpoint once a minute, and after
> >         around 32000-33000 checkpoints, it fails with:
> >
> >             2020-04-15 13:15:02,920 INFO
> >              org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> >               - Triggering checkpoint 32701 @ 1586956502911 for job
> >             9953424f21e240112dd23ab4f8320b60.
> >             2020-04-15 13:15:05,762 INFO
> >              org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> >               - Completed checkpoint 32701 for job
> >             9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> >             2020-04-15 13:16:02,919 INFO
> >              org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> >               - Triggering checkpoint 32702 @ 1586956562911 for job
> >             9953424f21e240112dd23ab4f8320b60.
> >             2020-04-15 13:16:03,147 INFO
> >              org.apache.flink.runtime.executiongraph.ExecutionGraph
> >                - <operator_name> (1/2)
> >             (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to
> >             FAILED.
> >             AsynchronousException{java.lang.Exception: Could not
> >             materialize checkpoint 32702 for operator <operator_name>
> >             (1/2).}
> >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >             at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >             at java.lang.Thread.run(Thread.java:748)
> >             Caused by: java.lang.Exception: Could not materialize
> >             checkpoint 32702 for operator <operator_name> (1/2).
> >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> >             ... 6 more
> >             Caused by: java.util.concurrent.ExecutionException:
> >             java.lang.IllegalArgumentException
> >             at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >             at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> >             at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> >             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> >             ... 5 more
> >             Caused by: java.lang.IllegalArgumentException
> >             at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> >             at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> >             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> >             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> >             at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> >             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> >             ... 7 more
> >
> >
> >         The exception comes from here:
> >         https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> >
> >         In the Flink Runner code, I can see that each checkpoint will
> >         result in a new OperatorState (or KeyedState if the stream is
> >         keyed):
> >         https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> >         https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> >
> >         This seems to be the reason the pipeline will eventually die.
> >
> >         While a workaround might be to increase the time between
> >         checkpoints, it seems like any pipeline running on flink, using
> >         the RequiresStableInput is limited in the amount of time that it
> >         can run without being started from scratch.
> >
>

Re: Flink Runner with RequiresStableInput fails after a certain number of checkpoints

Posted by Maximilian Michels <mx...@apache.org>.
Hi Stephen,

Thanks for reporting the issue! David, good catch!

I think we have to resort to only using a single state cell for
buffering on checkpoints, instead of using a new one for every
checkpoint. I was under the assumption that, if the state cell was
cleared, it would not be checkpointed but that does not seem to be the case.
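
To illustrate the difference between the two approaches, here is a minimal, self-contained sketch. It does not use Flink's or Beam's APIs: ToyBackend is a hypothetical stand-in that keeps every registered state name (even after its contents are cleared) and refuses to snapshot once more than Short.MAX_VALUE names exist. That limit is an assumption inferred from the reported failure, and the state names are made up.

```java
import java.util.HashSet;
import java.util.Set;

class StateCellSketch {
    /** Toy stand-in for an operator state backend; NOT Flink's API. */
    static class ToyBackend {
        private final Set<String> registered = new HashSet<>();

        // Registering a state keeps its name around, even if the state's
        // contents are later cleared (the behaviour described above).
        void getOrCreateState(String name) {
            registered.add(name);
        }

        // Assumed limit: snapshotting fails once too many names are registered.
        void snapshot() {
            if (registered.size() > Short.MAX_VALUE) {
                throw new IllegalArgumentException(
                    "too many registered states: " + registered.size());
            }
        }
    }

    /** Returns the first checkpoint id whose snapshot fails, or -1 if none fails. */
    static long firstFailingCheckpoint(boolean newStatePerCheckpoint, long checkpoints) {
        ToyBackend backend = new ToyBackend();
        for (long id = 1; id <= checkpoints; id++) {
            // Hypothetical naming: one cell per checkpoint vs. one reused cell.
            String name = newStatePerCheckpoint ? "buffer-" + id : "buffer";
            backend.getOrCreateState(name);
            try {
                backend.snapshot();
            } catch (IllegalArgumentException e) {
                return id;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("new cell per checkpoint: " + firstFailingCheckpoint(true, 40_000));
        System.out.println("single reused cell:      " + firstFailingCheckpoint(false, 40_000));
    }
}
```

In this toy model the per-checkpoint variant fails as soon as the number of registered names exceeds the limit, while the single-cell variant never does. The exact checkpoint at which a real job fails would also depend on any other states the operator registers.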

Thanks,
Max

On 21.04.20 09:29, David Morávek wrote:
> Hi Stephen,
> 
> nice catch and awesome report! ;) This definitely needs a proper fix.
> I've created a new JIRA to track the issue and will try to resolve it
> soon as this seems critical to me.
> 
> https://issues.apache.org/jira/browse/BEAM-9794
> 
> Thanks,
> D.
> 
> On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com
> <ma...@gmail.com>> wrote:
> 
>     I was able to reproduce this in a unit test:
> 
>         @Test
>         public void test() throws InterruptedException, ExecutionException {
>           FlinkPipelineOptions options =
>               PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>           options.setCheckpointingInterval(10L);
>           options.setParallelism(1);
>           options.setStreaming(true);
>           options.setRunner(FlinkRunner.class);
>           options.setFlinkMaster("[local]");
>           options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>           Pipeline pipeline = Pipeline.create(options);
>           pipeline
>               .apply(Create.of((Void) null))
>               .apply(
>                   ParDo.of(
>                       new DoFn<Void, Void>() {
>                         private static final long serialVersionUID = 1L;
>
>                         @RequiresStableInput
>                         @ProcessElement
>                         public void processElement() {}
>                       }));
>           pipeline.run();
>         }
> 
> 
>     It took a while to get to checkpoint 32,767, but eventually it did,
>     and it failed with the same error I listed above.
> 
>     On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
>     <stephenpatel89@gmail.com <ma...@gmail.com>> wrote:
> 
>         I have a Beam Pipeline (2.14) running on Flink (1.8.0,
>         emr-5.26.0) that uses the RequiresStableInput feature.
> 
>         Currently it's configured to checkpoint once a minute, and after
>         around 32000-33000 checkpoints, it fails with: 
> 
>             2020-04-15 13:15:02,920 INFO
>              org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>               - Triggering checkpoint 32701 @ 1586956502911 for job
>             9953424f21e240112dd23ab4f8320b60.
>             2020-04-15 13:15:05,762 INFO
>              org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>               - Completed checkpoint 32701 for job
>             9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>             2020-04-15 13:16:02,919 INFO
>              org.apache.flink.runtime.checkpoint.CheckpointCoordinator  
>               - Triggering checkpoint 32702 @ 1586956562911 for job
>             9953424f21e240112dd23ab4f8320b60.
>             2020-04-15 13:16:03,147 INFO
>              org.apache.flink.runtime.executiongraph.ExecutionGraph    
>                - <operator_name> (1/2)
>             (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to
>             FAILED.
>             AsynchronousException{java.lang.Exception: Could not
>             materialize checkpoint 32702 for operator <operator_name>
>             (1/2).}
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>             at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>             at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>             at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>             at java.lang.Thread.run(Thread.java:748)
>             Caused by: java.lang.Exception: Could not materialize
>             checkpoint 32702 for operator <operator_name> (1/2).
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>             ... 6 more
>             Caused by: java.util.concurrent.ExecutionException:
>             java.lang.IllegalArgumentException
>             at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>             at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>             at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>             at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>             ... 5 more
>             Caused by: java.lang.IllegalArgumentException
>             at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>             at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>             at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>             at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>             at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>             at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>             ... 7 more
>             ... 7 more
> 
> 
>         The exception comes from
>         here: https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> 
>         In the Flink Runner code, I can see that each checkpoint will
>         result in a new OperatorState (or KeyedState if the stream is
>         keyed):
>         https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>         https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> 
>         This seems to be the reason the pipeline will eventually die.  
> 
>         While a workaround might be to increase the time between
>         checkpoints, it seems like any pipeline running on flink, using
>         the RequiresStableInput is limited in the amount of time that it
>         can run without being started from scratch.
> 
