Posted to user@flink.apache.org by Ovidiu-Cristian MARCU <ov...@inria.fr> on 2017/01/26 21:30:00 UTC

parallelism for window operations

Hi,

I have the following program configured with parallelism 2.
After running this example, I see that only 2 slots are busy.

How can I ensure that counts1 and counts2 are executed in their own slots with the given parallelism (in this case, 2 slots each)?

		port = params.getInt("port");

		// get the execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		env.setParallelism(params.getInt("paral", 2));
		env.setMaxParallelism(params.getInt("paral", 2));

		// get input data by connecting to the socket
		DataStream<String> text = env.socketTextStream("localhost", port, "\n");

		DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input = text.flatMap(...);
		DataStream<Double> counts1 = null;

		counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
				.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
...
				});

		DataStream<Double> counts2 = input.keyBy(1).countWindow(windowSize, slideSize)
				.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
...
				});

		counts1.writeAsText(params.get("output1"));
		counts2.writeAsText(params.get("output2"));

		env.execute("Socket Window WordCount");



——

./bin/flink run flink-examples-streaming_2.10-1.2-SNAPSHOT-SocketWindowWordCount.jar --port 9000 --paral 2
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: bf063ec3f912871bcc7a95bc041775e5. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#-126254675]
01/26/2017 22:08:46	Job execution switched to status RUNNING.
01/26/2017 22:08:46	Source: Socket Stream(1/1) switched to SCHEDULED 
01/26/2017 22:08:46	Source: Socket Stream(1/1) switched to DEPLOYING 
01/26/2017 22:08:46	Flat Map(1/2) switched to SCHEDULED 
01/26/2017 22:08:46	Flat Map(1/2) switched to DEPLOYING 
01/26/2017 22:08:46	Flat Map(2/2) switched to SCHEDULED 
01/26/2017 22:08:46	Flat Map(2/2) switched to DEPLOYING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING 
01/26/2017 22:08:46	Source: Socket Stream(1/1) switched to RUNNING 
01/26/2017 22:08:46	Flat Map(1/2) switched to RUNNING 
01/26/2017 22:08:46	Flat Map(2/2) switched to RUNNING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING 
01/26/2017 22:08:46	TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING 

Best,
Ovidiu
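Why only 2 slots show up: none of the operators above calls slotSharingGroup(...), so all of them sit in Flink's default slot sharing group, and such a job needs as many slots as its highest operator parallelism (here 2). The sketch below is a plain-Java model of that, not Flink code. The class and method names are hypothetical, and it assumes for simplicity that slot i hosts subtask i of every operator; the real scheduler chooses the actual placement.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model, not Flink code: when every operator is in the same slot
// sharing group, the job needs as many slots as its highest parallelism, and
// (assumption) slot i hosts subtask i of each operator whose parallelism exceeds i.
public class DefaultGroupLayout {

    // Returns one list of subtask labels such as "Flat Map(2/2)" per slot.
    static List<List<String>> layout(Map<String, Integer> parallelismByOperator) {
        int slots = 0;
        for (int p : parallelismByOperator.values()) {
            slots = Math.max(slots, p); // the widest operator decides the slot count
        }
        List<List<String>> result = new ArrayList<>();
        for (int slot = 0; slot < slots; slot++) {
            List<String> subtasks = new ArrayList<>();
            for (Map.Entry<String, Integer> op : parallelismByOperator.entrySet()) {
                if (slot < op.getValue()) {
                    subtasks.add(op.getKey() + "(" + (slot + 1) + "/" + op.getValue() + ")");
                }
            }
            result.add(subtasks);
        }
        return result;
    }

    public static void main(String[] args) {
        // Operators as they appear in the log above; each window is chained with its sink.
        Map<String, Integer> job = new LinkedHashMap<>();
        job.put("Source: Socket Stream", 1);
        job.put("Flat Map", 2);
        job.put("counts1 window -> Sink", 2);
        job.put("counts2 window -> Sink", 2);

        List<List<String>> slots = layout(job);
        for (int i = 0; i < slots.size(); i++) {
            System.out.println("slot " + (i + 1) + ": " + slots.get(i));
        }
    }
}
```

Running it prints two slots: the first hosts four subtasks (Source, Flat Map and both window/sink chains) and the second hosts three, which is consistent with the 2 busy slots reported above.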

Re: parallelism for window operations

Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
Now I see (the documentation is clear). Just one correction:

Because I set "PInput" as the slot sharing group for the flatMap, the source (which stays in the default group) and the flatMap run in different slots.
That also means S6 and S7 are the same slot, as expected, because both sinks are in the same slot sharing group, "output"; the total is still 7 slots.

Best,
Ovidiu 
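The corrected count can be checked with simple arithmetic: operators in different slot sharing groups never share a slot, and each group needs as many slots as its highest parallelism, so the total is the sum over groups. The plain-Java sketch below (hypothetical names, not the Flink API) applies that to the job from the "Thank you, Fabian!" message, with the source placed in "default" because it has no explicit group and no inputs.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model, not Flink code: each slot sharing group needs as many
// slots as its highest operator parallelism, and groups never share a slot.
public class SlotSharingGroupCount {

    static final class Operator {
        final String name;
        final String group;     // slot sharing group the operator ends up in
        final int parallelism;

        Operator(String name, String group, int parallelism) {
            this.name = name;
            this.group = group;
            this.parallelism = parallelism;
        }
    }

    // Slots needed per group = max parallelism of the operators in that group.
    static Map<String, Integer> slotsPerGroup(List<Operator> operators) {
        Map<String, Integer> perGroup = new LinkedHashMap<>();
        for (Operator op : operators) {
            perGroup.merge(op.group, op.parallelism, Math::max);
        }
        return perGroup;
    }

    // Total slots = sum over all groups.
    static int requiredSlots(List<Operator> operators) {
        int total = 0;
        for (int slots : slotsPerGroup(operators).values()) {
            total += slots;
        }
        return total;
    }

    public static void main(String[] args) {
        List<Operator> job = Arrays.asList(
                new Operator("Source: Socket Stream", "default", 1),
                new Operator("Flat Map", "PInput", 1),
                new Operator("counts1 window", "firstWindow", 1),
                new Operator("counts2 window", "secondWindow", 3),
                new Operator("Sink (output1)", "output", 1),
                new Operator("Sink (output2)", "output", 1));
        System.out.println(slotsPerGroup(job));
        System.out.println(requiredSlots(job));
    }
}
```

It prints {default=1, PInput=1, firstWindow=1, secondWindow=3, output=1} and then 7, matching the 7 occupied slots, with both sinks counted once in "output".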


Re: parallelism for window operations

Posted by Ovidiu-Cristian MARCU <ov...@inria.fr> on 27 Jan 2017 at 10:43.
Thank you, Fabian!

It works. Here is what I did and the result, as an example for other users:
In total, 7 slots are occupied. (I am not sure how to check whether Source + Flat Map are in the same slot; I assumed that is slot S1. Also, S6 and S7 are different slots, although I gave both sinks the same slot sharing group name.)

		// get input data by connecting to the socket
		DataStream<String> text = env.socketTextStream("localhost", port, "\n");

		
		// IN stands for Tuple8<String, String, String, Integer, String, Double, Long, Long>
		DataStream<IN> input = text.flatMap(...).slotSharingGroup("PInput").setParallelism(1); //ONE SLOT S1
		DataStream<Double> counts1 = null;

		counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
				.apply(new WindowFunction<IN, Double, Tuple, GlobalWindow>() {
...
				}).slotSharingGroup("firstWindow").setParallelism(1).setMaxParallelism(1); //ONE SLOT S2

		DataStream<Double> counts2 = input.keyBy(2).countWindow(windowSize, slideSize)
				.apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
...
				}).slotSharingGroup("secondWindow").setParallelism(3).setMaxParallelism(3); //THREE SLOTS S3, S4, S5

		counts1.writeAsText(params.get("output1")).slotSharingGroup("output").setParallelism(1); //ONE SLOT S6
		counts2.writeAsText(params.get("output2")).slotSharingGroup("output").setParallelism(1); //ONE SLOT S7

		env.execute("Socket Window WordCount");


Best,
Ovidiu


Re: parallelism for window operations

Posted by Fabian Hueske <fh...@gmail.com> on 27 Jan 2017 at 10:13.
Hi Ovidiu,

you can control the slot assignment by assigning operators to
SlotSharingGroups.
For example like this:

someStream.filter(...).slotSharingGroup("name");

Operators in different groups are scheduled to different slots. By default,
all operators are in the same group.
Have a look at the docs as well [1]

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#task-chaining-and-resource-groups
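Applied to the original question, the suggestion amounts to giving each window operator its own group. As far as I understand Flink's rules, an operator without an explicit slotSharingGroup(...) inherits the group of its inputs when all inputs share one, and otherwise uses "default", so each sink follows its window. The plain-Java model below (not Flink code; class names hypothetical, group names borrowed from the "firstWindow"/"secondWindow" job later in the thread) resolves the groups that way and counts the slots at parallelism 2.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified model, not Flink code, of slot sharing group resolution:
// an explicit group wins; otherwise an operator inherits the group of its
// inputs if they all agree, and falls back to "default".
public class SlotGroupInheritance {

    static final class Op {
        final String name;
        final String explicitGroup; // null = no slotSharingGroup(...) call
        final int parallelism;
        final List<String> inputs;

        Op(String name, String explicitGroup, int parallelism, String... inputs) {
            this.name = name;
            this.explicitGroup = explicitGroup;
            this.parallelism = parallelism;
            this.inputs = Arrays.asList(inputs);
        }
    }

    // Operators must be listed so that every input appears before its consumers.
    static Map<String, String> resolveGroups(List<Op> ops) {
        Map<String, String> groupOf = new HashMap<>();
        for (Op op : ops) {
            String group = op.explicitGroup;
            if (group == null) {
                Set<String> inputGroups = new HashSet<>();
                for (String input : op.inputs) {
                    inputGroups.add(groupOf.get(input));
                }
                // No inputs (a source) or disagreeing inputs -> "default".
                group = inputGroups.size() == 1 ? inputGroups.iterator().next() : "default";
            }
            groupOf.put(op.name, group);
        }
        return groupOf;
    }

    // Sum over groups of the highest parallelism inside each group.
    static int requiredSlots(List<Op> ops) {
        Map<String, String> groupOf = resolveGroups(ops);
        Map<String, Integer> maxPerGroup = new HashMap<>();
        for (Op op : ops) {
            maxPerGroup.merge(groupOf.get(op.name), op.parallelism, Math::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // Original job at parallelism 2; only the two windows get explicit groups.
        List<Op> job = Arrays.asList(
                new Op("source", null, 1),
                new Op("flatMap", null, 2, "source"),
                new Op("counts1", "firstWindow", 2, "flatMap"),
                new Op("sink1", null, 2, "counts1"),
                new Op("counts2", "secondWindow", 2, "flatMap"),
                new Op("sink2", null, 2, "counts2"));
        System.out.println(resolveGroups(job).get("sink2"));
        System.out.println(requiredSlots(job));
    }
}
```

Under these assumptions the layout needs 6 slots: 2 for the source and flatMap in "default", and 2 each for "firstWindow" and "secondWindow". In the real job this corresponds to calling .slotSharingGroup("firstWindow") and .slotSharingGroup("secondWindow") on the two apply(...) results, as done later in the thread.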
