Posted to user@flink.apache.org by HungChang <un...@gmail.com> on 2015/04/25 21:10:04 UTC

Difference between using a global variable and broadcasting a variable

Hi,

What would be the difference between using a global variable and
broadcasting it?

A toy example:

// Using global
{...
private static int num = 10;

public class DivByTen implements FlatMapFunction<Tuple1<Double>, Tuple1<Double>> {
  @Override
  public void flatMap(Tuple1<Double> value, Collector<Tuple1<Double>> out) {
     // Tuple1 exposes its single field as f0
     out.collect(new Tuple1<Double>(value.f0 / num));
  }
}
}

// Using broadcasting:
{...
// (assuming RichFlatMapFunction was meant, since the method emits through a Collector)
public static class DivByTen extends
		RichFlatMapFunction<Tuple1<Double>, Tuple1<Double>> {

	private long num;

	@Override
	public void open(Configuration parameters) throws Exception {
		super.open(parameters);
		// Read the first element of the broadcast set registered as "num"
		num = getRuntimeContext().<Integer>getBroadcastVariable("num").get(0);
	}

	@Override
	public void flatMap(Tuple1<Double> value, Collector<Tuple1<Double>> out)
			throws Exception {
		out.collect(new Tuple1<Double>(value.f0 / num));
	}
}
}

Best regards,

Hung




Re: Difference between using a global variable and broadcasting a variable

Posted by Stephan Ewen <se...@apache.org>.
Hi!

I put a quick summary into the wiki for future reference.

https://cwiki.apache.org/confluence/display/FLINK/Variables+Closures+vs.+Broadcast+Variables

Greetings,
Stephan


On Mon, Apr 27, 2015 at 11:10 AM, Stephan Ewen <se...@apache.org> wrote:

> Adding to Fabian's and Sebastian's answer:
>
>
> Variable in Closure (global variable)
> ------------------------------------------------------
>  - Happens when you reference some variable in the program from a
> function. The variable becomes part of the function's closure.
>  - The variable is distributed with the CODE. It is part of the function
> object and is distributed by the TaskDeployment messages.
>  - Data needs to be available in the driver program (it cannot be a Flink
> DataSet, which is distributed across the cluster).
>  - Should be used for constants, config parameters, or simple scalar
> values.
>
> Summary: Small data that is available on the client (driver program)
>
>
>
> Broadcast set
> ------------------------------------------------------
>  - Refers to data that is produced by a Flink operation (DataSet) and that
> lives in the cluster, rather than on the client (or in the driver program)
>  - Data distribution is part of the distributed data flow and happens
> through the Flink network stack
>  - Can be much larger than the closure variables.
>  - Should be used when you want to make an intermediate result of a Flink
> computation accessible to all functions.
>
>
> Greetings,
> Stephan
>
>
>
> On Mon, Apr 27, 2015 at 10:23 AM, Fabian Hueske <fh...@gmail.com> wrote:
>
>> You should also be aware that the value of a static variable is only
>> accessible within the same JVM.
>> Flink is a distributed system and runs in multiple JVMs. So if you set a
>> value in one JVM it is not visible in another JVM (on a different node).
>>
>> In general, I would avoid using static variables in Flink programs.
>>
>> Best, Fabian
>>
>> 2015-04-26 9:54 GMT+02:00 Sebastian <ss...@apache.org>:
>>
>>> Hi Hung,
>>>
>>> A broadcast variable can also refer to an intermediate result of a Flink
>>> computation.
>>>
>>> Best,
>>> Sebastian
>>
>

Re: Difference between using a global variable and broadcasting a variable

Posted by Stephan Ewen <se...@apache.org>.
Adding to Fabian's and Sebastian's answer:


Variable in Closure (global variable)
------------------------------------------------------
 - Happens when you reference some variable in the program from a function.
The variable becomes part of the function's closure.
 - The variable is distributed with the CODE. It is part of the function
object and is distributed by the TaskDeployment messages.
 - Data needs to be available in the driver program (it cannot be a Flink
DataSet, which is distributed across the cluster).
 - Should be used for constants, config parameters, or simple scalar
values.

Summary: Small data that is available on the client (driver program)
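
A rough illustration of the closure case, in plain Java without any Flink
dependency. It assumes the function object is shipped by serializing it, as
the points above suggest; DivBy and ship() are hypothetical names made up for
this sketch and are not Flink classes. The instance field travels inside the
serialized object, so the copy on the "worker" side sees the same value.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureShippingSketch {

    // Hypothetical stand-in for a user function; not a Flink class.
    static class DivBy implements Serializable {
        private static final long serialVersionUID = 1L;

        // Instance field: it is part of the function object itself.
        private final int num;

        DivBy(int num) {
            this.num = num;
        }

        double apply(double value) {
            return value / num;
        }
    }

    // Serialize and deserialize the object, imitating shipping it to a worker.
    static DivBy ship(DivBy function) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(function);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (DivBy) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        DivBy onClient = new DivBy(10);   // value chosen in the driver program
        DivBy onWorker = ship(onClient);  // copy as a worker would receive it
        System.out.println(onWorker.apply(50.0)); // prints 5.0
    }
}
```

Passing the value through the constructor keeps it in the function object, which
is why this pattern suits small constants and config parameters.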



Broadcast set
------------------------------------------------------
 - Refers to data that is produced by a Flink operation (DataSet) and that
lives in the cluster, rather than on the client (or in the driver program)
 - Data distribution is part of the distributed data flow and happens
through the Flink network stack
 - Can be much larger than the closure variables.
 - Should be used when you want to make an intermediate result of a Flink
computation accessible to all functions.


Greetings,
Stephan




Re: Difference between using a global variable and broadcasting a variable

Posted by Fabian Hueske <fh...@gmail.com>.
You should also be aware that the value of a static variable is only
accessible within the same JVM.
Flink is a distributed system and runs in multiple JVMs. So if you set a
value in one JVM it is not visible in another JVM (on a different node).
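
A small plain-Java sketch of this pitfall, without Flink. It relies on the
fact that Java serialization does not write static fields; the second JVM is
only imitated here by resetting the static field to its initial value before
deserializing. DivByStatic and the helper methods are hypothetical names for
this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticFieldSketch {

    // Hypothetical stand-in for a user function; not a Flink class.
    static class DivByStatic implements Serializable {
        private static final long serialVersionUID = 1L;

        // Static field: belongs to the class inside one JVM, not to the object.
        static int num = 10;

        double apply(double value) {
            return value / num;
        }
    }

    static byte[] serialize(DivByStatic function) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(function);
        }
        return bytes.toByteArray();
    }

    static DivByStatic deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (DivByStatic) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // The "client" JVM changes the static value before shipping the function.
        DivByStatic.num = 2;
        byte[] shipped = serialize(new DivByStatic());

        // Imitate a different JVM: there the class is loaded fresh, so the
        // static field holds its initial value again.
        DivByStatic.num = 10;
        DivByStatic onWorker = deserialize(shipped);

        // The change made on the client did not travel with the object.
        System.out.println(onWorker.apply(50.0)); // prints 5.0, not 25.0
    }
}
```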

In general, I would avoid using static variables in Flink programs.

Best, Fabian


Re: Difference between using a global variable and broadcasting a variable

Posted by Sebastian <ss...@apache.org>.
Hi Hung,

A broadcast variable can also refer to an intermediate result of a Flink 
computation.

Best,
Sebastian
