Posted to user@flink.apache.org by Christian Kreutzfeldt <mn...@gmail.com> on 2016/01/13 10:29:59 UTC

Accessing configuration in RichFunction

Hi

While working on a RichFilterFunction implementation, I was wondering if
there is a better way to access configuration options read from a file
during startup. Currently, I am using
getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
to get access to my settings.

The reason is that the Configuration parameter provided to the open
function does not carry my settings. That is probably because I
use this.executionEnvironment.getConfig().setGlobalJobParameters(cfg) to
pass my configuration into the environment, and that configuration is not
passed on as part of the open call. I found no other way to handle
configuration ;-)

My question is: who is responsible for calling the open function, where
does the configuration parameter originate (that is, where is its content
taken from), and is it possible to define somewhere in the main program
which configuration to pass into a specific operator?

Best
  Christian

Re: Accessing configuration in RichFunction

Posted by Christian Kreutzfeldt <mn...@gmail.com>.
Hi Robert,

Using the constructor is actually the approach I chose. Using the existing
lifecycle method was an idea to integrate it more closely with the existing
framework design ;-)

Best
  Christian

2016-01-18 13:38 GMT+01:00 Robert Metzger <rm...@apache.org>:

> Hi Christian,
>
> I think the DataStream API does not allow you to pass any parameter to the
> open(Configuration) method.
> That method is only used in the DataSet (Batch) API, and its use is
> discouraged.
>
> A much better option to pass a Configuration into your function is as
> follows:
>
>
> Configuration mapConf = new Configuration();
> mapConf.setDouble("somthing", 1.2);
>
> DataStream<Tuple2<String, Integer>> counts =
> // split up the lines in pairs (2-tuples) containing: (word,1)
>
> text.flatMap(new Tokenizer(mapConf))
> // group by the tuple field "0" and sum up tuple field "1"
>       .keyBy(0).sum(1);
>
>
> And in the Tokenizer:
>
> public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
>    private final Configuration mapConf;
>
>    public Tokenizer(Configuration mapConf) {
>       this.mapConf = mapConf;
>    }
>
>
> This works as long as the type you're passing is serializable.
>
>
>
> On Mon, Jan 18, 2016 at 12:43 PM, Christian Kreutzfeldt <mn...@gmail.com>
> wrote:
>
>> Hi Max,
>>
>> maybe I explained it a bit mistakable ;-)
>>
>> I have a stream-based application which contains a RichFilterFunction
>> implementation. The parent provides a lifecycle method open
>> (open(Configuration)) which receives a Configuration object as input. I
>> would like to use this call to pass options into the operator instance.
>>
>> Unfortunately, I found no hint where and how to provide the information
>> such that I receive them at the described method. Actually, I am accessing
>> the surrounding runtime context to retrieve the global job parameters where
>> I extract the desired information from. But for some reasons I do not want
>> the operator to receive its setup information from the provided
>> Configuration instance ;-)
>>
>> That's why I am looking for the place where the configuration object is
>> created and passed into the rich filter function. I would like to insert
>> dedicated information for a dedicated filter instance.
>>
>> Best
>>   Christian
>>
>>
>> 2016-01-18 12:30 GMT+01:00 Maximilian Michels <mx...@apache.org>:
>>
>>> Hi Christian,
>>>
>>> For your implementation, would it suffice to pass a Configuration with
>>> your RichFilterFunction? You said the global job parameters are not
>>> passed on to your user function? Can you confirm this is a bug?
>>>
>>> Cheers,
>>> Max
>>>
>>> On Wed, Jan 13, 2016 at 10:59 AM, Christian Kreutzfeldt
>>> <mn...@gmail.com> wrote:
>>> > Hi Fabian,
>>> >
>>> > thanks for your quick response. I just figured out that I forgot to
>>> mention
>>> > a small but probably relevant detail: I am working with the streaming
>>> api.
>>> >
>>> > Although there is a way to access the overall job settings, I need a
>>> > solution to "reduce" the view on configuration options available on
>>> operator
>>> > level.
>>> > For example, I would like to pass instance specific settings like an
>>> > operator identifier but there might be different operators in the
>>> overall
>>> > program.
>>> >
>>> > Best
>>> >   Christian
>>> >
>>> > 2016-01-13 10:52 GMT+01:00 Fabian Hueske <fh...@gmail.com>:
>>> >>
>>> >> Hi Christian,
>>> >>
>>> >> the open method is called by the Flink workers when the parallel
>>> tasks are
>>> >> initialized.
>>> >> The configuration parameter is the configuration object of the
>>> operator.
>>> >> You can set parameters in the operator config as follows:
>>> >>
>>> >> DataSet<String> text = ...
>>> >> DataSet<Tuple2<String, Integer> wc = text.flatMap(new
>>> >> Tokenizer()).getParameters().setString("myKey", "myVal");
>>> >>
>>> >> Best, Fabian
>>> >>
>>> >>
>>> >> 2016-01-13 10:29 GMT+01:00 Christian Kreutzfeldt <mn...@gmail.com>:
>>> >>>
>>> >>> Hi
>>> >>>
>>> >>> While working on a RichFilterFunction implementation I was
>>> wondering, if
>>> >>> there is a much better way to access configuration
>>> >>> options read from file during startup. Actually, I am using
>>> >>> getRuntimeContext().getExecutionConfig().getGlobalJobParameters()
>>> >>> to get access to my settings.
>>> >>>
>>> >>> Reason for that is, that the Configuration parameter provided to the
>>> open
>>> >>> function does not carry my settings. That is probably
>>> >>> the case as I use
>>> >>> this.executionEnvironment.getConfig().setGlobalJobParameters(cfg) to
>>> pass my
>>> >>> configuration into the environment
>>> >>> which in turn is not passed on as part of the open call - I found no
>>> >>> other way to handle configuration ;-)
>>> >>>
>>> >>> My question is: who is responsible for calling the open function,
>>> where
>>> >>> does the configuration parameter has its origins aka where
>>> >>> is its content taken from and is it possible to define somewhere in
>>> the
>>> >>> main program which configuration to pass into a specific operator?
>>> >>>
>>> >>> Best
>>> >>>   Christian
>>> >>
>>> >>
>>> >
>>>
>>
>>
>

Re: Accessing configuration in RichFunction

Posted by Robert Metzger <rm...@apache.org>.
Hi Christian,

I think the DataStream API does not allow you to pass any parameter to the
open(Configuration) method.
That method is only used in the DataSet (Batch) API, and its use is
discouraged.

A much better option to pass a Configuration into your function is as
follows:


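import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;

Configuration mapConf = new Configuration();
mapConf.setDouble("something", 1.2);

DataStream<Tuple2<String, Integer>> counts =
      // split up the lines in pairs (2-tuples) containing: (word,1)
      text.flatMap(new Tokenizer(mapConf))
      // group by the tuple field "0" and sum up tuple field "1"
      .keyBy(0).sum(1);


And in the Tokenizer:

public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
   private final Configuration mapConf;

   public Tokenizer(Configuration mapConf) {
      this.mapConf = mapConf;
   }

   @Override
   public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
      // mapConf is available here, e.g. mapConf.getDouble("something", 1.0)
      for (String token : value.toLowerCase().split("\\W+")) {
         if (!token.isEmpty()) out.collect(new Tuple2<>(token, 1));
      }
   }
}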


This works as long as the type you're passing is serializable.
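Why serializability matters can be illustrated without a cluster: the function object is shipped to the workers in serialized form, so anything it receives through its constructor has to survive serialization. The plain-Java sketch below imitates that step with a simple ObjectOutputStream/ObjectInputStream round trip; `ThresholdFilter`, its fields, and `shipToWorker` are hypothetical names, and Flink's real shipping mechanism involves more than this round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ShipFunctionDemo {

    /** Hypothetical filter whose settings arrive via the constructor. */
    static final class ThresholdFilter implements Serializable {
        private static final long serialVersionUID = 1L;
        private final String operatorId;
        private final double threshold;

        ThresholdFilter(String operatorId, double threshold) {
            this.operatorId = operatorId;
            this.threshold = threshold;
        }

        boolean filter(double value) {
            return value > threshold;
        }

        String operatorId() {
            return operatorId;
        }
    }

    /** Serializes and deserializes the object, imitating the trip from client to worker. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T shipToWorker(T function)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(function); // throws NotSerializableException for non-serializable fields
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Two instances of the same class, each carrying its own dedicated settings.
        ThresholdFilter a = shipToWorker(new ThresholdFilter("filter-a", 1.2));
        ThresholdFilter b = shipToWorker(new ThresholdFilter("filter-b", 5.0));
        System.out.println(a.operatorId() + " keeps 3.0: " + a.filter(3.0)); // filter-a keeps 3.0: true
        System.out.println(b.operatorId() + " keeps 3.0: " + b.filter(3.0)); // filter-b keeps 3.0: false
    }
}
```

The same pattern also covers per-instance settings such as an operator identifier: each instance simply gets different constructor arguments. A field whose type is not serializable would make writeObject fail with a NotSerializableException.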



On Mon, Jan 18, 2016 at 12:43 PM, Christian Kreutzfeldt <mn...@gmail.com>
wrote:


Re: Accessing configuration in RichFunction

Posted by Christian Kreutzfeldt <mn...@gmail.com>.
Hi Max,

Maybe I explained it in a misleading way ;-)

I have a stream-based application which contains a RichFilterFunction
implementation. The parent class provides a lifecycle method,
open(Configuration), which receives a Configuration object as input. I
would like to use this call to pass options into the operator instance.

Unfortunately, I found no hint on where and how to provide the information
so that I receive it in that method. Currently, I am accessing the
surrounding runtime context to retrieve the global job parameters, from
which I extract the desired information. But for several reasons I would
rather have the operator receive its setup information from the provided
Configuration instance ;-)

That's why I am looking for the place where the configuration object is
created and passed into the rich filter function. I would like to insert
dedicated information for a specific filter instance.

Best
  Christian


2016-01-18 12:30 GMT+01:00 Maximilian Michels <mx...@apache.org>:


Re: Accessing configuration in RichFunction

Posted by Maximilian Michels <mx...@apache.org>.
Hi Christian,

For your implementation, would it suffice to pass a Configuration along with
your RichFilterFunction? Did you say that the global job parameters are not
passed on to your user function? If so, can you confirm this is a bug?

Cheers,
Max

On Wed, Jan 13, 2016 at 10:59 AM, Christian Kreutzfeldt
<mn...@gmail.com> wrote:

Re: Accessing configuration in RichFunction

Posted by Christian Kreutzfeldt <mn...@gmail.com>.
Hi Fabian,

thanks for your quick response. I just figured out that I forgot to mention
a small but probably relevant detail: I am working with the streaming api.

Although there is a way to access the overall job settings, I need a
solution to "reduce" the view on configuration options available on
operator level.
For example, I would like to pass instance specific settings like an
operator identifier but there might be different operators in the overall
program.
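One way to "reduce" a shared parameter set to a per-operator view is sketched below in plain Java, so it runs without Flink. It assumes a hypothetical key convention `<operatorId>.<option>` in the global parameter map and keeps only the entries under one operator's prefix; `scopedView` and the `filter-a`/`filter-b` identifiers are illustrative names, not Flink API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ScopedParams {

    /**
     * Returns the entries of {@code global} whose key starts with "operatorId.",
     * with that prefix stripped. Hypothetical helper, not part of Flink.
     */
    static Map<String, String> scopedView(Map<String, String> global, String operatorId) {
        String prefix = operatorId + ".";
        Map<String, String> scoped = new HashMap<>();
        for (Map.Entry<String, String> e : global.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                scoped.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return Collections.unmodifiableMap(scoped);
    }

    public static void main(String[] args) {
        // Stand-in for the settings read from file during startup.
        Map<String, String> global = new HashMap<>();
        global.put("filter-a.threshold", "10");
        global.put("filter-b.threshold", "42");
        global.put("job.name", "demo");

        System.out.println(scopedView(global, "filter-a")); // {threshold=10}
        System.out.println(scopedView(global, "filter-b")); // {threshold=42}
    }
}
```

In a real job the input map could come from the global job parameters, assuming your Flink version offers `ExecutionConfig.GlobalJobParameters#toMap()`, with the operator identifier passed to each function instance through its constructor.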

Best
  Christian

2016-01-13 10:52 GMT+01:00 Fabian Hueske <fh...@gmail.com>:


Re: Accessing configuration in RichFunction

Posted by Fabian Hueske <fh...@gmail.com>.
Hi Christian,

The open method is called by the Flink workers when the parallel tasks are
initialized.
The Configuration parameter is the configuration object of the operator.
In the DataSet API, you can set parameters in the operator config as follows:

Configuration params = new Configuration();
params.setString("myKey", "myVal");

DataSet<String> text = ...
// params is passed to Tokenizer's open(Configuration) if Tokenizer is a rich function
DataSet<Tuple2<String, Integer>> wc = text.flatMap(new Tokenizer()).withParameters(params);
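The lifecycle described here (each parallel task gets its own function instance, open is called with the operator's parameters, and only then do records arrive) can be modelled in plain Java. `RichFilter`, `LimitFilter`, and `runTask` are hypothetical stand-ins for Flink's RichFilterFunction and task code, chosen so the sketch runs without Flink; the key "limit" is likewise made up, and a Map stands in for Flink's Configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

public class OpenLifecycleDemo {

    /** Hypothetical stand-in for a rich filter: open() runs once before any record. */
    interface RichFilter<T> {
        void open(Map<String, String> parameters);
        boolean filter(T value);
    }

    /** Reads its limit from the operator parameters in open(). */
    static final class LimitFilter implements RichFilter<Integer> {
        private int limit;

        @Override
        public void open(Map<String, String> parameters) {
            limit = Integer.parseInt(parameters.getOrDefault("limit", "0"));
        }

        @Override
        public boolean filter(Integer value) {
            return value > limit;
        }
    }

    /** Simulates one parallel task: fresh instance, open(parameters), then the records. */
    static <T> List<T> runTask(Supplier<RichFilter<T>> factory,
                               Map<String, String> parameters, List<T> input) {
        RichFilter<T> function = factory.get();
        function.open(parameters);
        List<T> output = new ArrayList<>();
        for (T value : input) {
            if (function.filter(value)) {
                output.add(value);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        Map<String, String> operatorParams = new HashMap<>();
        operatorParams.put("limit", "2");
        // Two "parallel tasks" of the same operator: separate instances, same parameters.
        System.out.println(runTask(LimitFilter::new, operatorParams, Arrays.asList(1, 2, 3))); // [3]
        System.out.println(runTask(LimitFilter::new, operatorParams, Arrays.asList(4, 0))); // [4]
    }
}
```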

Best, Fabian


2016-01-13 10:29 GMT+01:00 Christian Kreutzfeldt <mn...@gmail.com>:
