Posted to user@flink.apache.org by Eleanore Jin <el...@gmail.com> on 2020/08/10 15:58:39 UTC

Cannot resume from Savepoint when operator changes

Hi experts,

I am using Beam 2.23.0 with Flink runner 1.8.2. With checkpointing and Beam
KafkaIO EOS (exactly-once semantics) enabled, I am exploring different
scenarios for resuming a job from a savepoint. I am running Kafka and a
standalone Flink cluster locally on my laptop.

Below are the scenarios that I have tried out:

1. Add a new topic as source
Before savepoint: read from input1 and write to output
Take a savepoint
After savepoint: read from input1 and input2 and write to output
Behavior: It did not output messages from input2

2. Remove a topic source
Before savepoint: read from input1 and input2 and write to output
Take a savepoint
After savepoint: read from input1 and write to output
Behavior: works as expected, only messages from input1 are output

3. Add a topic as sink
Before savepoint: read from input1 and write to output1
Take a savepoint
After savepoint: read from input1 and write to output1 and output2
Behavior: the pipeline failed with an exception
[image: image.png]

4. Remove a topic sink
Before savepoint: read from input1 and write to output1 and output2
Take a savepoint
After savepoint: read from input1 and write to output1
Behavior: the sinkGroupId has to be changed, otherwise an exception is thrown
[image: image.png]

So it looks like resuming from a savepoint does not really work when the
source or sink part of the DAG changes. Is this expected behaviour? Does it
have to do with how Beam KafkaIO EOS state works, or is it something
related to Flink?

Thanks a lot!
Eleanore

Re: Cannot resume from Savepoint when operator changes

Posted by Eleanore Jin <el...@gmail.com>.
Hi Yun,

Thanks for the response. I actually got it working. The missing pieces were:
1. If I do not introduce a Reshuffle from Beam (which becomes a Flink
rebalance partition), it does not work. When not resuming from a savepoint,
I see the consumer for the new source topic get created, but when resuming
from a savepoint it is not created.
2. If I introduce a Reshuffle and assign a UID to the reshuffle operator,
the job is able to resume from the savepoint even when I change the source
topic from topic1 to topic2.

Thanks a lot for your help!
Eleanore

Re: Cannot resume from Savepoint when operator changes

Posted by Yun Tang <my...@live.com>.
Hi Eleanore

Assigning a uid to every operator is the key to ensuring that state can be restored as expected, no matter what changes are introduced in the new DAG.
For the second question: a savepoint does not store the previous job graph, so it should not prevent the Kafka consumer for the new source topic from being created, unless the Kafka consumer has some internal logic that causes this behavior.
You could try starting the job without resuming from the savepoint to see whether the Kafka consumer is created.

Best
Yun Tang

On Thu, Aug 13, 2020 at 6:29 PM Eleanore Jin <el...@gmail.com> wrote:
Hi Yun,

Thanks a lot for the reply. Later on I was able to get adding a new Kafka topic as a source working, which required adding a Reshuffle operation after the source. I found this while trying the monitoring API, GET /jobs/<jobId>, to get information about the vertices. What I found is that without Reshuffle, Beam seems to chain all the operators together, so when another source is included the DAG changes and the savepoint cannot be mapped back to the original source.

I have attached the DAG for 1 source and 1 sink, without reshuffle and with reshuffle.

However, even with the reshuffle added, this scenario does not work:

Original DAG: read from topic1 and publish to topic2
Take a savepoint, cancel the job
Changed DAG: read from topic3 instead of topic1 and publish to topic2
Resume from savepoint.

The behavior after resuming is that no messages are output to topic2, and in the log I did not see a Kafka consumer for topic3 being created.

So I have an assumption: if I only add a new stateful operator, or only remove a stateful operator, resuming from the savepoint works fine. But if I add a new stateful operator and remove an existing stateful operator at the same time, the job cannot resume from the savepoint. Could you please help me clarify this?

Thanks a lot!
Eleanore

[no-reshuffle.png]
[reshuffle.png]

On Thu, Aug 13, 2020 at 3:34 AM Yun Tang <my...@live.com> wrote:
Hi Eleanore

When a source operator is added that is not included in the savepoint, it runs from scratch and fetches its starting offset according to the configuration of your source connector.

Take the scenario of 'Add a new topic as source' for example: the job would consume the new input2 source with an offset based on the configuration of your Kafka connector.
On the other hand, in the scenario of 'Remove a topic source', the job needs to enable non-restored state [1] to resume from the savepoint and drop the no longer needed input2 state.
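
The two cases above can be modelled with a minimal Java sketch. It is not Flink code: the class `SavepointRestoreSketch`, the `restore` method, and the "offset ..." values are hypothetical and only illustrate the rule described here, assuming state is matched purely by operator ID.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class SavepointRestoreSketch {

    /**
     * Returns the state each operator of the new job starts with.
     * Operators that are missing from the savepoint get null (start from scratch).
     * Savepoint entries without a matching operator make the restore fail
     * unless allowNonRestoredState is true (modelled on the -n flag).
     */
    static Map<String, String> restore(
            Map<String, String> savepoint,
            Set<String> newJobOperatorIds,
            boolean allowNonRestoredState) {
        // State in the savepoint that no operator of the new job can claim.
        Set<String> orphaned = new TreeSet<>(savepoint.keySet());
        orphaned.removeAll(newJobOperatorIds);
        if (!orphaned.isEmpty() && !allowNonRestoredState) {
            throw new IllegalStateException(
                "Savepoint state cannot be mapped to the new job: " + orphaned);
        }
        Map<String, String> restored = new TreeMap<>();
        for (String id : newJobOperatorIds) {
            restored.put(id, savepoint.get(id)); // null means "no saved state"
        }
        return restored;
    }

    public static void main(String[] args) {
        // 'Add a new topic as source': input2 has no saved state.
        System.out.println(
            restore(Map.of("input1", "offset 42"), Set.of("input1", "input2"), false));

        // 'Remove a topic source': the input2 state is orphaned.
        Map<String, String> both = Map.of("input1", "offset 42", "input2", "offset 7");
        System.out.println(restore(both, Set.of("input1"), true));
        try {
            restore(both, Set.of("input1"), false);
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

In this model the newly added input2 simply has no entry to restore, while the removed input2 is dropped only when the flag is set.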

This is the general procedure for resuming from a savepoint, and each operator, connector, or sink could have its own rules for consuming from or writing to external systems. I have already cc'ed Becket, who is an expert on Kafka and could offer more information about the Kafka source and sink.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang


Re: Cannot resume from Savepoint when operator changes

Posted by Eleanore Jin <el...@gmail.com>.
Hi Yun,

Thanks a lot for the direction! I checked how the Beam pipeline gets
translated into the Flink job. Below is a snippet of the code; please see
the inline comments.

try {
  int parallelism =
      context.getExecutionEnvironment().getMaxParallelism() > 0
          ? context.getExecutionEnvironment().getMaxParallelism()
          : context.getExecutionEnvironment().getParallelism();
  UnboundedSourceWrapper<T, ?> sourceWrapper =
      new UnboundedSourceWrapper<>(
          fullName, context.getPipelineOptions(), rawSource, parallelism);

  // this nonDedupSource has the uid set up; in my case it is
  // "source1/KafkaIO.Read/Read(KafkaUnboundedSource)"
  nonDedupSource =
      context
          .getExecutionEnvironment()
          .addSource(sourceWrapper)
          .name(fullName)
          .uid(fullName)
          .returns(withIdTypeInfo);

  if (rawSource.requiresDeduping()) {
    source =
        nonDedupSource
            .keyBy(new ValueWithRecordIdKeySelector<>())
            .transform(
                "deduping",
                outputTypeInfo,
                new DedupingOperator<>(context.getPipelineOptions()))
            .uid(format("%s/__deduplicated__", fullName));
  } else {
    // it will come here, and as you can see there is no uid set up for source
    source =
        nonDedupSource
            .flatMap(new StripIdsMap<>(context.getPipelineOptions()))
            .returns(outputTypeInfo);
  }
} catch (Exception e) {
  throw new RuntimeException("Error while translating UnboundedSource: " + rawSource, e);
}

context.setOutputDataStream(output, source);

In the Web UI, I see that the nonDedupSource (which has a UID) and the
flatMap are chained together, but only the read part highlighted in the
screenshot has a UID assigned.

[image: image (31).png]


1. So does that mean the chained operations are treated together as one
operator, represented by "source" in the code above, which does not have
a UID assigned, and that is why it is not working?

2. If the above is true, what I observe is that after changing the DAG to
read from a different source topic, the new topic is not processed by the
new Flink job. In fact, I don't even see the Kafka consumer get created
for the new source topic. Is this behaviour expected if resuming from the
savepoint fails?


Thanks a lot for the help!

Eleanore


Re: Cannot resume from Savepoint when operator changes

Posted by Yun Tang <my...@live.com>.
Hi Eleanore

A prerequisite for resuming from a savepoint is that the IDs of the existing operators do not change in the new DAG. You can think of a savepoint as holding a map of Operator ID -> State for each stateful operator [1], which is why we recommend setting a uid on those operators [2]. I am not familiar with Beam, so I am not sure whether Beam assigns the operator IDs.
If the operator IDs were not assigned explicitly from the beginning, attaching a new stateful operator might change the IDs of the existing operators.
One way to check this is to load the savepoint metadata before and after you change the DAG and see whether the operator IDs changed. You can use Checkpoints#loadCheckpointMetadata [3] to load the savepoint metadata.
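
To make the "Operator ID -> State" picture a bit more concrete, here is a minimal Java sketch of the before/after comparison. It does not call any Flink or Beam API: the operator IDs are plain strings, and the class name OperatorIdDiff and the sample IDs are made up for illustration. In practice the two ID sets would be read from the savepoint metadata taken before and after the DAG change.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not part of Flink or Beam: compares the operator IDs
// recorded in two savepoints to see which state can still be mapped.
final class OperatorIdDiff {

    // IDs present before the change but missing afterwards: their state is orphaned.
    static Set<String> missingAfterChange(Set<String> before, Set<String> after) {
        Set<String> result = new TreeSet<>(before);
        result.removeAll(after);
        return result;
    }

    // IDs that appear only after the change: these operators start without state.
    static Set<String> newAfterChange(Set<String> before, Set<String> after) {
        Set<String> result = new TreeSet<>(after);
        result.removeAll(before);
        return result;
    }

    public static void main(String[] args) {
        // Made-up IDs standing in for what the savepoint metadata would list.
        Set<String> before = Set.of("source-input1", "sink-output");
        Set<String> after = Set.of("source-input1", "source-input2", "sink-output");

        System.out.println("missing after change: " + missingAfterChange(before, after));
        System.out.println("new after change: " + newAfterChange(before, after));
    }
}
```

If the "missing" set is non-empty even though no operator was intentionally removed, the IDs were probably regenerated by the DAG change, and the old state no longer lines up with the new job.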

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#savepoint-state
[2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#should-i-assign-ids-to-all-operators-in-my-job
[3] https://github.com/apache/flink/blob/0e8e8062bcc159e9ed2a0d4a0a61db4efcb01f2f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L98

Best,
Yun Tang



________________________________
From: Arvid Heise <ar...@ververica.com>
Sent: Friday, August 14, 2020 3:36
To: Eleanore Jin <el...@gmail.com>
Cc: Yun Tang <my...@live.com>; user <us...@flink.apache.org>; Becket Qin <be...@gmail.com>
Subject: Re: Cannot resume from Savepoint when operator changes

Hi Eleanore,

According to the savepoint FAQ [1], removing an operator is still possible if you use the --allowNonRestoredState setting (short: -n) with the run command:

$ bin/flink run -s :savepointPath -n [:runArgs]

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-happens-if-i-add-or-delete-or-reorder-operators-that-have-no-state-in-my-job


Re: Cannot resume from Savepoint when operator changes

Posted by Arvid Heise <ar...@ververica.com>.
Hi Eleanore,

According to the savepoint FAQ [1], removing an operator is still possible
if you use the --allowNonRestoredState setting (short: -n) with the run command:

$ bin/flink run -s :savepointPath -n [:runArgs]

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#what-happens-if-i-add-or-delete-or-reorder-operators-that-have-no-state-in-my-job
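
As a rough illustration of what the -n flag changes, here is a simplified Java model of the restore step. This is a sketch and not Flink's actual implementation: the class RestoreSketch, the String-valued state and the operator IDs are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Simplified model of savepoint restore, not Flink's actual implementation.
final class RestoreSketch {

    // Returns the state assigned to each operator of the new job.
    // savepointState maps operator ID -> state snapshot (a String here for brevity).
    static Map<String, String> restore(Map<String, String> savepointState,
                                       Set<String> newJobOperatorIds,
                                       boolean allowNonRestoredState) {
        Map<String, String> assigned = new HashMap<>();
        for (Map.Entry<String, String> entry : savepointState.entrySet()) {
            if (newJobOperatorIds.contains(entry.getKey())) {
                assigned.put(entry.getKey(), entry.getValue());
            } else if (!allowNonRestoredState) {
                // Without -n, state that cannot be mapped to an operator aborts the restore.
                throw new IllegalStateException(
                        "No operator in the new job for savepoint state: " + entry.getKey());
            }
            // With -n, the unmatched state is skipped.
        }
        return assigned;
    }

    public static void main(String[] args) {
        Map<String, String> savepoint = Map.of(
                "source-input1", "offsets for input1",
                "source-input2", "offsets for input2");
        Set<String> newJob = Set.of("source-input1"); // input2 was removed

        System.out.println(restore(savepoint, newJob, true));
        try {
            restore(savepoint, newJob, false);
        } catch (IllegalStateException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

Operators of the new job that have no entry in the savepoint simply receive no state in this model; only savepoint state without a matching operator is affected by the flag.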

On Thu, Aug 13, 2020 at 6:29 PM Eleanore Jin <el...@gmail.com> wrote:

> Hi Yun,
>
> Thanks a lot for the reply. Later on I was able to make adding a new kafka
> topic as source working, which requires to add a Reshuffle operation after
> the source. The reason I came up to find this is: I was trying the
> monitoring API: GET /jobs/<jobId> to acquire the information of vertices.
> What I found out is: without Reshuffle, Beam seems chaining up all the
> operators together, and when include another source, the DAG changed, so
> savepoint cannot be mapped back to the original source.
>
> I have attached the DAG for 1 source and 1 sink, without reshuffle and
> with reshuffle.
>
> However even by adding reshuffle, this scenario does not work:
>
> original DAG: read from topic1 and publish to topic2
> Take a savepoint, cancel the job
> changed DAG: read from topic3 instead of topic1 and publish to topic2
> Resume from savepoint.
>
> The behavior after resume is: there is no message output to topic2, and
> from the log, I did not see Kafka consumer for topic3 being created.
>
> So I have an assumption: *if just adding a new stateful operator, or just
> removing a stateful operator, it works fine when resume from savepoint. But
> if add a new stateful operator and remove an existing stateful operator,
> then cannot resume from savepoint.* Can you please help me clarify my
> doubt?
>
> Thanks a lot!
> Eleanore
>
> [image: no-reshuffle.png]
> [image: reshuffle.png]
>

-- 

Arvid Heise | Senior Java Developer

<https://www.ververica.com/>

Follow us @VervericaData

--

Join Flink Forward <https://flink-forward.org/> - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng

Re: Cannot resume from Savepoint when operator changes

Posted by Eleanore Jin <el...@gmail.com>.
Hi Yun,

Thanks a lot for the reply. I was later able to make adding a new Kafka
topic as a source work, which requires adding a Reshuffle operation after
the source. I found this by using the monitoring API, GET /jobs/<jobId>,
to inspect the job's vertices. Without Reshuffle, Beam seems to chain all
the operators together, so when another source is included the DAG
changes and the savepoint cannot be mapped back to the original source.

I have attached the DAG for 1 source and 1 sink, without reshuffle and with
reshuffle.

However, even with Reshuffle added, this scenario does not work:

Original DAG: read from topic1 and publish to topic2
Take a savepoint, cancel the job
Changed DAG: read from topic3 instead of topic1 and publish to topic2
Resume from the savepoint.

The behavior after resuming is that no messages are output to topic2, and
in the log I did not see a Kafka consumer for topic3 being created.

So I have an assumption: *if I only add a new stateful operator, or only
remove a stateful operator, resuming from the savepoint works fine. But if
I add a new stateful operator and remove an existing one at the same time,
the job cannot resume from the savepoint.* Can you please help me clarify
this?

Thanks a lot!
Eleanore

[image: no-reshuffle.png]
[image: reshuffle.png]

On Thu, Aug 13, 2020 at 3:34 AM Yun Tang <my...@live.com> wrote:

> Hi Eleanore
>
> When adding an operator of source while savepoint not included, it would
> run from scratch and fetch the offset depended on your configuration of
> source connector.
>
> Take the scenario of 'Add a new topic as source' for example, job would
> consume the new input2 source with offset based on the configuration of
> your kafka connector.
> On the other hand, take the scenario of 'Remove a topic source' for
> example, job needs to enable non-restored-state to resume from savepoint
> and drop the useless input2.
>
> This is the general procedure for resuming from a savepoint, and different
> operator/connector/sink could have its rule to consume or write to external
> systems. Already cc Becket who is expert at Kafka and could offer more
> information about kafka source and sink.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state
>
> Best
> Yun Tang
>
>

Re: Cannot resume from Savepoint when operator changes

Posted by Yun Tang <my...@live.com>.
Hi Eleanore

When you add a source operator whose state is not included in the savepoint, it runs from scratch and fetches its starting offsets according to the configuration of your source connector.

Take the 'Add a new topic as source' scenario as an example: the job would consume the new input2 source starting at offsets determined by the configuration of your Kafka connector.
On the other hand, in the 'Remove a topic source' scenario, the job needs to allow non-restored state [1] to resume from the savepoint and drop the now-unused state of input2.
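
The rule for where a source starts reading can be sketched roughly as follows. This is a simplified Java model and not connector code: the class StartOffsetSketch, the numeric offsets and the operator IDs are assumptions for illustration, and a real Kafka source tracks offsets per partition rather than one value per operator.

```java
import java.util.Map;

// Simplified model, not connector code: chooses where a source starts reading.
final class StartOffsetSketch {

    // restoredOffsets maps source operator ID -> offset found in the savepoint.
    // configuredStart is the fallback taken from the connector configuration
    // (for example the earliest or latest offset), expressed here as a number.
    static long startOffset(String operatorId,
                            Map<String, Long> restoredOffsets,
                            long configuredStart) {
        return restoredOffsets.getOrDefault(operatorId, configuredStart);
    }

    public static void main(String[] args) {
        Map<String, Long> restored = Map.of("source-input1", 42L);

        // input1 has state in the savepoint, so it resumes from there.
        System.out.println(startOffset("source-input1", restored, 0L));
        // input2 is new, so it falls back to the configured start position.
        System.out.println(startOffset("source-input2", restored, 0L));
    }
}
```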

This is the general procedure for resuming from a savepoint; individual operators, connectors, and sinks may have their own rules for consuming from or writing to external systems. I have cc'd Becket, who is an expert on Kafka and can offer more information about the Kafka source and sink.

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#allowing-non-restored-state

Best
Yun Tang

