Posted to user@beam.apache.org by Catlyn Kong <ca...@yelp.com> on 2019/08/23 00:57:49 UTC

--state_backend PipelineOption not supported in python when running on Flink

Hi all,

I'm experimenting with checkpoints/savepoints in Beam (version 2.14) when
using a Flink (version 1.6.4) runner. Flink was able to take periodic
checkpoints once I set up flink-conf.yaml correctly. I was then wondering
whether it's possible to set the StateBackend on a per-job level by passing
the --state_backend=RocksDBStateBackend option, since it's listed as
supported here <https://beam.apache.org/documentation/runners/flink/>.

But instead I got the following error:
RuntimeError: Pipeline failed in state FAILED:
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot
construct instance of `org.apache.flink.runtime.state.StateBackend` (no
Creators, like default construct, exist): abstract types either need to be
mapped to concrete types, have custom deserializer, or contain additional
type information
 at [Source: (String)""RocksDBStateBackend""; line: 1, column: 1]

I then saw that the getter is declared as:
@JsonIgnore
StateBackend getStateBackend();

Is this not supported in Python yet? If so, are there plans to support it
in the near future?
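A rough Python analogy for the error above, assuming only what the message itself shows: the flag value reaches the Java side as the plain JSON string "RocksDBStateBackend", and the deserializer is asked to build the declared, abstract StateBackend type from it. A bare string carries no concrete type, so construction fails. `StateBackend`, `MemoryBackend`, and `deserialize_as` are hypothetical stand-ins, not Beam, Flink, or Jackson APIs.

```python
import inspect
import json
from abc import ABC, abstractmethod


class StateBackend(ABC):
    """Hypothetical stand-in for Flink's abstract StateBackend type."""

    @abstractmethod
    def describe(self) -> str:
        ...


class MemoryBackend(StateBackend):
    """Hypothetical concrete subclass, used only for contrast."""

    def describe(self) -> str:
        return "in-memory"


def deserialize_as(raw_json: str, declared_type: type):
    """Build `declared_type` from a JSON value, as a type-driven
    deserializer would. Only the declared type is consulted; the
    string's content is never treated as a class name."""
    value = json.loads(raw_json)
    if inspect.isabstract(declared_type):
        raise TypeError(
            f"cannot construct abstract type {declared_type.__name__} "
            f"from {value!r}: no concrete type information"
        )
    return declared_type()


# Modelled on the error's "Source": a plain JSON string.
payload = json.dumps("RocksDBStateBackend")

try:
    deserialize_as(payload, StateBackend)
except TypeError as err:
    print(err)
```

The contrast case matters: a concrete declared type such as `MemoryBackend` would deserialize, but nothing in the string selected it. That is why a type name in a string option cannot pick an implementation on its own.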

Best,
Catlyn

Re: [External] Re: --state_backend PipelineOption not supported in python when running on Flink

Posted by Catlyn Kong <ca...@yelp.com>.
Hi Max,

Filed https://issues.apache.org/jira/browse/BEAM-8112 to track this. It
would also be nice to note in the Python section of the documentation that
this isn't supported yet: https://beam.apache.org/documentation/runners/flink/.

Thanks,
Catlyn

On Wed, Aug 28, 2019 at 2:50 AM Maximilian Michels <mx...@apache.org> wrote:

Re: [External] Re: --state_backend PipelineOption not supported in python when running on Flink

Posted by Maximilian Michels <mx...@apache.org>.
Hi Catlyn,

I granted you contributor permissions in JIRA which allows you to 
create/assign tickets.

Cheers,
Max

On 27.08.19 19:54, Catlyn Kong wrote:

Re: [External] Re: --state_backend PipelineOption not supported in python when running on Flink

Posted by Catlyn Kong <ca...@yelp.com>.
Hi Max,

Thanks for getting back to me. I'll create a ticket; my username is catlynk.

Cheers,
Catlyn

On Tue, Aug 27, 2019 at 3:50 AM Maximilian Michels <mx...@apache.org> wrote:

Re: --state_backend PipelineOption not supported in python when running on Flink

Posted by Maximilian Michels <mx...@apache.org>.
Hi Catlyn,

This option has never worked outside the Java SDK, where it originates.
For the upcoming Beam 2.16.0 release, we have replaced this option with a
factory class:
https://github.com/apache/beam/blob/da6c1a8f435f5583811785050808a2311db94047/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java#L152
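A factory sidesteps the deserialization problem because the option only has to carry a name, which serializes cleanly, and the runner then builds the state backend itself. The following is a minimal Python sketch of that idea, not Beam's implementation: in the linked Java code the option names a factory class, whereas here a plain dict registry plays that role. All class names and the option keys `state_backend_factory` and `checkpoint_dir` are hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Dict, Type


class StateBackend(ABC):
    """Hypothetical abstract state backend."""

    @abstractmethod
    def kind(self) -> str:
        ...


class RocksDBBackend(StateBackend):
    """Hypothetical concrete backend holding a checkpoint location."""

    def __init__(self, checkpoint_dir: str) -> None:
        self.checkpoint_dir = checkpoint_dir

    def kind(self) -> str:
        return "rocksdb"


class StateBackendFactory(ABC):
    """Hypothetical factory: no-arg constructor, builds the backend
    from the (string-valued) pipeline options."""

    @abstractmethod
    def create_state_backend(self, options: Dict[str, str]) -> StateBackend:
        ...


class RocksDBBackendFactory(StateBackendFactory):
    def create_state_backend(self, options: Dict[str, str]) -> StateBackend:
        # Hypothetical option key, with a placeholder default location.
        return RocksDBBackend(options.get("checkpoint_dir", "file:///tmp/checkpoints"))


# Runner-side registry: the option value is just a string key.
FACTORIES: Dict[str, Type[StateBackendFactory]] = {
    "RocksDBBackendFactory": RocksDBBackendFactory,
}


def state_backend_from_options(options: Dict[str, str]) -> StateBackend:
    factory_cls = FACTORIES[options["state_backend_factory"]]  # KeyError if unknown
    return factory_cls().create_state_backend(options)


backend = state_backend_from_options(
    {"state_backend_factory": "RocksDBBackendFactory", "checkpoint_dir": "file:///tmp/ckpt"}
)
print(backend.kind(), backend.checkpoint_dir)
```

Bundling a default RocksDB factory with the runner, as suggested next, would amount to shipping an entry like `RocksDBBackendFactory` so that a Python user only has to pass its name.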

We could bundle a default factory class for the RocksDB state backend 
and make this easier to configure from Python. Do you mind opening a 
JIRA issue for this? I can give you permissions for 
https://issues.apache.org/jira/projects/BEAM/issues if you create an 
account.

In any case, you could also configure the Flink cluster to use RocksDB. 
Beam will use whatever state backend is configured.
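For the cluster-wide route, the backend is selected in flink-conf.yaml rather than per job. The sketch below renders the relevant entries from Python. `state.backend`, `state.checkpoints.dir`, and `state.savepoints.dir` are standard Flink configuration keys. The `file:///tmp/...` URIs are placeholder assumptions and should be replaced with durable storage the whole cluster can reach.

```python
from typing import Dict


def render_flink_conf(entries: Dict[str, str]) -> str:
    """Render key/value pairs in flink-conf.yaml's flat 'key: value' form."""
    return "".join(f"{key}: {value}\n" for key, value in entries.items())


state_settings = {
    # Standard Flink keys; the URIs are placeholders, not recommendations.
    "state.backend": "rocksdb",
    "state.checkpoints.dir": "file:///tmp/flink-checkpoints",
    "state.savepoints.dir": "file:///tmp/flink-savepoints",
}

print(render_flink_conf(state_settings), end="")
```

Flink reads flink-conf.yaml at startup, so the cluster needs a restart after these lines are added. Every job on that cluster, Beam pipelines included, then uses the RocksDB backend. That is the trade-off against the per-job option.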

Thanks,
Max

On 23.08.19 02:57, Catlyn Kong wrote: