Posted to user@flink.apache.org by Ovidiu-Cristian MARCU <ov...@inria.fr> on 2017/01/26 21:30:00 UTC
parallelism for window operations
Hi,
I have the following program, configured with parallelism 2.
After running it, I see that only 2 slots are busy.
How can I ensure that counts1 and counts2 are executed in their own slots with the given parallelism (in this case, 2 slots each)?
final ParameterTool params = ParameterTool.fromArgs(args);
final int port = params.getInt("port");

// get the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(params.getInt("paral", 2));
env.setMaxParallelism(params.getInt("paral", 2));

// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");
DataStream<Tuple8<String, String, String, Integer, String, Double, Long, Long>> input = text.flatMap(...);

// windowSize and slideSize are defined elsewhere in the program (not shown)
DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
    .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
        ...
    });

DataStream<Double> counts2 = input.keyBy(1).countWindow(windowSize, slideSize)
    .apply(new WindowFunction<Tuple8<String, String, String, Integer, String, Double, Long, Long>, Double, Tuple, GlobalWindow>() {
        ...
    });

counts1.writeAsText(params.get("output1"));
counts2.writeAsText(params.get("output2"));

env.execute("Socket Window WordCount");
——
./bin/flink run flink-examples-streaming_2.10-1.2-SNAPSHOT-SocketWindowWordCount.jar --port 9000 --paral 2
Cluster configuration: Standalone cluster with JobManager at /127.0.0.1:6123
Using address 127.0.0.1:6123 to connect to JobManager.
JobManager web interface address http://127.0.0.1:8081
Starting execution of program
Printing result to stdout. Use --output to specify output path.
Submitting job with JobID: bf063ec3f912871bcc7a95bc041775e5. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://flink@127.0.0.1:6123/user/jobmanager#-126254675]
01/26/2017 22:08:46 Job execution switched to status RUNNING.
01/26/2017 22:08:46 Source: Socket Stream(1/1) switched to SCHEDULED
01/26/2017 22:08:46 Source: Socket Stream(1/1) switched to DEPLOYING
01/26/2017 22:08:46 Flat Map(1/2) switched to SCHEDULED
01/26/2017 22:08:46 Flat Map(1/2) switched to DEPLOYING
01/26/2017 22:08:46 Flat Map(2/2) switched to SCHEDULED
01/26/2017 22:08:46 Flat Map(2/2) switched to DEPLOYING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to SCHEDULED
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to DEPLOYING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to SCHEDULED
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to DEPLOYING
01/26/2017 22:08:46 Source: Socket Stream(1/1) switched to RUNNING
01/26/2017 22:08:46 Flat Map(1/2) switched to RUNNING
01/26/2017 22:08:46 Flat Map(2/2) switched to RUNNING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@1aa66c2e, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(1/2) switched to RUNNING
01/26/2017 22:08:46 TriggerWindow(GlobalWindows(), ListStateDescriptor{serializer=org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer@8e9ef375}, CountTrigger(2), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@34509f7f, WindowedStream.apply(WindowedStream.java:261)) -> Sink: Unnamed(2/2) switched to RUNNING
Best,
Ovidiu
Re: parallelism for window operations
Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
Now I understand (the documentation is clear). Just a correction:
because I set "PInput" as the slot sharing group for the flatMap, the source and the flatMap are in different slots.
That also means S6 and S7 are the same slot, as expected, because both sinks are in the same slot sharing group, "output".
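The slot counts in this thread can be checked with a back-of-the-envelope model. The sketch below is a simplified model, not Flink's scheduler, and assumes that each slot sharing group needs as many slots as its most parallel operator, while different groups never share a slot (the Flink docs state the single-group case: a job needs as many slots as its highest parallelism). The class and method names are hypothetical. Applied to the operators in this thread, it gives 2 slots for the original program and 7 for the revised program posted on 27 Jan, with the source alone in "default" and the two sinks sharing one "output" slot.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Simplified model (hypothetical, not Flink's scheduler): one slot can host one parallel
 * subtask of every operator in the same slot sharing group, so each group needs as many
 * slots as its most parallel operator, and slots are never shared across groups.
 */
public class SlotEstimate {

    /**
     * @param groups       slot sharing group of each operator
     * @param parallelisms parallelism of each operator, in the same order as groups
     * @return the sum, over all groups, of the highest parallelism within that group
     */
    static int requiredSlots(String[] groups, int[] parallelisms) {
        if (groups.length != parallelisms.length) {
            throw new IllegalArgumentException("groups and parallelisms must have the same length");
        }
        // Keep only the highest parallelism seen for each group.
        Map<String, Integer> maxPerGroup = new LinkedHashMap<>();
        for (int i = 0; i < groups.length; i++) {
            maxPerGroup.merge(groups[i], parallelisms[i], Integer::max);
        }
        int total = 0;
        for (int p : maxPerGroup.values()) {
            total += p;
        }
        return total;
    }

    public static void main(String[] args) {
        // Original program: everything in "default"; Source has parallelism 1, all others 2.
        String[] before = {"default", "default", "default", "default", "default", "default"};
        int[] beforeP = {1, 2, 2, 2, 2, 2};
        System.out.println(requiredSlots(before, beforeP)); // 2

        // Revised program: Source, Flat Map, window 1, window 2, sink 1, sink 2.
        String[] after = {"default", "PInput", "firstWindow", "secondWindow", "output", "output"};
        int[] afterP = {1, 1, 1, 3, 1, 1};
        System.out.println(requiredSlots(after, afterP)); // 1 + 1 + 1 + 3 + 1 = 7
    }
}
```

The model only counts slots; it says nothing about which TaskManager a slot lives on or about operator chaining.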
Best,
Ovidiu
> On 27 Jan 2017, at 10:43, Ovidiu-Cristian MARCU <ov...@inria.fr> wrote:
> [...]
Re: parallelism for window operations
Posted by Ovidiu-Cristian MARCU <ov...@inria.fr>.
Thank you, Fabian!
It works. Here is what I did and the results, as an example for other users:
In total, 7 slots are occupied. (I am not sure how to check whether Source and Flat Map are in the same slot; I assumed that is slot S1. Also, S6 and S7 appear to be different slots, although I set the same slot sharing group name for both.)
// get input data by connecting to the socket
DataStream<String> text = env.socketTextStream("localhost", port, "\n");

// IN stands for Tuple8<String, String, String, Integer, String, Double, Long, Long>
DataStream<IN> input = text.flatMap(...).slotSharingGroup("PInput").setParallelism(1); // ONE SLOT S1

DataStream<Double> counts1 = input.keyBy(0).countWindow(windowSize, slideSize)
    .apply(new WindowFunction<IN, Double, Tuple, GlobalWindow>() {
        ...
    }).slotSharingGroup("firstWindow").setParallelism(1).setMaxParallelism(1); // ONE SLOT S2

DataStream<Double> counts2 = input.keyBy(2).countWindow(windowSize, slideSize)
    .apply(new WindowFunction<IN, Double, Tuple, GlobalWindow>() {
        ...
    }).slotSharingGroup("secondWindow").setParallelism(3).setMaxParallelism(3); // THREE SLOTS S3, S4, S5

counts1.writeAsText(params.get("output1")).slotSharingGroup("output").setParallelism(1); // ONE SLOT S6
counts2.writeAsText(params.get("output2")).slotSharingGroup("output").setParallelism(1); // ONE SLOT S7

env.execute("Socket Window WordCount");
Best,
Ovidiu
> On 27 Jan 2017, at 10:13, Fabian Hueske <fh...@gmail.com> wrote:
> [...]
Re: parallelism for window operations
Posted by Fabian Hueske <fh...@gmail.com>.
Hi Ovidiu,
you can control the slot assignment by assigning operators to SlotSharingGroups.
For example:
someStream.filter(...).slotSharingGroup("name");
Operators in different groups are scheduled to different slots. By default, all operators are in the same group.
Have a look at the docs as well [1]
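To make the group rule concrete, here is a small model of how each operator's slot sharing group gets resolved. It is a sketch, assuming the rule as described in the DataStream API docs: an explicitly set group wins; otherwise an operator inherits its inputs' group when all inputs share one; otherwise (and for sources) it lands in the group named "default". The class and method names are hypothetical; this is not Flink's code.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical model of slot sharing group resolution for a single operator. */
public class SlotGroupResolution {

    // Name of the group an operator falls into when no other rule applies.
    static final String DEFAULT_GROUP = "default";

    /**
     * @param explicitGroup group set with slotSharingGroup(...), or null if none was set
     * @param inputGroups   already-resolved groups of the operator's inputs (empty for a source)
     * @return the slot sharing group the operator ends up in
     */
    static String resolveGroup(String explicitGroup, List<String> inputGroups) {
        if (explicitGroup != null) {
            return explicitGroup; // an explicit setting always wins
        }
        Set<String> distinct = new HashSet<>(inputGroups);
        if (distinct.size() == 1) {
            return distinct.iterator().next(); // all inputs agree: inherit their group
        }
        return DEFAULT_GROUP; // a source (no inputs), or inputs in different groups
    }

    public static void main(String[] args) {
        String source = resolveGroup(null, Collections.<String>emptyList());
        String filter = resolveGroup("name", Arrays.asList(source));         // someStream.filter(...).slotSharingGroup("name")
        String mapped = resolveGroup(null, Arrays.asList(filter));           // downstream operator, no explicit group
        String twoInput = resolveGroup(null, Arrays.asList(mapped, source)); // inputs disagree
        System.out.println(source + " " + filter + " " + mapped + " " + twoInput);
        // prints: default name name default
    }
}
```

Under this rule, setting a group only on the flatMap in the original program would also pull the downstream windows and sinks into that group, unless they set groups of their own.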
Best, Fabian
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#task-chaining-and-resource-groups
2017-01-26 22:30 GMT+01:00 Ovidiu-Cristian MARCU <ovidiu-cristian.marcu@inria.fr>:
> [...]