Posted to user@flink.apache.org by Camelia-Elena Ciolac <ca...@inria.fr> on 2014/11/04 16:16:42 UTC

RichMapFunction related question

Hello, 

I have two questions about RichMapFunction, starting from its use in the LinearRegression example: https://github.com/apache/incubator-flink/blob/master/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/ml/LinearRegression.java

Q1: If we run this operator on 4 nodes, is its open(Configuration) method executed once on each node?

Q2: Can we use fields (such as a counter) in the RichMapFunction class so that they are not shared between computation nodes, but are shared by all tuples that pass through this transformation on a given computation node?
Something like:

dataset1.map(
    new RichMapFunction<T1, T2>() {
        private int k;

        @Override
        public void open(Configuration config) {
            ...........
            k = 0;
        }

        @Override
        public T2 map(T1 tupin) {
            k++;
            return new T2(...., k);
        }
    }
);

where T1 and T2 stand for some tuple types or classes.

Thank you in advance! 

Best regards, 
Camelia 



Re: RichMapFunction related question

Posted by Fabian Hueske <fh...@apache.org>.
There should also be a rich function stub for mapPartition.
Alternatively, if the initialization does not depend on the Configuration
object, you can do it in the default constructor.
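
The two initialization options mentioned above can be illustrated with a minimal plain-Java sketch. It does not use Flink: Context, RichPartitionFunction, ScaleAndShift and runPartition are hypothetical stand-ins that only mimic the lifecycle (runtime context set, open() called once, then mapPartition()). In a real job the rich variant would presumably be Flink's RichMapPartitionFunction, with the broadcast variable read in open() through getRuntimeContext().getBroadcastVariable("parameters") as in the question.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RichMapPartitionSketch {

    /** Hypothetical stand-in for a runtime context; NOT Flink's RuntimeContext. */
    static class Context {
        private final Map<String, List<Double>> broadcast = new HashMap<>();

        void setBroadcastVariable(String name, List<Double> values) {
            broadcast.put(name, values);
        }

        List<Double> getBroadcastVariable(String name) {
            return broadcast.get(name);
        }
    }

    /** Hypothetical stand-in for a rich mapPartition function; NOT a Flink class. */
    abstract static class RichPartitionFunction {
        private Context context;

        void setRuntimeContext(Context context) { this.context = context; }

        Context getRuntimeContext() { return context; }

        /** Called once per parallel instance before any data is processed. */
        void open() { }

        abstract void mapPartition(Iterable<Double> values, List<Double> out);
    }

    static class ScaleAndShift extends RichPartitionFunction {
        private final double shift; // set in the constructor: needs nothing from the runtime
        private double factor;      // set in open(): read from the broadcast variable

        ScaleAndShift(double shift) { this.shift = shift; }

        @Override
        void open() {
            factor = getRuntimeContext().getBroadcastVariable("parameters").get(0);
        }

        @Override
        void mapPartition(Iterable<Double> values, List<Double> out) {
            for (double v : values) {
                out.add(v * factor + shift);
            }
        }
    }

    /** Drives one instance through the lifecycle: set context, open(), then mapPartition(). */
    static List<Double> runPartition(List<Double> partition, List<Double> parameters) {
        Context context = new Context();
        context.setBroadcastVariable("parameters", parameters);

        ScaleAndShift fn = new ScaleAndShift(1.0);
        fn.setRuntimeContext(context);
        fn.open();

        List<Double> out = new ArrayList<>();
        fn.mapPartition(partition, out);
        return out;
    }

    public static void main(String[] args) {
        // prints [4.0, 7.0]
        System.out.println(runPartition(Arrays.asList(1.0, 2.0), Arrays.asList(3.0)));
    }
}
```

The point of the split is that shift is known when the function object is created, while factor only becomes available once the runtime context exists, which is why it is read in open() rather than in the constructor.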


Re: RichMapFunction related question

Posted by Camelia-Elena Ciolac <ca...@inria.fr>.
First of all, thank you for the detailed explanation. 

I tried to use the MapPartitionFunction instead, as it fits my case better, but now I can no longer use the open(Configuration) method as I could with the RichMapFunction.
I need a workaround to read the broadcast variable, to obtain the same result as:
getRuntimeContext().getBroadcastVariable("parameters");

Is it somehow possible to access the broadcast variable from inside the mapPartition?

Thank you! 
Camelia 


Re: RichMapFunction related question

Posted by Fabian Hueske <fh...@apache.org>.
Hi Camelia,

In general, user-defined functions are assumed to have no side effects.
Sharing a counter between invocations of the user-defined function
(map() in your case) is such a side effect.
Since the system gives no guarantees about which data is processed on
which node (within the semantics of the operator, of course), what you
did will not give deterministic results.

To answer your questions:
1) open() is called exactly once for each parallel operator instance. There
might be more than one operator instance on each node (depending on the
number of configured slots). All instances on the same node run within
the same JVM, so be careful with singletons or other shared objects.
2) Each parallel operator instance has its own member variables,
i.e., k is not shared with other operator instances. However, as pointed
out above, this operator does not return deterministic results.
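
To make points 1) and 2) concrete, here is a minimal plain-Java sketch. It does not use Flink: CountingMapper is a hypothetical stand-in for one parallel operator instance, and the round-robin split in run() is only an assumed way of distributing records, since the real system gives no such guarantee. It shows that each instance counts on its own, so the value attached to a record depends on the parallelism and on how the data happens to be distributed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class PerInstanceCounterSketch {

    /** Hypothetical stand-in for one parallel instance of the map operator; NOT a Flink class. */
    static class CountingMapper {
        private int k; // one counter per instance, never shared

        void open() {
            k = 0;
        }

        String map(String value) {
            k++;
            return value + ":" + k;
        }
    }

    /**
     * Creates `parallelism` instances, calls open() once on each, and hands the
     * records out round-robin (an assumption made only for this illustration).
     */
    static List<String> run(List<String> input, int parallelism) {
        List<CountingMapper> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            CountingMapper mapper = new CountingMapper();
            mapper.open();
            instances.add(mapper);
        }
        List<String> out = new ArrayList<>();
        for (int i = 0; i < input.size(); i++) {
            out.add(instances.get(i % parallelism).map(input.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d");
        System.out.println(run(input, 1)); // prints [a:1, b:2, c:3, d:4]
        System.out.println(run(input, 2)); // prints [a:1, b:1, c:2, d:2]
    }
}
```

The same input yields different counter values for parallelism 1 and 2, which is the non-determinism described above.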

If you want to map over all elements of a partition, mapPartition might be
a better fit than map.

Best, Fabian
