Posted to user@beam.apache.org by Jin Yi <el...@gmail.com> on 2020/03/04 00:39:07 UTC

FlinkStateBackendFactory

Hi Experts,

I am running a Beam application with the Flink Runner, and I would like to set
the state backend to FsStateBackend instead of MemoryStateBackend.

In FlinkPipelineOptions.java, I should be able to call
setStateBackendFactory(), but I did not find any provided implementations of
the FlinkStateBackendFactory interface. Does that mean I have to implement my
own?

Thanks a lot!
Eleanore

/**
 * State backend to store Beam's state during computation. Note: Only
 * applicable when executing in streaming mode.
 */
@Description(
    "Sets the state backend factory to use in streaming mode. "
        + "Defaults to the flink cluster's state.backend configuration.")
Class<? extends FlinkStateBackendFactory> getStateBackendFactory();

void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);

Re: FlinkStateBackendFactory

Posted by Eleanore Jin <el...@gmail.com>.
Hi Max,

Thanks a lot for the clarification!

Best
Eleanore

On Wed, Mar 11, 2020 at 11:32 AM Maximilian Michels <mx...@apache.org> wrote:

Re: FlinkStateBackendFactory

Posted by Maximilian Michels <mx...@apache.org>.
Please see my answers inline.

-Max

On 10.03.20 05:02, Eleanore Jin wrote:
> Hi Max, 
> 
> Thanks for the response! The reason I set up the state backend is to
> experiment with Kafka EOS (exactly-once semantics) using Beam on Flink.
> Reading through the code and this PR <https://github.com/apache/beam/pull/7991/files>,
> can you please help me clarify my understanding?
> 
> 1. Beam uses KafkaExactlyOnceSink to publish messages with EOS. The
> processElement method of ExactlyOnceWriter is annotated with
> @RequiresStableInput, so all messages are buffered by
> KeyedBufferingElementsHandler, and only after a checkpoint succeeds are
> those messages processed by ExactlyOnceWriter?

That's correct.

> 
> 2. Upon checkpoint, are the messages buffered by
> KeyedBufferingElementsHandler also checkpointed?

Yes, the buffered elements will be checkpointed.
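To make the buffering described above concrete, here is a simplified, hypothetical Java model. It is not Beam's KeyedBufferingElementsHandler or the Flink runner's operator code; the class and method names are assumptions chosen for illustration. Elements are held back, become part of the snapshot at each checkpoint barrier, and are only handed to the writer once the checkpoint covering them is confirmed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

/**
 * Hypothetical model of stable-input buffering: elements are held back until
 * the checkpoint that covers them has completed, and only then emitted.
 */
final class StableInputBuffer<T> {
  // Elements received since the last checkpoint barrier.
  private final List<T> pending = new ArrayList<>();
  // Elements already included in a checkpoint, keyed by checkpoint id,
  // waiting for that checkpoint to be confirmed.
  private final NavigableMap<Long, List<T>> awaitingConfirmation = new TreeMap<>();
  private final Consumer<T> downstream;

  StableInputBuffer(Consumer<T> downstream) {
    this.downstream = downstream;
  }

  /** Buffer the element instead of processing it immediately. */
  void processElement(T element) {
    pending.add(element);
  }

  /**
   * Checkpoint barrier: pending elements are assigned to checkpoint
   * {@code checkpointId}. Returns everything still held back, i.e. what
   * would have to be persisted in the state backend for this snapshot.
   */
  List<T> snapshotState(long checkpointId) {
    awaitingConfirmation.put(checkpointId, new ArrayList<>(pending));
    pending.clear();
    List<T> snapshot = new ArrayList<>();
    awaitingConfirmation.values().forEach(snapshot::addAll);
    return snapshot;
  }

  /** Checkpoint confirmed: all batches up to and including it are stable. */
  void notifyCheckpointComplete(long checkpointId) {
    NavigableMap<Long, List<T>> stable = awaitingConfirmation.headMap(checkpointId, true);
    stable.values().forEach(batch -> batch.forEach(downstream));
    stable.clear(); // the view is backed by awaitingConfirmation, so this removes the emitted batches
  }
}
```

In this model the held-back elements are part of every snapshot, so the buffered state grows with throughput times checkpoint interval; that is presumably where a durable state backend such as FsStateBackend matters.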

> 3. It seems the way Beam provides Kafka EOS introduces delays in stream
> processing, and the delay depends on the checkpoint interval? How can I
> reduce the latency while still keeping the EOS guarantee?

Indeed, the checkpoint interval and the checkpoint duration limit the
latency. Given the current design and the guarantees, there is no other
way to influence the latency.
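As a rough, back-of-the-envelope illustration (an assumption-laden sketch, not a formula taken from Beam or Flink): an element that arrives just after a checkpoint barrier waits about one checkpoint interval for the next barrier and then for that checkpoint to complete, while an element arriving just before a barrier only waits for the checkpoint to complete. The helper below is hypothetical and ignores barrier alignment, minimum pauses between checkpoints, and failed checkpoints. In Beam the interval itself is set through the Flink runner's checkpointingInterval option (in milliseconds).

```java
import java.time.Duration;

/**
 * Hypothetical estimate of the extra latency added by holding output until
 * the covering checkpoint completes. Ignores alignment, minimum pauses
 * between checkpoints, and failed checkpoints.
 */
final class EosLatency {
  private EosLatency() {}

  /** Element arrives just after a barrier: waits a full interval, then for the checkpoint to finish. */
  static Duration worstCase(Duration checkpointInterval, Duration checkpointDuration) {
    return checkpointInterval.plus(checkpointDuration);
  }

  /** Element arrives just before a barrier: only waits for the checkpoint to finish. */
  static Duration bestCase(Duration checkpointDuration) {
    return checkpointDuration;
  }
}
```

For example, with a 10 s interval and checkpoints that take 2 s, this model puts the added latency between roughly 2 s and 12 s, so shortening the interval and keeping checkpoints fast are the two levers mentioned above.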

> 4. commitOffsetsInFinalize is also enabled. Does this mean that after a
> successful checkpoint, the checkpointed offset is committed back to Kafka?
> And if that commit does not complete, and the job is then cancelled/stopped
> and re-submitted (with the same consumer group for the source topics, but
> a different job ID), is duplicate processing still possible because the
> consumed offset was not committed back to Kafka?

This option is for the Kafka consumer. AFAIK this is just a convenience
method to commit the latest checkpointed offset to Kafka. This offset is
not used when restoring from a checkpoint. However, if you don't restore
from a checkpoint, you can resume from that offset which might be
convenient or not, depending on your use case.
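The restart behaviour described above can be modelled as a simple precedence rule. This is a hypothetical, single-partition sketch, not KafkaIO code, and offsets here mean "next offset to read": state restored from a checkpoint wins; otherwise the offset committed for the consumer group (for example by commitOffsetsInFinalize) is used; otherwise the consumer's auto.offset.reset policy applies. It does not model whatever deduplication the EOS sink itself may perform.

```java
import java.util.OptionalLong;

/** Hypothetical model of where a Kafka source resumes for one partition. */
final class ResumeOffset {
  private ResumeOffset() {}

  static long resolve(OptionalLong checkpointedOffset, OptionalLong committedGroupOffset, long autoOffsetReset) {
    // Restoring from a checkpoint: the offset stored in Beam/Flink state wins,
    // and the offset committed to Kafka is not consulted.
    if (checkpointedOffset.isPresent()) {
      return checkpointedOffset.getAsLong();
    }
    // Fresh start with the same group.id: use the offset committed to Kafka.
    if (committedGroupOffset.isPresent()) {
      return committedGroupOffset.getAsLong();
    }
    // Nothing committed for this group: auto.offset.reset decides.
    return autoOffsetReset;
  }
}
```

For example, if a checkpoint covered offsets up to 120 but only 100 had been committed to Kafka, restoring from the checkpoint resumes at 120, whereas a fresh submission with the same group resumes at 100 and reads records 100 to 119 again.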

Re: FlinkStateBackendFactory

Posted by Eleanore Jin <el...@gmail.com>.
Hi Max,

Thanks for the response! The reason I set up the state backend is to
experiment with Kafka EOS (exactly-once semantics) using Beam on Flink.
Reading through the code and this PR <https://github.com/apache/beam/pull/7991/files>,
can you please help me clarify my understanding?

1. Beam uses KafkaExactlyOnceSink to publish messages with EOS. The
processElement method of ExactlyOnceWriter is annotated with
@RequiresStableInput, so all messages are buffered by
KeyedBufferingElementsHandler, and only after a checkpoint succeeds are
those messages processed by ExactlyOnceWriter?

2. Upon checkpoint, are the messages buffered by
KeyedBufferingElementsHandler also checkpointed?

3. It seems the way Beam provides Kafka EOS introduces delays in stream
processing, and the delay depends on the checkpoint interval? How can I
reduce the latency while still keeping the EOS guarantee?

4. commitOffsetsInFinalize is also enabled. Does this mean that after a
successful checkpoint, the checkpointed offset is committed back to Kafka?
And if that commit does not complete, and the job is then cancelled/stopped
and re-submitted (with the same consumer group for the source topics, but
a different job ID), is duplicate processing still possible because the
consumed offset was not committed back to Kafka?

Thanks a lot!
Eleanore

On Thu, Mar 5, 2020 at 12:46 AM Maximilian Michels <mx...@apache.org> wrote:

Re: FlinkStateBackendFactory

Posted by Maximilian Michels <mx...@apache.org>.
Hi Eleanore,

Good question. I think the easiest way is to configure this in the Flink 
configuration file, i.e. flink-conf.yaml. Then you don't need to set 
anything in Beam.

If you want to go with your approach, then just use 
getClass().getClassLoader() unless you have some custom classloader for 
loading your state backend.
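For the second option, a factory might look roughly like the sketch below. It assumes Beam's FlinkStateBackendFactory interface declares a single StateBackend createStateBackend(FlinkPipelineOptions) method and that the Flink runner instantiates the configured class reflectively (hence a public class with a public no-arg constructor). It constructs Flink's FsStateBackend directly instead of loading one through a class loader. The class name BeamFsStateBackendFactory and the checkpoint URI are hypothetical, and beam-runners-flink plus flink-runtime must be on the classpath.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkStateBackendFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

/**
 * Hypothetical factory that hands Beam's Flink runner an FsStateBackend.
 * Keep the implicit public no-arg constructor: the runner creates this
 * class reflectively from the class set in the pipeline options.
 */
public class BeamFsStateBackendFactory implements FlinkStateBackendFactory {
  // Hypothetical checkpoint location; any URI supported by Flink's file systems.
  private static final String CHECKPOINT_URI = "file:///tmp/flink-checkpoints";

  @Override
  public StateBackend createStateBackend(FlinkPipelineOptions options) {
    return new FsStateBackend(CHECKPOINT_URI);
  }
}
```

It would then be selected with options.setStateBackendFactory(BeamFsStateBackendFactory.class) on the FlinkPipelineOptions. Note that the backend only persists anything once checkpointing is enabled, for example via options.setCheckpointingInterval(...).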

Cheers,
Max

On 04.03.20 01:39, Jin Yi wrote: