Posted to user@flink.apache.org by Stefano Bortoli <s....@gmail.com> on 2014/11/10 17:36:10 UTC

feature request: broadcast POJO objects as part of runtime context

Hi,

While trying to run some legacy code as part of a Flink job, I had to replicate
configuration files across my cluster. Not a big deal with a small
cluster, but it would be nice to have these configuration objects
broadcastable. Namely, it would be nice to reuse the old "read from conf
file" logic to build objects that could then be serialized and used
during processing through the broadcast mechanism.

Do you think this will be possible? With the new Kryo serialization it should
not be extremely complicated.

Regards,
Stefano

Re: feature request: broadcast POJO objects as part of runtime context

Posted by Stefano Bortoli <s....@gmail.com>.
Thanks a lot for the clarification. The point is that I don't deal with the
parameters directly in the map. These are just things I need to pass down
to the classes I use to implement the map logic, reusing some existing code. For
example, to access a global index I need the configuration for the Solr
client, and to implement the matching function I need to read some other
parameters. We usually run that logic in an application server, so the
environment is always there. However, I would like to be elastic in
allocating nodes for Flink without having to replicate the configuration on
every server on which I start a task manager.

I know I would have to re-implement all the initializations to use the
standard configuration parameter system. However, it would be nice just to
be able to broadcast some object to ease deployment and allocation of
servers. :-) Maybe we could just use ZooKeeper to make it easy. But again,
I would have to adapt all the configuration, which I would not do if it were
not necessary.

Thanks a lot for your support and clarifications.

Regards,
Stefano


Re: feature request: broadcast POJO objects as part of runtime context

Posted by Stephan Ewen <se...@apache.org>.
Any type Flink supports as a data type.
On 10.11.2014 at 20:19, "Flavio Pompermaier" <po...@okkam.it> wrote:

> But should MyType be serializable or can be of any type?

Re: feature request: broadcast POJO objects as part of runtime context

Posted by Flavio Pompermaier <po...@okkam.it>.
But does MyType need to be serializable, or can it be of any type?

Re: feature request: broadcast POJO objects as part of runtime context

Posted by Stephan Ewen <se...@apache.org>.
You can put generic objects in the closure as well, just as you can put a
configuration in the closure.

You can also distribute your objects into the cluster and then use them as
a broadcast variable:

------------------------
DataSet<MyType> aux = env.fromElements(new MyType());


someOtherData.map(new MyMapper()).withBroadcastSet(aux, "my broadcast data");

------------------------

See
http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html#broadcast-variables



Stephan



Re: feature request: broadcast POJO objects as part of runtime context

Posted by Flavio Pompermaier <po...@okkam.it>.
I think Stefano was asking for a different way to pass a generic
Configuration object, not just a subclass of it.
For example, in our use case, it would be helpful to broadcast
generic objects/POJOs.
Is that possible?

Re: feature request: broadcast POJO objects as part of runtime context

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I hope I understand correctly what you are trying to do. To have a config
file available in the functions, you can simply do either of the following:

-----------------------
Closure
-----------------------

final Configuration conf = ...

data.map(new RichMapFunction<String, Integer>() {

  @Override
  public void open(Configuration c) {
     // the enclosing conf object is captured by the closure and usable here
     conf.getString(...);
  }

  @Override
  public Integer map(String value) {
    // whatever
  }
});
-----------------------
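
A note on the closure variant: the captured object is shipped together with the function, so as far as I understand it has to be java.io.Serializable. The sketch below imitates that shipping step with plain Java serialization and uses no Flink classes at all; SolrSettings, MatchFunction and ClosureShipping are made-up names for illustration, not part of any API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class ClosureShipping {
    // Serialize and deserialize an object, standing in for sending it to a worker.
    static <T extends Serializable> T roundTrip(T object) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(object);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("object could not be shipped", e);
        }
    }

    public static void main(String[] args) {
        // Built once on the client, e.g. by the existing "read from conf file" logic.
        SolrSettings settings = new SolrSettings("http://solr.example:8983/solr", 5000);
        MatchFunction shipped = roundTrip(new MatchFunction(settings));
        System.out.println(shipped.map("doc-1"));
    }
}

// Hypothetical legacy settings object (assumption: it can be made Serializable).
final class SolrSettings implements Serializable {
    private static final long serialVersionUID = 1L;
    final String url;
    final int timeoutMillis;

    SolrSettings(String url, int timeoutMillis) {
        this.url = url;
        this.timeoutMillis = timeoutMillis;
    }
}

// Hypothetical function object that captures the settings, like the closure above.
final class MatchFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final SolrSettings settings;

    MatchFunction(SolrSettings settings) {
        this.settings = settings;
    }

    String map(String value) {
        return value + "@" + settings.url + "?timeout=" + settings.timeoutMillis;
    }
}
```

If a legacy class cannot be made Serializable, the round trip fails with a NotSerializableException wrapped in the IllegalStateException, which is roughly the kind of error one would expect when such an object is captured in a closure.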


-----------------------
Config Parameters
-----------------------

Configuration conf = ...

data.map(new RichMapFunction<String, Integer>() {

  @Override
  public void open(Configuration c) {
     // c holds all entries of conf, handed over via withParameters() below
     c.getString(...);
  }

  @Override
  public Integer map(String value) {
    // whatever
  }
})
.withParameters(conf);
-----------------------
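
Since these parameters are plain key/value pairs, an existing properties-style conf file could be read once on the client and flattened before being handed to withParameters(). Below is a minimal sketch of that flattening step in plain Java, assuming the legacy file uses java.util.Properties syntax; LegacyConfLoader and the keys shown are hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

final class LegacyConfLoader {
    // Parse properties-style text on the client and flatten it into string pairs.
    static Map<String, String> load(String confText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(confText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // A sorted map keeps the output order stable.
        Map<String, String> flat = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            flat.put(key, props.getProperty(key));
        }
        return flat;
    }

    public static void main(String[] args) {
        // Stand-in for the contents of a legacy conf file.
        String confText = "solr.url=http://solr.example:8983/solr\nmatch.threshold=0.8\n";
        Map<String, String> flat = load(confText);
        flat.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Each pair could then be copied into the Configuration on the client, for example with conf.setString(key, value), so that the task managers need no local copy of the file.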


Stephan

