Posted to dev@beam.apache.org by David Morávek <dm...@apache.org> on 2020/04/21 07:29:23 UTC
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Hi Stephen,
nice catch and awesome report! ;) This definitely needs a proper fix. I've
created a new JIRA to track the issue and will try to resolve it soon as
this seems critical to me.
https://issues.apache.org/jira/browse/BEAM-9794
Thanks,
D.
On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <st...@gmail.com>
wrote:
> I was able to reproduce this in a unit test:
>
>     @Test
>     public void test() throws InterruptedException, ExecutionException {
>       FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>       options.setCheckpointingInterval(10L);
>       options.setParallelism(1);
>       options.setStreaming(true);
>       options.setRunner(FlinkRunner.class);
>       options.setFlinkMaster("[local]");
>       options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>
>       Pipeline pipeline = Pipeline.create(options);
>       pipeline
>           .apply(Create.of((Void) null))
>           .apply(
>               ParDo.of(
>                   new DoFn<Void, Void>() {
>                     private static final long serialVersionUID = 1L;
>
>                     @RequiresStableInput
>                     @ProcessElement
>                     public void processElement() {}
>                   }));
>       pipeline.run();
>     }
>
> It took a while to get to checkpoint 32,767, but eventually it did, and it
> failed with the same error I listed above.
>
> On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <st...@gmail.com>
> wrote:
>
>> I have a Beam Pipeline (2.14) running on Flink (1.8.0, emr-5.26.0) that
>> uses the RequiresStableInput feature.
>>
>> Currently it's configured to checkpoint once a minute, and after around
>> 32000-33000 checkpoints, it fails with:
>>
>>> 2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>>> 2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>>> 2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>     at java.lang.Thread.run(Thread.java:748)
>>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>>     ... 6 more
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>>     ... 5 more
>>> Caused by: java.lang.IllegalArgumentException
>>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>>>     ... 7 more
>>
>>
>> The exception comes from here:
>> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>>
>> In the Flink Runner code, I can see that each checkpoint will result in a
>> new OperatorState (or KeyedState if the stream is keyed):
>>
>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
>>
>> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>>
>> This seems to be the reason the pipeline will eventually die.
>>
>> While a workaround might be to increase the time between checkpoints, it
>> seems that any pipeline running on Flink that uses RequiresStableInput is
>> limited in how long it can run before it has to be restarted from
>> scratch.
>>
>>
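To make the failure mode in the report concrete, here is a minimal, self-contained
sketch. It is not Flink or Beam code. It assumes that the check at the linked line of
OperatorBackendSerializationProxy rejects a snapshot once more than Short.MAX_VALUE
(32,767) operator states are registered, which would match the checkpoint number seen
in the unit test. The Registry class, the state names, and the otherStates parameter
are hypothetical and exist only for illustration.

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

final class StateCellLimitSketch {
    // Assumption: the limit enforced by the snapshot precondition.
    static final int MAX_REGISTERED_STATES = Short.MAX_VALUE; // 32,767

    /** Hypothetical stand-in for the backend's registry of state names. */
    static final class Registry {
        private final Set<String> names = new HashSet<>();

        void register(String name) {
            names.add(name);
        }

        /** Throws once too many states are registered, like a failed checkArgument. */
        void snapshot() {
            if (names.size() > MAX_REGISTERED_STATES) {
                throw new IllegalArgumentException();
            }
        }
    }

    /**
     * Registers one new state per checkpoint and returns the id of the first
     * checkpoint whose snapshot fails. otherStates models states the operator
     * registers for unrelated purposes.
     */
    static long firstFailingCheckpoint(int otherStates) {
        Registry registry = new Registry();
        for (int i = 0; i < otherStates; i++) {
            registry.register("other-" + i);
        }
        for (long id = 1; ; id++) {
            registry.register("buffer-" + id); // never unregistered
            try {
                registry.snapshot();
            } catch (IllegalArgumentException e) {
                return id;
            }
        }
    }

    /** Upper bound on the run time before the limit is hit, for a checkpoint interval. */
    static Duration timeToFailure(Duration checkpointInterval) {
        return checkpointInterval.multipliedBy(MAX_REGISTERED_STATES);
    }

    public static void main(String[] args) {
        System.out.println(firstFailingCheckpoint(0)); // 32768
        System.out.println(firstFailingCheckpoint(1)); // 32767
        System.out.println(timeToFailure(Duration.ofMinutes(1)).toDays() + " days");
        System.out.println(timeToFailure(Duration.ofMillis(10)).getSeconds() + " s");
    }
}
```

Under these assumptions, a one-minute checkpoint interval gives a bit under 23 days of
run time, and the 10 ms interval from the unit test reaches the limit in roughly five
and a half minutes. Lengthening the interval only postpones the failure.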
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Posted by Eleanore Jin <el...@gmail.com>.
Hi Max,
Thanks for the info!
Eleanore
On Tue, May 5, 2020 at 4:01 AM Maximilian Michels <mx...@apache.org> wrote:
> Hey Eleanore,
>
> The change will be part of the 2.21.0 release.
>
> -Max
>
> On 04.05.20 19:14, Eleanore Jin wrote:
> > Hi Max,
> >
> > Thanks for the information. I saw that this PR is already merged; has it
> > been backported to the affected versions
> > (i.e. 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0), or do I have
> > to wait for the 2.20.1 release?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels <mxm@apache.org> wrote:
> >
> > Hi Eleanore,
> >
> > Exactly-once is not affected, but the pipeline can fail to checkpoint
> > after the maximum number of state cells has been reached. We are
> > working on a fix [1].
> >
> > Cheers,
> > Max
> >
> > [1] https://github.com/apache/beam/pull/11478
> >
> > On 22.04.20 07:19, Eleanore Jin wrote:
> > > Hi Maxi,
> > >
> > > I assume this will impact the exactly-once semantics that Beam provides,
> > > since in the KafkaExactlyOnceSink the processElement method is also
> > > annotated with @RequiresStableInput?
> > >
> > > Thanks a lot!
> > > Eleanore
> > >
> > > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org> wrote:
> > >
> > > Hi Stephen,
> > >
> > > Thanks for reporting the issue! David, good catch!
> > >
> > > I think we have to resort to only using a single state cell for
> > > buffering on checkpoints, instead of using a new one for every
> > > checkpoint. I was under the assumption that, if the state cell was
> > > cleared, it would not be checkpointed, but that does not seem to be
> > > the case.
> > >
> > > Thanks,
> > > Max
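The difference between the two buffering strategies Max describes can be sketched as
follows. This is an illustration only, not the code from the linked pull request. The
Backend class is a hypothetical stand-in for a state backend in which a state name
stays registered even after its contents are cleared, which is the behaviour Max
reports observing. All names here are made up for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class BufferingSketch {
    /** Hypothetical backend: clearing a cell empties it but keeps it registered. */
    static final class Backend {
        private final Map<String, List<String>> cells = new LinkedHashMap<>();

        List<String> cell(String name) {
            return cells.computeIfAbsent(name, n -> new ArrayList<>());
        }

        int registeredCells() {
            return cells.size();
        }
    }

    /** One new cell per checkpoint: the registered-cell count grows without bound. */
    static int cellsWithNewCellPerCheckpoint(int checkpoints) {
        Backend backend = new Backend();
        for (int id = 1; id <= checkpoints; id++) {
            List<String> buffer = backend.cell("buffer-" + id);
            buffer.add("element-" + id); // buffer input until the checkpoint completes
            buffer.clear();              // flushed, but the cell stays registered
        }
        return backend.registeredCells();
    }

    /** A single reused cell: the registered-cell count stays constant. */
    static int cellsWithSingleCell(int checkpoints) {
        Backend backend = new Backend();
        for (int id = 1; id <= checkpoints; id++) {
            List<String> buffer = backend.cell("buffer");
            buffer.add("element-" + id);
            buffer.clear();
        }
        return backend.registeredCells();
    }

    public static void main(String[] args) {
        System.out.println(cellsWithNewCellPerCheckpoint(40_000)); // 40000
        System.out.println(cellsWithSingleCell(40_000));           // 1
    }
}
```

With a cell per checkpoint, the count passes any fixed limit given enough checkpoints.
Reusing one cell keeps it at one regardless of how long the pipeline runs.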
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Posted by Maximilian Michels <mx...@apache.org>.
Hey Eleanore,
The change will be part of the 2.21.0 release.
-Max
On 04.05.20 19:14, Eleanore Jin wrote:
> Hi Max,
>
> Thanks for the information and I saw this PR is already merged, just
> wonder is it backported to the affected versions already
> (i.e. 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0)? Or I have
> to wait for the 2.20.1 release?
>
> Thanks a lot!
> Eleanore
>
> On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels <mxm@apache.org
> <ma...@apache.org>> wrote:
>
> Hi Eleanore,
>
> Exactly-once is not affected but the pipeline can fail to checkpoint
> after the maximum number of state cells have been reached. We are
> working on a fix [1].
>
> Cheers,
> Max
>
> [1] https://github.com/apache/beam/pull/11478
>
> On 22.04.20 07:19, Eleanore Jin wrote:
> > Hi Maxi,
> >
> > I assume this will impact the Exactly Once Semantics that beam
> provided
> > as in the KafkaExactlyOnceSink, the processElement method is also
> > annotated with @RequiresStableInput?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels
> <mxm@apache.org <ma...@apache.org>
> > <mailto:mxm@apache.org <ma...@apache.org>>> wrote:
> >
> > Hi Stephen,
> >
> > Thanks for reporting the issue! David, good catch!
> >
> > I think we have to resort to only using a single state cell for
> > buffering on checkpoints, instead of using a new one for every
> > checkpoint. I was under the assumption that, if the state cell was
> > cleared, it would not be checkpointed but that does not seem to be
> > the case.
> >
> > Thanks,
> > Max
> >
> > On 21.04.20 09:29, David Morávek wrote:
> > > Hi Stephen,
> > >
> > > nice catch and awesome report! ;) This definitely needs a
> proper fix.
> > > I've created a new JIRA to track the issue and will try to
> resolve it
> > > soon as this seems critical to me.
> > >
> > > https://issues.apache.org/jira/browse/BEAM-9794
> > >
> > > Thanks,
> > > D.
> > >
> > > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel
> > <stephenpatel89@gmail.com <ma...@gmail.com>
> <mailto:stephenpatel89@gmail.com <ma...@gmail.com>>
> > > <mailto:stephenpatel89@gmail.com
> <ma...@gmail.com>
> > <mailto:stephenpatel89@gmail.com
> <ma...@gmail.com>>>> wrote:
> > >
> > > I was able to reproduce this in a unit test:
> > >
> > > @Test
> > >
> > > *public* *void* test() *throws* InterruptedException,
> > > ExecutionException {
> > >
> > > FlinkPipelineOptions options =
> > >
> PipelineOptionsFactory./as/(FlinkPipelineOptions.*class*);
> > >
> > > options.setCheckpointingInterval(10L);
> > >
> > > options.setParallelism(1);
> > >
> > > options.setStreaming(*true*);
> > >
> > > options.setRunner(FlinkRunner.*class*);
> > >
> > > options.setFlinkMaster("[local]");
> > >
> > > options.setStateBackend(*new*
> > > MemoryStateBackend(Integer.*/MAX_VALUE/*));
> > >
> > > Pipeline pipeline = Pipeline./create/(options);
> > >
> > > pipeline
> > >
> > > .apply(Create./of/((Void) *null*))
> > >
> > > .apply(
> > >
> > > ParDo./of/(
> > >
> > > *new* DoFn<Void, Void>() {
> > >
> > >
> > > *private* *static* *final* *long*
> > > */serialVersionUID/* = 1L;
> > >
> > >
> > > @RequiresStableInput
> > >
> > > @ProcessElement
> > >
> > > *public* *void* processElement() {}
> > >
> > > }));
> > >
> > > pipeline.run();
> > >
> > > }
> > >
> > >
> > > It took a while to get to checkpoint 32,767, but
> eventually it
> > did,
> > > and it failed with the same error I listed above.
> > >
> > > On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
> > > <stephenpatel89@gmail.com
> <ma...@gmail.com> <mailto:stephenpatel89@gmail.com
> <ma...@gmail.com>>
> > <mailto:stephenpatel89@gmail.com
> <ma...@gmail.com> <mailto:stephenpatel89@gmail.com
> <ma...@gmail.com>>>>
> > wrote:
> > >
> > > I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> > > emr-5.26.0) that uses the RequiresStableInput feature.
> > >
> > > Currently it's configured to checkpoint once a
> minute, and
> > after
> > > around 32000-33000 checkpoints, it fails with:
> > >
> > > 2020-04-15 13:15:02,920 INFO
> > >
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> > > - Triggering checkpoint 32701 @ 1586956502911
> for job
> > > 9953424f21e240112dd23ab4f8320b60.
> > > 2020-04-15 13:15:05,762 INFO
> > >
> > org.apache.flink.runtime.checkpoint.CheckpointCoordinator
> > > - Completed checkpoint 32701 for job
> > > 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in
> > 2667 ms).
> > > 2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> > > 2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> > > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> > >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > >     at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> > >     ... 6 more
> > > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
> > >     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> > >     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> > >     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> > >     ... 5 more
> > > Caused by: java.lang.IllegalArgumentException
> > >     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> > >     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> > >     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> > >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> > >     ... 7 more
> > >
> > > The exception comes from here:
> > > https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> > >
> > > In the Flink Runner code, I can see that each checkpoint will
> > > result in a new OperatorState (or KeyedState if the stream is
> > > keyed):
> > >
> > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> > >
> > > This seems to be the reason the pipeline will eventually die.
> > >
> > > While a workaround might be to increase the time between
> > > checkpoints, it seems like any pipeline running on flink, using
> > > the RequiresStableInput is limited in the amount of time that it
> > > can run without being started from scratch.
> > >
> >
>
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Posted by Maximilian Michels <mx...@apache.org>.
Hey Eleanore,
The change will be part of the 2.21.0 release.
-Max
On 04.05.20 19:14, Eleanore Jin wrote:
> Hi Max,
>
> Thanks for the information and I saw this PR is already merged, just
> wonder is it backported to the affected versions already
> (i.e. 2.14.0, 2.15.0, 2.16.0, 2.17.0, 2.18.0, 2.19.0, 2.20.0)? Or I have
> to wait for the 2.20.1 release?
>
> Thanks a lot!
> Eleanore
>
> On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels <mxm@apache.org> wrote:
>
> Hi Eleanore,
>
> Exactly-once is not affected but the pipeline can fail to checkpoint
> after the maximum number of state cells have been reached. We are
> working on a fix [1].
>
> Cheers,
> Max
>
> [1] https://github.com/apache/beam/pull/11478
>
> On 22.04.20 07:19, Eleanore Jin wrote:
> > Hi Maxi,
> >
> > I assume this will impact the Exactly Once Semantics that beam provided
> > as in the KafkaExactlyOnceSink, the processElement method is also
> > annotated with @RequiresStableInput?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org> wrote:
> >
> > Hi Stephen,
> >
> > Thanks for reporting the issue! David, good catch!
> >
> > I think we have to resort to only using a single state cell for
> > buffering on checkpoints, instead of using a new one for every
> > checkpoint. I was under the assumption that, if the state cell was
> > cleared, it would not be checkpointed but that does not seem to be
> > the case.
> >
> > Thanks,
> > Max
> >
> > On 21.04.20 09:29, David Morávek wrote:
> > > Hi Stephen,
> > >
> > > nice catch and awesome report! ;) This definitely needs a proper fix.
> > > I've created a new JIRA to track the issue and will try to resolve it
> > > soon as this seems critical to me.
> > >
> > > https://issues.apache.org/jira/browse/BEAM-9794
> > >
> > > Thanks,
> > > D.
> > >
> > > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com> wrote:
> > >
> > > I was able to reproduce this in a unit test:
> > >
> > > @Test
> > > public void test() throws InterruptedException, ExecutionException {
> > >   FlinkPipelineOptions options =
> > >       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> > >   options.setCheckpointingInterval(10L);
> > >   options.setParallelism(1);
> > >   options.setStreaming(true);
> > >   options.setRunner(FlinkRunner.class);
> > >   options.setFlinkMaster("[local]");
> > >   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> > >   Pipeline pipeline = Pipeline.create(options);
> > >   pipeline
> > >       .apply(Create.of((Void) null))
> > >       .apply(
> > >           ParDo.of(
> > >               new DoFn<Void, Void>() {
> > >                 private static final long serialVersionUID = 1L;
> > >
> > >                 @RequiresStableInput
> > >                 @ProcessElement
> > >                 public void processElement() {}
> > >               }));
> > >   pipeline.run();
> > > }
> > >
> > >
> > > It took a while to get to checkpoint 32,767, but eventually it did,
> > > and it failed with the same error I listed above.
> > >
> > > On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <stephenpatel89@gmail.com> wrote:
> > >
> > > I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> > > emr-5.26.0) that uses the RequiresStableInput feature.
> > >
> > > Currently it's configured to checkpoint once a minute, and after
> > > around 32000-33000 checkpoints, it fails with:
> > >
> > > 2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> > > 2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> > > 2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> > > 2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> > > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> > >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > >     at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> > >     ... 6 more
> > > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
> > >     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> > >     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> > >     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> > >     ... 5 more
> > > Caused by: java.lang.IllegalArgumentException
> > >     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> > >     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> > >     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> > >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> > >     ... 7 more
> > >
> > > The exception comes from here:
> > > https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> > >
> > > In the Flink Runner code, I can see that each checkpoint will
> > > result in a new OperatorState (or KeyedState if the stream is
> > > keyed):
> > >
> > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> > >
> > > This seems to be the reason the pipeline will eventually die.
> > >
> > > While a workaround might be to increase the time between
> > > checkpoints, it seems like any pipeline running on flink, using
> > > the RequiresStableInput is limited in the amount of time that it
> > > can run without being started from scratch.
> > >
> >
>
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Posted by Eleanore Jin <el...@gmail.com>.
Hi Max,
Thanks for the information. I saw that this PR is already merged. Has it been
backported to the affected versions already (i.e. 2.14.0, 2.15.0, 2.16.0,
2.17.0, 2.18.0, 2.19.0, 2.20.0)? Or do I have to wait for the 2.20.1
release?
Thanks a lot!
Eleanore
On Wed, Apr 22, 2020 at 2:31 AM Maximilian Michels <mx...@apache.org> wrote:
> Hi Eleanore,
>
> Exactly-once is not affected but the pipeline can fail to checkpoint
> after the maximum number of state cells have been reached. We are
> working on a fix [1].
>
> Cheers,
> Max
>
> [1] https://github.com/apache/beam/pull/11478
>
> On 22.04.20 07:19, Eleanore Jin wrote:
> > Hi Maxi,
> >
> > I assume this will impact the Exactly Once Semantics that beam provided
> > as in the KafkaExactlyOnceSink, the processElement method is also
> > annotated with @RequiresStableInput?
> >
> > Thanks a lot!
> > Eleanore
> >
> > On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org> wrote:
> >
> > Hi Stephen,
> >
> > Thanks for reporting the issue! David, good catch!
> >
> > I think we have to resort to only using a single state cell for
> > buffering on checkpoints, instead of using a new one for every
> > checkpoint. I was under the assumption that, if the state cell was
> > cleared, it would not be checkpointed but that does not seem to be
> > the case.
> >
> > Thanks,
> > Max
> >
> > On 21.04.20 09:29, David Morávek wrote:
> > > Hi Stephen,
> > >
> > > nice catch and awesome report! ;) This definitely needs a proper fix.
> > > I've created a new JIRA to track the issue and will try to resolve it
> > > soon as this seems critical to me.
> > >
> > > https://issues.apache.org/jira/browse/BEAM-9794
> > >
> > > Thanks,
> > > D.
> > >
> > > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com> wrote:
> > >
> > > I was able to reproduce this in a unit test:
> > >
> > > @Test
> > > public void test() throws InterruptedException, ExecutionException {
> > >   FlinkPipelineOptions options =
> > >       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> > >   options.setCheckpointingInterval(10L);
> > >   options.setParallelism(1);
> > >   options.setStreaming(true);
> > >   options.setRunner(FlinkRunner.class);
> > >   options.setFlinkMaster("[local]");
> > >   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> > >   Pipeline pipeline = Pipeline.create(options);
> > >   pipeline
> > >       .apply(Create.of((Void) null))
> > >       .apply(
> > >           ParDo.of(
> > >               new DoFn<Void, Void>() {
> > >                 private static final long serialVersionUID = 1L;
> > >
> > >                 @RequiresStableInput
> > >                 @ProcessElement
> > >                 public void processElement() {}
> > >               }));
> > >   pipeline.run();
> > > }
> > >
> > >
> > > It took a while to get to checkpoint 32,767, but eventually it did,
> > > and it failed with the same error I listed above.
> > >
> > > On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <stephenpatel89@gmail.com> wrote:
> > >
> > > I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> > > emr-5.26.0) that uses the RequiresStableInput feature.
> > >
> > > Currently it's configured to checkpoint once a minute, and after
> > > around 32000-33000 checkpoints, it fails with:
> > >
> > > 2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> > > 2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> > > 2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> > > 2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> > > AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> > >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> > >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> > >     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> > >     at java.lang.Thread.run(Thread.java:748)
> > > Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> > >     ... 6 more
> > > Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
> > >     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> > >     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> > >     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> > >     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> > >     ... 5 more
> > > Caused by: java.lang.IllegalArgumentException
> > >     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> > >     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> > >     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> > >     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> > >     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> > >     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> > >     ... 7 more
> > >
> > > The exception comes from here:
> > > https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> > >
> > > In the Flink Runner code, I can see that each checkpoint will
> > > result in a new OperatorState (or KeyedState if the stream is
> > > keyed):
> > >
> > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> > > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> > >
> > > This seems to be the reason the pipeline will eventually die.
> > >
> > > While a workaround might be to increase the time between
> > > checkpoints, it seems like any pipeline running on flink, using
> > > the RequiresStableInput is limited in the amount of time that it
> > > can run without being started from scratch.
> > >
> >
>
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Posted by Maximilian Michels <mx...@apache.org>.
Hi Eleanore,
Exactly-once is not affected, but the pipeline can fail to checkpoint
once the maximum number of state cells has been reached. We are
working on a fix [1].
Cheers,
Max
[1] https://github.com/apache/beam/pull/11478
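
To illustrate the limit described above, here is a minimal, self-contained
Java sketch. It is not the Beam or Flink code: the class, method, and state
names are made up for illustration, and it assumes that a snapshot is
rejected once more than Short.MAX_VALUE (32,767) state cells are registered,
which is consistent with the checkpoint counts reported in this thread. It
also assumes, as noted earlier in the thread, that a cleared state cell
stays registered. The sketch compares registering a new cell for every
checkpoint with reusing a single cell.

```java
import java.util.HashSet;
import java.util.Set;

public class StateCellLimitSketch {

    // Stand-in for the size check that rejects the snapshot.
    // Assumption: the limit is Short.MAX_VALUE registered state cells.
    static void checkSnapshot(Set<String> registeredStates) {
        if (registeredStates.size() > Short.MAX_VALUE) {
            throw new IllegalArgumentException();
        }
    }

    // Returns the first checkpoint id whose snapshot is rejected,
    // or -1 if every checkpoint up to maxCheckpoints succeeds.
    static long firstFailingCheckpoint(boolean newCellPerCheckpoint, long maxCheckpoints) {
        // Names are never removed: a cleared cell is still registered.
        Set<String> registeredStates = new HashSet<>();
        for (long id = 1; id <= maxCheckpoints; id++) {
            // Hypothetical naming scheme, for illustration only.
            String name = newCellPerCheckpoint ? "buffer-" + id : "buffer";
            registeredStates.add(name);
            try {
                checkSnapshot(registeredStates);
            } catch (IllegalArgumentException e) {
                return id;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("new cell per checkpoint: "
            + firstFailingCheckpoint(true, 100_000L));
        System.out.println("single reused cell: "
            + firstFailingCheckpoint(false, 100_000L));
    }
}
```

In this simplified model the per-checkpoint variant fails as soon as the
number of registered cells exceeds the limit, while the single-cell variant
keeps checkpointing. A real job may hit the limit at a different checkpoint
number, since other registered states also count.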
On 22.04.20 07:19, Eleanore Jin wrote:
> Hi Maxi,
>
> I assume this will impact the Exactly Once Semantics that beam provided
> as in the KafkaExactlyOnceSink, the processElement method is also
> annotated with @RequiresStableInput?
>
> Thanks a lot!
> Eleanore
>
> On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org> wrote:
>
> Hi Stephen,
>
> Thanks for reporting the issue! David, good catch!
>
> I think we have to resort to only using a single state cell for
> buffering on checkpoints, instead of using a new one for every
> checkpoint. I was under the assumption that, if the state cell was
> cleared, it would not be checkpointed but that does not seem to be
> the case.
>
> Thanks,
> Max
>
> On 21.04.20 09:29, David Morávek wrote:
> > Hi Stephen,
> >
> > nice catch and awesome report! ;) This definitely needs a proper fix.
> > I've created a new JIRA to track the issue and will try to resolve it
> > soon as this seems critical to me.
> >
> > https://issues.apache.org/jira/browse/BEAM-9794
> >
> > Thanks,
> > D.
> >
> > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com> wrote:
> >
> > I was able to reproduce this in a unit test:
> >
> > @Test
> > public void test() throws InterruptedException, ExecutionException {
> >   FlinkPipelineOptions options =
> >       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> >   options.setCheckpointingInterval(10L);
> >   options.setParallelism(1);
> >   options.setStreaming(true);
> >   options.setRunner(FlinkRunner.class);
> >   options.setFlinkMaster("[local]");
> >   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> >   Pipeline pipeline = Pipeline.create(options);
> >   pipeline
> >       .apply(Create.of((Void) null))
> >       .apply(
> >           ParDo.of(
> >               new DoFn<Void, Void>() {
> >                 private static final long serialVersionUID = 1L;
> >
> >                 @RequiresStableInput
> >                 @ProcessElement
> >                 public void processElement() {}
> >               }));
> >   pipeline.run();
> > }
> >
> >
> > It took a while to get to checkpoint 32,767, but eventually it
> did,
> > and it failed with the same error I listed above.
> >
> > On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
> > <stephenpatel89@gmail.com <ma...@gmail.com>
> <mailto:stephenpatel89@gmail.com <ma...@gmail.com>>>
> wrote:
> >
> > I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> > emr-5.26.0) that uses the RequiresStableInput feature.
> >
> > Currently it's configured to checkpoint once a minute, and
> after
> > around 32000-33000 checkpoints, it fails with:
> >
> >     2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> >     2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> >     2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> >     2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> >     AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748)
> >     Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> >         ... 6 more
> >     Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
> >         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> >         at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> >         ... 5 more
> >     Caused by: java.lang.IllegalArgumentException
> >         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> >         at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> >         at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> >         ... 7 more
> >
> >
> > The exception comes from here:
> > https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> >
> > In the Flink Runner code, I can see that each checkpoint will
> > result in a new OperatorState (or KeyedState if the stream is
> > keyed):
> > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> >
> > This seems to be the reason the pipeline will eventually die.
> >
> > While a workaround might be to increase the time between
> > checkpoints, it seems like any pipeline running on flink, using
> > the RequiresStableInput is limited in the amount of time that it
> > can run without being started from scratch.
> >
>
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Posted by Maximilian Michels <mx...@apache.org>.
Hi Eleanore,
Exactly-once is not affected but the pipeline can fail to checkpoint
after the maximum number of state cells have been reached. We are
working on a fix [1].
Cheers,
Max
[1] https://github.com/apache/beam/pull/11478
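To make the state cell limit concrete, here is a minimal Java sketch of the failure mode discussed in this thread. It is a simplified model and not the Flink or Beam code: the class and method names are hypothetical, and the cap of Short.MAX_VALUE (32,767) registered states is an assumption based on the checkpoint number at which the reproduction failed. The exact checkpoint at which a real job fails depends on how many other states the operator registers (the production log in this thread fails at checkpoint 32702).

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model: one new named state is registered per checkpoint,
// and a snapshot is rejected once the number of registered states exceeds a cap.
class StateCellLimitSketch {

    // Assumed cap on the number of registered states in one snapshot.
    static final int MAX_STATES = Short.MAX_VALUE; // 32767

    // Returns the id of the first checkpoint whose snapshot would be rejected.
    static long firstFailingCheckpoint() {
        Set<String> registeredStates = new HashSet<>();
        for (long checkpointId = 1; ; checkpointId++) {
            // A fresh state name per checkpoint; old names are never unregistered.
            registeredStates.add("buffer-" + checkpointId);
            if (registeredStates.size() > MAX_STATES) {
                return checkpointId;
            }
        }
    }

    // Converts a checkpoint count and a checkpoint interval into days of runtime.
    static double daysUntilFailure(long checkpoints, long intervalSeconds) {
        return checkpoints * intervalSeconds / 86400.0;
    }

    public static void main(String[] args) {
        long failing = firstFailingCheckpoint();
        System.out.println("first failing checkpoint: " + failing);
        System.out.println("days at one checkpoint per minute: "
            + daysUntilFailure(failing, 60L));
    }
}
```

Under these assumptions a job that checkpoints once a minute runs for a little under 23 days before the limit is hit, which is why lengthening the checkpoint interval only postpones the failure.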
On 22.04.20 07:19, Eleanore Jin wrote:
> Hi Maxi,
>
> I assume this will impact the Exactly Once Semantics that beam provided
> as in the KafkaExactlyOnceSink, the processElement method is also
> annotated with @RequiresStableInput?
>
> Thanks a lot!
> Eleanore
>
> On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mxm@apache.org
> <ma...@apache.org>> wrote:
>
> Hi Stephen,
>
> Thanks for reporting the issue! David, good catch!
>
> I think we have to resort to only using a single state cell for
> buffering on checkpoints, instead of using a new one for every
> checkpoint. I was under the assumption that, if the state cell was
> cleared, it would not be checkpointed but that does not seem to be
> the case.
>
> Thanks,
> Max
>
> On 21.04.20 09:29, David Morávek wrote:
> > Hi Stephen,
> >
> > nice catch and awesome report! ;) This definitely needs a proper fix.
> > I've created a new JIRA to track the issue and will try to resolve it
> > soon as this seems critical to me.
> >
> > https://issues.apache.org/jira/browse/BEAM-9794
> >
> > Thanks,
> > D.
> >
> > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel
> > <stephenpatel89@gmail.com> wrote:
> >
> > I was able to reproduce this in a unit test:
> >
> > @Test
> > public void test() throws InterruptedException, ExecutionException {
> >     FlinkPipelineOptions options =
> >         PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> >     options.setCheckpointingInterval(10L);
> >     options.setParallelism(1);
> >     options.setStreaming(true);
> >     options.setRunner(FlinkRunner.class);
> >     options.setFlinkMaster("[local]");
> >     options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> >     Pipeline pipeline = Pipeline.create(options);
> >     pipeline
> >         .apply(Create.of((Void) null))
> >         .apply(
> >             ParDo.of(
> >                 new DoFn<Void, Void>() {
> >                   private static final long serialVersionUID = 1L;
> >
> >                   @RequiresStableInput
> >                   @ProcessElement
> >                   public void processElement() {}
> >                 }));
> >     pipeline.run();
> > }
> >
> >
> > It took a while to get to checkpoint 32,767, but eventually it did,
> > and it failed with the same error I listed above.
> >
> > On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
> > <stephenpatel89@gmail.com> wrote:
> >
> > I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> > emr-5.26.0) that uses the RequiresStableInput feature.
> >
> > Currently it's configured to checkpoint once a minute, and after
> > around 32000-33000 checkpoints, it fails with:
> >
> >     2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> >     2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> >     2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> >     2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> >     AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748)
> >     Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> >         ... 6 more
> >     Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
> >         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> >         at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> >         ... 5 more
> >     Caused by: java.lang.IllegalArgumentException
> >         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> >         at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> >         at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> >         ... 7 more
> >
> >
> > The exception comes from here:
> > https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> >
> > In the Flink Runner code, I can see that each checkpoint will
> > result in a new OperatorState (or KeyedState if the stream is
> > keyed):
> > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> >
> > This seems to be the reason the pipeline will eventually die.
> >
> > While a workaround might be to increase the time between
> > checkpoints, it seems like any pipeline running on flink, using
> > the RequiresStableInput is limited in the amount of time that it
> > can run without being started from scratch.
> >
>
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Posted by Eleanore Jin <el...@gmail.com>.
Hi Maxi,
I assume this will impact the Exactly Once Semantics that beam provided as
in the KafkaExactlyOnceSink, the processElement method is also annotated
with @RequiresStableInput?
Thanks a lot!
Eleanore
On Tue, Apr 21, 2020 at 12:58 AM Maximilian Michels <mx...@apache.org> wrote:
> Hi Stephen,
>
> Thanks for reporting the issue! David, good catch!
>
> I think we have to resort to only using a single state cell for
> buffering on checkpoints, instead of using a new one for every
> checkpoint. I was under the assumption that, if the state cell was
> cleared, it would not be checkpointed but that does not seem to be the
> case.
>
> Thanks,
> Max
>
> On 21.04.20 09:29, David Morávek wrote:
> > Hi Stephen,
> >
> > nice catch and awesome report! ;) This definitely needs a proper fix.
> > I've created a new JIRA to track the issue and will try to resolve it
> > soon as this seems critical to me.
> >
> > https://issues.apache.org/jira/browse/BEAM-9794
> >
> > Thanks,
> > D.
> >
> > On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com
> > <ma...@gmail.com>> wrote:
> >
> > I was able to reproduce this in a unit test:
> >
> > @Test
> > public void test() throws InterruptedException, ExecutionException {
> >     FlinkPipelineOptions options =
> >         PipelineOptionsFactory.as(FlinkPipelineOptions.class);
> >     options.setCheckpointingInterval(10L);
> >     options.setParallelism(1);
> >     options.setStreaming(true);
> >     options.setRunner(FlinkRunner.class);
> >     options.setFlinkMaster("[local]");
> >     options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
> >     Pipeline pipeline = Pipeline.create(options);
> >     pipeline
> >         .apply(Create.of((Void) null))
> >         .apply(
> >             ParDo.of(
> >                 new DoFn<Void, Void>() {
> >                   private static final long serialVersionUID = 1L;
> >
> >                   @RequiresStableInput
> >                   @ProcessElement
> >                   public void processElement() {}
> >                 }));
> >     pipeline.run();
> > }
> >
> >
> > It took a while to get to checkpoint 32,767, but eventually it did,
> > and it failed with the same error I listed above.
> >
> > On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
> > <stephenpatel89@gmail.com <ma...@gmail.com>> wrote:
> >
> > I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> > emr-5.26.0) that uses the RequiresStableInput feature.
> >
> > Currently it's configured to checkpoint once a minute, and after
> > around 32000-33000 checkpoints, it fails with:
> >
> >     2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> >     2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> >     2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> >     2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> >     AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
> >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> >         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> >         at java.lang.Thread.run(Thread.java:748)
> >     Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
> >         ... 6 more
> >     Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
> >         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
> >         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
> >         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
> >         at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
> >         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
> >         ... 5 more
> >     Caused by: java.lang.IllegalArgumentException
> >         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
> >         at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
> >         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
> >         at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
> >         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> >         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
> >         ... 7 more
> >
> >
> > The exception comes from here:
> > https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
> >
> > In the Flink Runner code, I can see that each checkpoint will
> > result in a new OperatorState (or KeyedState if the stream is
> > keyed):
> > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> > https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
> >
> > This seems to be the reason the pipeline will eventually die.
> >
> > While a workaround might be to increase the time between
> > checkpoints, it seems like any pipeline running on flink, using
> > the RequiresStableInput is limited in the amount of time that it
> > can run without being started from scratch.
> >
>
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Posted by Maximilian Michels <mx...@apache.org>.
Hi Stephen,
Thanks for reporting the issue! David, good catch!
I think we have to resort to only using a single state cell for
buffering on checkpoints, instead of using a new one for every
checkpoint. I was under the assumption that, if the state cell was
cleared, it would not be checkpointed but that does not seem to be the case.
Thanks,
Max
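As a rough illustration of the idea above, the following Java sketch contrasts one state cell per checkpoint with a single reused cell. It is only a toy model under stated assumptions: the Registry class and both method names are hypothetical and do not correspond to Flink or Beam APIs, and the model assumes that clearing a cell leaves it registered, as observed in this thread. How the actual fix deals with elements that arrive while a checkpoint is still in flight is not covered here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of buffering elements in named state cells across checkpoints.
class SingleCellBufferSketch {

    // Toy state registry: a cell stays registered even after it has been cleared.
    static final class Registry {
        private final Map<String, List<String>> cells = new HashMap<>();

        List<String> cell(String name) {
            return cells.computeIfAbsent(name, key -> new ArrayList<>());
        }

        int registeredCells() {
            return cells.size();
        }
    }

    // One new cell per checkpoint: the number of registered cells grows without bound.
    static int cellsWithNewCellPerCheckpoint(int checkpoints) {
        Registry registry = new Registry();
        for (int id = 1; id <= checkpoints; id++) {
            List<String> buffer = registry.cell("buffer-" + id);
            buffer.add("element-" + id);
            buffer.clear(); // contents are emitted, but the cell remains registered
        }
        return registry.registeredCells();
    }

    // A single reused cell: the number of registered cells stays constant.
    static int cellsWithSingleCell(int checkpoints) {
        Registry registry = new Registry();
        for (int id = 1; id <= checkpoints; id++) {
            List<String> buffer = registry.cell("buffer");
            buffer.add("element-" + id);
            buffer.clear();
        }
        return registry.registeredCells();
    }

    public static void main(String[] args) {
        System.out.println("new cell per checkpoint: " + cellsWithNewCellPerCheckpoint(1000));
        System.out.println("single reused cell: " + cellsWithSingleCell(1000));
    }
}
```

In this model the per-checkpoint variant registers as many cells as there have been checkpoints, while the single-cell variant never registers more than one, so it cannot run into a cap on the number of state cells.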
On 21.04.20 09:29, David Morávek wrote:
> Hi Stephen,
>
> nice catch and awesome report! ;) This definitely needs a proper fix.
> I've created a new JIRA to track the issue and will try to resolve it
> soon as this seems critical to me.
>
> https://issues.apache.org/jira/browse/BEAM-9794
>
> Thanks,
> D.
>
> On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com
> <ma...@gmail.com>> wrote:
>
> I was able to reproduce this in a unit test:
>
>     @Test
>     public void test() throws InterruptedException, ExecutionException {
>         FlinkPipelineOptions options =
>             PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>         options.setCheckpointingInterval(10L);
>         options.setParallelism(1);
>         options.setStreaming(true);
>         options.setRunner(FlinkRunner.class);
>         options.setFlinkMaster("[local]");
>         options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>         Pipeline pipeline = Pipeline.create(options);
>         pipeline
>             .apply(Create.of((Void) null))
>             .apply(
>                 ParDo.of(
>                     new DoFn<Void, Void>() {
>                       private static final long serialVersionUID = 1L;
>
>                       @RequiresStableInput
>                       @ProcessElement
>                       public void processElement() {}
>                     }));
>         pipeline.run();
>     }
>
>
> It took a while to get to checkpoint 32,767, but eventually it did,
> and it failed with the same error I listed above.
>
> On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel
> <stephenpatel89@gmail.com <ma...@gmail.com>> wrote:
>
> I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> emr-5.26.0) that uses the RequiresStableInput feature.
>
> Currently it's configured to checkpoint once a minute, and after
> around 32000-33000 checkpoints, it fails with:
>
>     2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>     2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>     2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>     2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>     AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>         at java.lang.Thread.run(Thread.java:748)
>     Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>         ... 6 more
>     Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>         at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>         at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>         at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>         ... 5 more
>     Caused by: java.lang.IllegalArgumentException
>         at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>         at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>         at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>         at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>         at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>         at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>         ... 7 more
>
>
> The exception comes from
> here: https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>
> In the Flink Runner code, I can see that each checkpoint will
> result in a new OperatorState (or KeyedState if the stream is
> keyed):
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>
> This seems to be the reason the pipeline will eventually die.
>
> While a workaround might be to increase the time between
> checkpoints, it seems that any pipeline running on Flink that
> uses RequiresStableInput is limited in how long it can run
> without being restarted from scratch.
>
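For a rough sense of the time limit described above: assuming the check at OperatorBackendSerializationProxy.java#L68 caps the number of registered operator states at Short.MAX_VALUE (32,767), which is an inference from the checkpoint numbers in this thread rather than something confirmed here, and assuming each checkpoint registers exactly one new state, the time until failure is about the checkpoint interval times 32,767. The sketch below is plain Java; CheckpointBudget and timeUntilLimit are made-up names for illustration.

```java
import java.time.Duration;

/** Back-of-the-envelope estimate; not Flink or Beam code. */
final class CheckpointBudget {

    /** Assumed cap on registered operator states (inferred, see the text above). */
    static final long MAX_STATES = Short.MAX_VALUE;

    /** Lower bound on the run time before the cap is hit, given one new state per checkpoint. */
    static Duration timeUntilLimit(Duration checkpointInterval) {
        return checkpointInterval.multipliedBy(MAX_STATES);
    }

    public static void main(String[] args) {
        // Interval from the production job in this thread: one checkpoint per minute.
        System.out.println(timeUntilLimit(Duration.ofMinutes(1)));
        // Interval from the unit test in this thread: 10 ms.
        System.out.println(timeUntilLimit(Duration.ofMillis(10)));
    }
}
```

Under these assumptions, one checkpoint per minute gives roughly 22.75 days, which is roughly in line with the failure reported at checkpoint 32702. The 10 ms interval from the unit test gives a lower bound of about five and a half minutes, since each checkpoint also takes some time to complete.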
Re: Flink Runner with RequiresStableInput fails after a certain
number of checkpoints
Posted by Maximilian Michels <mx...@apache.org>.
Hi Stephen,
Thanks for reporting the issue! David, good catch!
I think we have to resort to using a single state cell for
buffering on checkpoints, instead of creating a new one for every
checkpoint. I had assumed that a state cell would not be checkpointed
once it was cleared, but that does not seem to be the case.
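A minimal sketch of the difference, as a toy model rather than the actual runner or Flink code: it assumes that a state cell stays registered after it is cleared, and that a snapshot fails once more than Short.MAX_VALUE cells are registered (inferred from the checkpoint numbers in this thread). StateCellSketch and its methods are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of an operator state backend; not Flink or Beam code. */
final class StateCellSketch {

    /** Assumed cap on registered state cells (inferred, not confirmed). */
    static final int MAX_REGISTERED_STATES = Short.MAX_VALUE;

    /** Names of all cells registered so far; clearing a cell's contents would not remove its name. */
    private final Set<String> registeredStates = new HashSet<>();

    /** Registers a cell under the given name, or reuses the existing one. */
    void getOrCreateState(String name) {
        registeredStates.add(name);
    }

    /** Mimics the snapshot-time precondition: fails once too many cells are registered. */
    void snapshot() {
        if (registeredStates.size() > MAX_REGISTERED_STATES) {
            throw new IllegalArgumentException(
                "too many registered states: " + registeredStates.size());
        }
    }

    /** Returns the id of the first checkpoint whose snapshot fails, or -1 if none fails. */
    static long firstFailingCheckpoint(boolean newCellPerCheckpoint, long checkpoints) {
        StateCellSketch backend = new StateCellSketch();
        for (long id = 1; id <= checkpoints; id++) {
            // Either a fresh cell per checkpoint, or one cell reused for every checkpoint.
            backend.getOrCreateState(newCellPerCheckpoint ? "buffer-" + id : "buffer");
            try {
                backend.snapshot();
            } catch (IllegalArgumentException e) {
                return id;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("new cell per checkpoint: " + firstFailingCheckpoint(true, 40_000));
        System.out.println("single reused cell:      " + firstFailingCheckpoint(false, 40_000));
    }
}
```

In this model the per-checkpoint variant fails shortly after Short.MAX_VALUE checkpoints, while the single-cell variant never grows the set of registered cells, so the number of checkpoints no longer matters.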
Thanks,
Max
On 21.04.20 09:29, David Morávek wrote:
> Hi Stephen,
>
> nice catch and awesome report! ;) This definitely needs a proper fix.
> I've created a new JIRA to track the issue and will try to resolve it
> soon as this seems critical to me.
>
> https://issues.apache.org/jira/browse/BEAM-9794
>
> Thanks,
> D.
>
> On Mon, Apr 20, 2020 at 10:41 PM Stephen Patel <stephenpatel89@gmail.com> wrote:
>
> I was able to reproduce this in a unit test:
>
> @Test
> public void test() throws InterruptedException, ExecutionException {
>   FlinkPipelineOptions options =
>       PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>   options.setCheckpointingInterval(10L);
>   options.setParallelism(1);
>   options.setStreaming(true);
>   options.setRunner(FlinkRunner.class);
>   options.setFlinkMaster("[local]");
>   options.setStateBackend(new MemoryStateBackend(Integer.MAX_VALUE));
>   Pipeline pipeline = Pipeline.create(options);
>   pipeline
>       .apply(Create.of((Void) null))
>       .apply(
>           ParDo.of(
>               new DoFn<Void, Void>() {
>                 private static final long serialVersionUID = 1L;
>
>                 @RequiresStableInput
>                 @ProcessElement
>                 public void processElement() {}
>               }));
>   pipeline.run();
> }
>
>
> It took a while to get to checkpoint 32,767, but eventually it did,
> and it failed with the same error I listed above.
>
> On Thu, Apr 16, 2020 at 11:26 AM Stephen Patel <stephenpatel89@gmail.com> wrote:
>
> I have a Beam Pipeline (2.14) running on Flink (1.8.0,
> emr-5.26.0) that uses the RequiresStableInput feature.
>
> Currently it's configured to checkpoint once a minute, and after
> around 32000-33000 checkpoints, it fails with:
>
> 2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
> 2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
> 2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>     ... 6 more
> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>     ... 5 more
> Caused by: java.lang.IllegalArgumentException
>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>     ... 7 more
>
>
> The exception comes from
> here: https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>
> In the Flink Runner code, I can see that each checkpoint will
> result in a new OperatorState (or KeyedState if the stream is
> keyed):
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L103
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L136-L143
>
> This seems to be the reason the pipeline will eventually die.
>
> While a workaround might be to increase the time between
> checkpoints, it seems that any pipeline running on Flink that
> uses RequiresStableInput is limited in how long it can run
> without being restarted from scratch.
>