Posted to dev@beam.apache.org by Jan Lukavský <je...@seznam.cz> on 2022/02/04 10:56:00 UTC

Re: Beam State with the Flink Runner when things go wrong

+dev <ma...@beam.apache.org>

Hi Cristian,

the savepointPath should not be ignored. We need to verify whether the local
environment supports savepoints (I suppose it does), and in that case we
should use it. If it does not, we should throw an exception, because
silently ignoring the savepoint is misleading.
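
As a rough illustration of the fail-fast behaviour described above, a check
along the following lines would reject a savepoint path that the chosen
environment cannot restore from, instead of dropping it. This is only a
hypothetical, self-contained sketch: the class SavepointOptionCheck, the
EnvironmentKind enum and the supportsRestore flag are invented for the example
and are not taken from Beam's FlinkExecutionEnvironments.

```java
import java.util.Optional;

// Hypothetical sketch only; none of these names exist in Beam.
final class SavepointOptionCheck {

  // Assumed labels for the kinds of environment mentioned in the thread.
  enum EnvironmentKind { LOCAL_EMBEDDED, FLINK_CLI_CONTEXT, REMOTE }

  /**
   * Returns the savepoint path to restore from, or empty when none was requested.
   * Throws when a path was requested but the environment cannot restore from it.
   */
  public static Optional<String> resolveSavepointPath(
      EnvironmentKind kind, boolean supportsRestore, String savepointPath) {
    if (savepointPath == null || savepointPath.isEmpty()) {
      return Optional.empty(); // nothing requested, nothing to validate
    }
    if (!supportsRestore) {
      throw new IllegalArgumentException(
          "savepointPath '" + savepointPath + "' cannot be used with a "
              + kind + " environment; refusing to ignore it silently");
    }
    return Optional.of(savepointPath);
  }

  public static void main(String[] args) {
    // A supported environment passes the path through unchanged.
    System.out.println(
        resolveSavepointPath(EnvironmentKind.REMOTE, true, "/checkpoints/chk-12"));
    // An unsupported environment fails loudly instead of starting from scratch.
    try {
      resolveSavepointPath(EnvironmentKind.LOCAL_EMBEDDED, false, "/checkpoints/chk-12");
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

The point of the design is that a user who passes a savepoint path either gets
a restored job or an immediate error, never a job that quietly starts empty.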

Would you file a JIRA? Or possibly create a PR to fix this?

Best,

  Jan

On 2/3/22 07:12, Cristian Constantinescu wrote:
> Hi everyone,
>
> I've done some digging within the Beam source code. It looks like when 
> the flinkMaster argument is not set, the savepointPath is not used at 
> all. [1]
>
> In fact the only time the savepointPath argument is used within all of 
> Beam's source code is on lines 183 and 186 of the same file. [2]
>
> Of course, I did all my testing locally on my dev box with the 
> embedded Flink cluster that Beam starts, which from the looks of it, 
> does NOT use the savepointPath at all.
>
> If someone familiar with the code can confirm this finding, I can 
> update the documentation to explicitly state that savepoint resuming 
> is not supported locally.
>
> I will do more testing around this with a real Flink cluster and see 
> if the behavior is different than the one described in my first email.
>
> Thanks,
> Cristian
>
> [1] 
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
> [2] 
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>
> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu 
> <ze...@gmail.com> wrote:
>
>     Hey Pavel,
>
>     Thanks for the quick reply. Pardon me as I cannot copy/paste
>     straight from the IDE, copying by hand:
>
>         KafkaIO.<String, Pojo>read()
>             .withBootstrapServers("address")
>             .withTopic("topic")
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
>             .withConsumerConfigUpdates(map)
>             .withReadCommitted()
>             .commitOffsetsInFinalize()
>             .withProcessingTime();
>
>
>     The config map is:
>     enable.auto.commit -> false
>     group.id -> some group
>     auto.offset.reset -> earliest
>     specific.avro.reader -> false
>
>
>     On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin
>     <p....@gmail.com> wrote:
>
>         Hello Christian,
>
>         Thanks for posting here the detailed scenario of your
>         experiments. I think it may be important to share your KafkaIO
>         configuration here too. For example, are you setting this
>         config anyhow?
>         https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>
>         Best Regards,
>         Pavel Solomin
>
>         Tel: +351 962 950 692| Skype: pavel_solomin | Linkedin
>         <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
>
>         On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu
>         <ze...@gmail.com> wrote:
>
>             Hi everyone,
>
>             I'm trying to figure out how pipeline state works with
>             Beam running on Flink Classic. Would appreciate some help
>             with the below.
>
>             My understanding is that on recovery (whether from a
>             checkpoint or savepoint), Flink recreates the operators (I
>             guess those are DoFns in Beam) with whatever state they
>             had when the pipeline crashed. For example the Kafka
>             operator might contain the latest *safe* offset to restart
>             from. But I'm not seeing this when I introduce exceptions
>             in the pipeline.
>
>             My pipeline is as follows:
>             1. Read a Kafka topic from start
>             2. Have a DoFn that stores all incoming messages in a BagState
>             3. Above DoFn triggers a timer set in such a way that it
>             triggers after there are a few checkpoints created and
>             kept because of --externalizeCheckpointsEnabled = true.
>             This timer handler then outputs the elements to the next
>             operator, in this case KafkaIO.Write.
>             4. Before the timer in #3 is executed manually trigger an
>             exception (listen to another kafka topic, and throw any
>             time a new message comes in)
>
>             What I observe:
>             1. In #4 above Flink tries to process the exception twice
>             then stops the pipeline (because numberOfExecutionRetries =2 )
>             2. After the pipeline is stopped, I see the checkpoints
>             are kept in the configured directory
>             3. If I restart the pipeline (with --savepointPath = <path
>             to latest checkpoint from first run>):
>             3a. No messages are read from kafka, because the Kafka
>             reader reached the end of the topic during the first run
>             3b. StartBundles are not executed for my DoFn. Indicating
>             that the DoFn isn't even started
>             3c. The timer in #3 is never executed, hence there is data
>             loss as the elements I had in my DoFn state are never
>             processed
>             4. If I manually reset the offset to the start of the
>             topic and restart the pipeline (with --savepointPath =
>             <path to latest checkpoint from first run>):
>             4a. StartBundle methods are called
>             4b. In ProcessElement, the BagState is empty on the first
>             received message. If I'm restoring from a
>             checkpoint/savepoint, I would expect this state to be filled.
>
>             Is this correct behaviour? Am I doing something wrong?
>
>             Thanks,
>             Cristian
>
>             Other quirks I found:
>             a. If KafkaIO.Read is configured to read from the latest
>             offset, and there is an exception thrown in the pipeline
>             before the first checkpoint happens (let's say on the
>             first message that comes in), when Flink retries KafkaIO
>             reads from the latest offset again. That means that the
>             message that caused the exception is not reprocessed. On
>             the other hand, if the exception is thrown after the first
>             checkpoint, that message will be tried twice (because
>             numberOfExecutionRetries =2 ), and then the pipeline will
>             exit. I think this is working as designed but it feels a
>             little weird that the behaviour is different depending if
>             there's a checkpoint or not.
>
>             b. When KafkaIO.Write is configured with .withEOS(number,
>             "group"), and there is an exception thrown in the
>             pipeline, the Flink job doesn't exit. I think there is a
>             kafka producer in KafkaExactlyOnceSink that is not closed
>             correctly.
>

Re: Beam State with the Flink Runner when things go wrong

Posted by Cristian Constantinescu <ze...@gmail.com>.
Hi everyone,

I did a little more testing.

Passing the Flink "-s" flag to Flink CLI to submit the job correctly
restores it from the given checkpoint:
flink run --detached -s "checkpoint path" -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2 ... other flink parameters as specified in [1]

Passing the Beam Flink pipeline option "savepoint_path" to the Flink CLI to
submit the job does NOT correctly restore it from the given checkpoint, for
the same reason as before: "flink_master" is not being set.
flink run --detached -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2  --savepoint_path="checkpoint path" etc...

However, setting "flink_master" doesn't seem to work with the Flink CLI
either; it fails with "The RemoteEnvironment cannot be instantiated when
running in a pre-defined context (such as Command Line Client, Scala Shell or
TestEnvironment)."
flink run --detached -c Appclass fat.jar --streaming
--numberOfExecutionRetries=2  --flink_master=localhost:portnumber
--savepoint_path="checkpoint path" etc...

As a workaround for now, I will use the -s parameter with the Flink CLI.
I'm surprised that this only came up now; I'd think that restoring from a
savepoint/checkpoint with Flink and Beam is a pretty common usage scenario.
I guess people use the Flink parameters when possible. In my case, I
prefer to find the latest checkpoint/savepoint in code and call
options.setSavepointPath(path) programmatically before the
Pipeline.create(options) call is made.
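
Finding the latest checkpoint in code, as mentioned above, could be sketched
roughly as follows. This assumes Flink's usual layout for retained checkpoints,
where each one is a chk-<n> directory containing a _metadata file inside a
per-job directory. The class and method names (LatestCheckpointFinder,
findLatest) are made up for this example, and only the JDK is used.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

// Hypothetical helper; assumes the <job-dir>/chk-<n>/_metadata layout.
final class LatestCheckpointFinder {

  private static final Pattern CHECKPOINT_DIR = Pattern.compile("chk-(\\d+)");

  /**
   * Returns the highest-numbered chk-<n> directory under jobCheckpointDir that
   * contains a _metadata file, or empty if there is none.
   */
  public static Optional<Path> findLatest(Path jobCheckpointDir) throws IOException {
    if (!Files.isDirectory(jobCheckpointDir)) {
      return Optional.empty();
    }
    try (Stream<Path> children = Files.list(jobCheckpointDir)) {
      return children
          .filter(Files::isDirectory)
          .filter(dir -> CHECKPOINT_DIR.matcher(dir.getFileName().toString()).matches())
          // Skip directories without _metadata; they are treated as incomplete.
          .filter(dir -> Files.isRegularFile(dir.resolve("_metadata")))
          // Compare numerically so that chk-10 sorts after chk-2.
          .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointId));
    }
  }

  private static long checkpointId(Path dir) {
    Matcher m = CHECKPOINT_DIR.matcher(dir.getFileName().toString());
    if (!m.matches()) {
      throw new IllegalArgumentException("Not a checkpoint directory: " + dir);
    }
    return Long.parseLong(m.group(1));
  }

  public static void main(String[] args) throws IOException {
    Path jobDir = Path.of(args.length > 0 ? args[0] : ".");
    System.out.println(findLatest(jobDir).map(Path::toString).orElse("no checkpoint found"));
  }
}
```

The returned path, if present, would then be handed to
options.setSavepointPath(...) before Pipeline.create(options) is called.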

Cheers,
Cristian

[1] https://beam.apache.org/documentation/runners/flink/


On Fri, Feb 4, 2022 at 10:17 AM Cristian Constantinescu <ze...@gmail.com>
wrote:

> Hey Jan,
>
> I agree that silently ignoring the parameter is misleading and, in my
> case, time consuming.
>
> I will gladly create the JIRA and PR. I do have some other things I want
> to contribute to Beam. Will get to them soon.
>
> On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> +dev <de...@beam.apache.org>
>>
>> Hi Cristian,
>>
>> the savepointPath should not be ignored. We need to verify if local
>> environment supports savepoints (I suppose it does) and in that case we
>> should use it. In the case it does not we should throw exception as silent
>> ignoring of the savepoint is misleading.
>>
>> Would you file a JIRA? Or possibly create a PR to fix this?
>>
>> Best,
>>
>>  Jan
>> On 2/3/22 07:12, Cristian Constantinescu wrote:
>>
>> Hi everyone,
>>
>> I've done some digging within the Beam source code. It looks like when
>> the flinkMaster argument is not set, the savepointPath is not used at all.
>> [1]
>>
>> In fact the only time the savepointPath argument is used within all of
>> Beam's source code is on lines 183 and 186 of the same file. [2]
>>
>> Of course, I did all my testing locally on my dev box with the embedded
>> Flink cluster that Beam starts, which from the looks of it, does NOT use
>> the savepointPath at all.
>>
>> If someone familiar with the code can confirm this finding, I can update
>> the documentation to explicitly state that savepoint resuming is not
>> supported locally.
>>
>> I will do more testing around this with a real Flink cluster and see if
>> the behavior is different than the one described in my first email.
>>
>> Thanks,
>> Cristian
>>
>> [1]
>> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
>> [2]
>> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>>
>> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu <ze...@gmail.com>
>> wrote:
>>
>>> Hey Pavel,
>>>
>>> Thanks for the quick reply. Pardon me as I cannot copy/paste straight
>>> from the IDE, copying by hand:
>>>
>>> KafkaIO.<String, Pojo>read()
>>>     .withBootstrapServers("address")
>>>     .withTopic("topic")
>>>     .withKeyDeserializer(StringDeserializer.class)
>>>     .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
>>>     .withConsumerConfigUpdates(map)
>>>     .withReadCommitted()
>>>     .commitOffsetsInFinalize()
>>>     .withProcessingTime();
>>>
>>>
>>> The config map is:
>>> enable.auto.commit -> false
>>> group.id -> some group
>>> auto.offset.reset -> earliest
>>> specific.avro.reader -> false
>>>
>>>
>>> On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin <p....@gmail.com>
>>> wrote:
>>>
>>>> Hello Christian,
>>>>
>>>> Thanks for posting here the detailed scenario of your experiments. I
>>>> think it may be important to share your KafkaIO configuration here too. For
>>>> example, are you setting this config anyhow?
>>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>>>
>>>> Best Regards,
>>>> Pavel Solomin
>>>>
>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>>> <https://www.linkedin.com/in/pavelsolomin>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <ze...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I'm trying to figure out how pipeline state works with Beam running on
>>>>> Flink Classic. Would appreciate some help with the below.
>>>>>
>>>>> My understanding is that on recovery (whether from a checkpoint or
>>>>> savepoint), Flink recreates the operators (I guess those are DoFns in Beam)
>>>>> with whatever state they had when the pipeline crashed. For example the
>>>>> Kafka operator might contain the latest *safe* offset to restart from. But
>>>>> I'm not seeing this when I introduce exceptions in the pipeline.
>>>>>
>>>>> My pipeline is as follows:
>>>>> 1. Read a Kafka topic from start
>>>>> 2. Have a DoFn that stores all incoming messages in a BagState
>>>>> 3. Above DoFn triggers a timer set in such a way that it triggers
>>>>> after there are a few checkpoints created and kept because of
>>>>> --externalizeCheckpointsEnabled = true. This timer handler then outputs the
>>>>> elements to the next operator, in this case KafkaIO.Write.
>>>>> 4. Before the timer in #3 is executed manually trigger an exception
>>>>> (listen to another kafka topic, and throw any time a new message comes in)
>>>>>
>>>>> What I observe:
>>>>> 1. In #4 above Flink tries to process the exception twice then stops
>>>>> the pipeline (because numberOfExecutionRetries =2 )
>>>>> 2. After the pipeline is stopped, I see the checkpoints are kept in
>>>>> the configured directory
>>>>> 3. If I restart the pipeline (with --savepointPath = <path to latest
>>>>> checkpoint from first run>):
>>>>> 3a. No messages are read from kafka, because the Kafka reader reached
>>>>> the end of the topic during the first run
>>>>> 3b. StartBundles are not executed for my DoFn. Indicating that the
>>>>> DoFn isn't even started
>>>>> 3c. The timer in #3 is never executed, hence there is data loss as the
>>>>> elements I had in my DoFn state are never processed
>>>>> 4. If I manually reset the offset to the start of the topic and
>>>>> restart the pipeline (with --savepointPath = <path to latest checkpoint
>>>>> from first run>):
>>>>> 4a. StartBundle methods are called
>>>>> 4b. In ProcessElement, the BagState is empty on the first received
>>>>> message. If I'm restoring from a checkpoint/savepoint, I would expect this
>>>>> state to be filled.
>>>>>
>>>>> Is this correct behaviour? Am I doing something wrong?
>>>>>
>>>>> Thanks,
>>>>> Cristian
>>>>>
>>>>> Other quirks I found:
>>>>> a. If KafkaIO.Read is configured to read from the latest offset, and
>>>>> there is an exception thrown in the pipeline before the first checkpoint
>>>>> happens (let's say on the first message that comes in), when Flink retries
>>>>> KafkaIO reads from the latest offset again. That means that the message
>>>>> that caused the exception is not reprocessed. On the other hand, if the
>>>>> exception is thrown after the first checkpoint, that message will be tried
>>>>> twice (because numberOfExecutionRetries =2 ), and then the pipeline will
>>>>> exit. I think this is working as designed but it feels a little weird
>>>>> that the behaviour is different depending if there's a checkpoint or not.
>>>>>
>>>>> b. When KafkaIO.Write is configured with .withEOS(number, "group"),
>>>>> and there is an exception thrown in the pipeline, the Flink job doesn't
>>>>> exit. I think there is a kafka producer in KafkaExactlyOnceSink that is not
>>>>> closed correctly.
>>>>>
>>>>

Re: Beam State with the Flink Runner when things go wrong

Posted by Cristian Constantinescu <ze...@gmail.com>.
Hey Jan,

I agree that silently ignoring the parameter is misleading and, in my case,
time consuming.

I will gladly create the JIRA and PR. I do have some other things I want to
contribute to Beam. Will get to them soon.

On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:

> +dev <de...@beam.apache.org>
>
> Hi Cristian,
>
> the savepointPath should not be ignored. We need to verify if local
> environment supports savepoints (I suppose it does) and in that case we
> should use it. In the case it does not we should throw exception as silent
> ignoring of the savepoint is misleading.
>
> Would you file a JIRA? Or possibly create a PR to fix this?
>
> Best,
>
>  Jan
> On 2/3/22 07:12, Cristian Constantinescu wrote:
>
> Hi everyone,
>
> I've done some digging within the Beam source code. It looks like when the
> flinkMaster argument is not set, the savepointPath is not used at all. [1]
>
> In fact the only time the savepointPath argument is used within all of
> Beam's source code is on lines 183 and 186 of the same file. [2]
>
> Of course, I did all my testing locally on my dev box with the embedded
> Flink cluster that Beam starts, which from the looks of it, does NOT use
> the savepointPath at all.
>
> If someone familiar with the code can confirm this finding, I can update
> the documentation to explicitly state that savepoint resuming is not
> supported locally.
>
> I will do more testing around this with a real Flink cluster and see if
> the behavior is different than the one described in my first email.
>
> Thanks,
> Cristian
>
> [1]
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
> [2]
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>
> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu <ze...@gmail.com>
> wrote:
>
>> Hey Pavel,
>>
>> Thanks for the quick reply. Pardon me as I cannot copy/paste straight
>> from the IDE, copying by hand:
>>
>> KafkaIO.<String, Pojo>read()
>>     .withBootstrapServers("address")
>>     .withTopic("topic")
>>     .withKeyDeserializer(StringDeserializer.class)
>>     .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
>>     .withConsumerConfigUpdates(map)
>>     .withReadCommitted()
>>     .commitOffsetsInFinalize()
>>     .withProcessingTime();
>>
>>
>> The config map is:
>> enable.auto.commit -> false
>> group.id -> some group
>> auto.offset.reset -> earliest
>> specific.avro.reader -> false
>>
>>
>> On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin <p....@gmail.com>
>> wrote:
>>
>>> Hello Christian,
>>>
>>> Thanks for posting here the detailed scenario of your experiments. I
>>> think it may be important to share your KafkaIO configuration here too. For
>>> example, are you setting this config anyhow?
>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <ze...@gmail.com>
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I'm trying to figure out how pipeline state works with Beam running on
>>>> Flink Classic. Would appreciate some help with the below.
>>>>
>>>> My understanding is that on recovery (whether from a checkpoint or
>>>> savepoint), Flink recreates the operators (I guess those are DoFns in Beam)
>>>> with whatever state they had when the pipeline crashed. For example the
>>>> Kafka operator might contain the latest *safe* offset to restart from. But
>>>> I'm not seeing this when I introduce exceptions in the pipeline.
>>>>
>>>> My pipeline is as follows:
>>>> 1. Read a Kafka topic from start
>>>> 2. Have a DoFn that stores all incoming messages in a BagState
>>>> 3. Above DoFn triggers a timer set in such a way that it triggers after
>>>> there are a few checkpoints created and kept because of
>>>> --externalizeCheckpointsEnabled = true. This timer handler then outputs the
>>>> elements to the next operator, in this case KafkaIO.Write.
>>>> 4. Before the timer in #3 is executed manually trigger an exception
>>>> (listen to another kafka topic, and throw any time a new message comes in)
>>>>
>>>> What I observe:
>>>> 1. In #4 above Flink tries to process the exception twice then stops
>>>> the pipeline (because numberOfExecutionRetries =2 )
>>>> 2. After the pipeline is stopped, I see the checkpoints are kept in the
>>>> configured directory
>>>> 3. If I restart the pipeline (with --savepointPath = <path to latest
>>>> checkpoint from first run>):
>>>> 3a. No messages are read from kafka, because the Kafka reader reached
>>>> the end of the topic during the first run
>>>> 3b. StartBundles are not executed for my DoFn. Indicating that the DoFn
>>>> isn't even started
>>>> 3c. The timer in #3 is never executed, hence there is data loss as the
>>>> elements I had in my DoFn state are never processed
>>>> 4. If I manually reset the offset to the start of the topic and restart
>>>> the pipeline (with --savepointPath = <path to latest checkpoint from first
>>>> run>):
>>>> 4a. StartBundle methods are called
>>>> 4b. In ProcessElement, the BagState is empty on the first received
>>>> message. If I'm restoring from a checkpoint/savepoint, I would expect this
>>>> state to be filled.
>>>>
>>>> Is this correct behaviour? Am I doing something wrong?
>>>>
>>>> Thanks,
>>>> Cristian
>>>>
>>>> Other quirks I found:
>>>> a. If KafkaIO.Read is configured to read from the latest offset, and
>>>> there is an exception thrown in the pipeline before the first checkpoint
>>>> happens (let's say on the first message that comes in), when Flink retries
>>>> KafkaIO reads from the latest offset again. That means that the message
>>>> that caused the exception is not reprocessed. On the other hand, if the
>>>> exception is thrown after the first checkpoint, that message will be tried
>>>> twice (because numberOfExecutionRetries =2 ), and then the pipeline will
>>>> exit. I think this is working as designed but it feels a little weird
>>>> that the behaviour is different depending if there's a checkpoint or not.
>>>>
>>>> b. When KafkaIO.Write is configured with .withEOS(number, "group"), and
>>>> there is an exception thrown in the pipeline, the Flink job doesn't exit. I
>>>> think there is a kafka producer in KafkaExactlyOnceSink that is not closed
>>>> correctly.
>>>>
>>>

Re: Beam State with the Flink Runner when things go wrong

Posted by Cristian Constantinescu <ze...@gmail.com>.
Hey Jan,

I agree that silently ignoring the parameter is misleading and, in my case,
time consuming.

I will gladly create the JIRA and PR. I do have some other things I want to
contribute to Beam. Will get to them soon.

On Fri, Feb 4, 2022 at 5:56 AM Jan Lukavský <je...@seznam.cz> wrote:

> +dev <de...@beam.apache.org>
>
> Hi Cristian,
>
> the savepointPath should not be ignored. We need to verify if local
> environment supports savepoints (I suppose it does) and in that case we
> should use it. In the case it does not we should throw exception as silent
> ignoring of the savepoint is misleading.
>
> Would you file a JIRA? Or possibly create a PR to fix this?
>
> Best,
>
>  Jan
> On 2/3/22 07:12, Cristian Constantinescu wrote:
>
> Hi everyone,
>
> I've done some digging within the Beam source code. It looks like when the
> flinkMaster argument is not set, the savepointPath is not used at all. [1]
>
> In fact the only time the savepointPath argument is used within all of
> Beam's source code is on lines 183 and 186 of the same file. [2]
>
> Of course, I did all my testing locally on my dev box with the embedded
> Flink cluster that Beam starts, which from the looks of it, does NOT use
> the savepointPath at all.
>
> If someone familiar with the code can confirm this finding, I can update
> the documentation to explicitly state that savepoint resuming is not
> supported locally.
>
> I will do more testing around this with a real Flink cluster and see if
> the behavior is different than the one described in my first email.
>
> Thanks,
> Cristian
>
> [1]
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L174
> [2]
> https://github.com/apache/beam/blob/51e0e4ef82feaae251b37a0288ad7a04ee603086/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkExecutionEnvironments.java#L183
>
> On Wed, Feb 2, 2022 at 3:01 PM Cristian Constantinescu <ze...@gmail.com>
> wrote:
>
>> Hey Pavel,
>>
>> Thanks for the quick reply. Pardon me as I cannot copy/paste straight
>> from the IDE, copying by hand:
>>
>> KafkaIO.<String, Pojo>read()
>>     .withBootstrapServers("address")
>>     .withTopic("topic")
>>     .withKeyDeserializer(StringDeserializer.class)
>>     .withValueDeserializer(ConfluentSchemaRegistryDeserializerProvider.of(...))
>>     .withConsumerConfigUpdates(map)
>>     .withReadCommitted()
>>     .commitOffsetsInFinalize()
>>     .withProcessingTime();
>>
>>
>> The config map is:
>> enable.auto.commit -> false
>> group.id -> some group
>> auto.offset.reset -> earliest
>> specific.avro.reader -> false
>>
>>
>> On Wed, Feb 2, 2022 at 2:44 PM Pavel Solomin <p....@gmail.com>
>> wrote:
>>
>>> Hello Cristian,
>>>
>>> Thanks for posting the detailed scenario of your experiments here. I
>>> think it may be important to share your KafkaIO configuration too. For
>>> example, are you setting this config at all?
>>> https://beam.apache.org/releases/javadoc/2.35.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> On Wed, 2 Feb 2022 at 19:20, Cristian Constantinescu <ze...@gmail.com>
>>> wrote:
>>>
>>>> Hi everyone,
>>>>
>>>> I'm trying to figure out how pipeline state works with Beam running on
>>>> Flink Classic. Would appreciate some help with the below.
>>>>
>>>> My understanding is that on recovery (whether from a checkpoint or
>>>> savepoint), Flink recreates the operators (I guess those are DoFns in Beam)
>>>> with whatever state they had when the pipeline crashed. For example the
>>>> Kafka operator might contain the latest *safe* offset to restart from. But
>>>> I'm not seeing this when I introduce exceptions in the pipeline.
>>>>
>>>> My pipeline is as follows:
>>>> 1. Read a Kafka topic from start
>>>> 2. Have a DoFn that stores all incoming messages in a BagState
>>>> 3. The DoFn above sets a timer that fires only after a few checkpoints
>>>> have been created and retained (because
>>>> --externalizeCheckpointsEnabled = true). The timer handler then outputs the
>>>> elements to the next operator, in this case KafkaIO.Write.
>>>> 4. Before the timer in #3 fires, manually trigger an exception (listen to
>>>> another Kafka topic, and throw any time a new message comes in)
>>>>
>>>> What I observe:
>>>> 1. In #4 above, Flink retries twice and then stops the pipeline (because
>>>> numberOfExecutionRetries = 2)
>>>> 2. After the pipeline is stopped, I see the checkpoints are kept in the
>>>> configured directory
>>>> 3. If I restart the pipeline (with --savepointPath = <path to latest
>>>> checkpoint from first run>):
>>>> 3a. No messages are read from Kafka, because the Kafka reader reached
>>>> the end of the topic during the first run
>>>> 3b. StartBundle is not executed for my DoFn, indicating that the DoFn
>>>> isn't even started
>>>> 3c. The timer in #3 is never executed, hence there is data loss as the
>>>> elements I had in my DoFn state are never processed
>>>> 4. If I manually reset the offset to the start of the topic and restart
>>>> the pipeline (with --savepointPath = <path to latest checkpoint from first
>>>> run>):
>>>> 4a. StartBundle methods are called
>>>> 4b. In ProcessElement, the BagState is empty on the first received
>>>> message. If I'm restoring from a checkpoint/savepoint, I would expect this
>>>> state to be filled.
>>>>
>>>> Is this correct behaviour? Am I doing something wrong?
>>>>
>>>> Thanks,
>>>> Cristian
>>>>
>>>> Other quirks I found:
>>>> a. If KafkaIO.Read is configured to read from the latest offset, and
>>>> there is an exception thrown in the pipeline before the first checkpoint
>>>> happens (let's say on the first message that comes in), then when Flink
>>>> retries, KafkaIO reads from the latest offset again. That means that the
>>>> message that caused the exception is not reprocessed. On the other hand, if
>>>> the exception is thrown after the first checkpoint, that message will be
>>>> tried twice (because numberOfExecutionRetries = 2), and then the pipeline
>>>> will exit. I think this is working as designed, but it feels a little weird
>>>> that the behaviour differs depending on whether there's a checkpoint or not.
>>>>
>>>> b. When KafkaIO.Write is configured with .withEOS(number, "group"), and
>>>> there is an exception thrown in the pipeline, the Flink job doesn't exit. I
>>>> think there is a Kafka producer in KafkaExactlyOnceSink that is not closed
>>>> correctly.
>>>>
>>>
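
Quirk (a) quoted above can be illustrated with a toy model. This is neither
Beam nor Kafka code: RestartOffsetModel and its methods are hypothetical, and
the model ignores offsets committed to the consumer group. It only assumes
that a restarted reader resumes from a checkpointed offset if one exists, and
otherwise applies the reset policy again.

```java
import java.util.OptionalLong;

/** Toy model (hypothetical, not Beam/Kafka code) of where a restarted reader begins. */
final class RestartOffsetModel {

    enum Reset { EARLIEST, LATEST }

    /**
     * Offset at which a restarted reader starts.
     * logEndOffset is the offset the next appended record would get.
     */
    static long startOffset(OptionalLong checkpointedOffset, Reset reset, long logEndOffset) {
        if (checkpointedOffset.isPresent()) {
            // A checkpoint exists: resume from the offset stored in it.
            return checkpointedOffset.getAsLong();
        }
        // No checkpoint: the reset policy is evaluated again at restart time.
        return reset == Reset.LATEST ? logEndOffset : 0L;
    }

    /** True if the record at failingOffset is read again after the restart. */
    static boolean isReprocessed(
            long failingOffset, OptionalLong checkpointedOffset, Reset reset, long logEndOffset) {
        return startOffset(checkpointedOffset, reset, logEndOffset) <= failingOffset;
    }

    public static void main(String[] args) {
        long failing = 5L;      // offset of the record that caused the exception
        long logEnd = 6L;       // the log already contains that record at restart

        // No checkpoint, latest: the reader starts after the failing record.
        System.out.println(isReprocessed(failing, OptionalLong.empty(), Reset.LATEST, logEnd));
        // Checkpoint taken before the failing record was consumed: it is read again.
        System.out.println(isReprocessed(failing, OptionalLong.of(5L), Reset.LATEST, logEnd));
    }
}
```

Under these assumptions the difference in behaviour comes only from whether a
checkpointed offset is available when the job restarts.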