Posted to user@flink.apache.org by Vijay Balakrishnan <bv...@gmail.com> on 2018/05/16 20:32:43 UTC

How to sleep for 1 sec and then call keyBy for partitioning

Hi,
Newbie question - What I am trying to do is the following:
CameraWithCubeSource sends data containing tuples of (cameraNbr, TS).
1. Partition the data by cameraNbr.
2. Then sleep for 1 sec to simulate a heavy process in the task.
3. Then partition the data by TS and finally get the DataStream to connect with another DataStream.

DataStream<CameraWithCube> cameraWithCubeDataStream = env
                .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
                .setParallelism(parallelTasks)
                .setMaxParallelism(parallelTasks)
                .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by cameraNbr
                        cameraWithCube.cameraKey.getCam() : new Object());
                //sleep for 1 sec ???? how
                ((KeyedStream) cameraWithCubeDataStream).timeWindow(Time.seconds(1))
                    .apply(new WindowFunction<CameraWithCube, CameraWithCube, String, TimeWindow>() {
                        @Override
                        public void apply(String cameraKeyCam, TimeWindow timeWindow,
                                          Iterable<CameraWithCube> cameraWithCubesAssignedToWindow,
                                          Collector<CameraWithCube> collector) throws Exception {
                            Thread.sleep(1000);
                            cameraWithCubesAssignedToWindow.forEach(cameraWithCube -> collector.collect(cameraWithCube));
                        }
                    })//returning void here from apply ??
                    //partition by TS and return DataStream
                    .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ? //partition by TS
                            cameraWithCube.cameraKey.getTS() : new Object());
TIA,
Vijay

Re: How to sleep for 1 sec and then call keyBy for partitioning

Posted by Vijay Balakrishnan <bv...@gmail.com>.
Hi,
This worked out after looking at
https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1

Why can't I use setParallelism after keyBy? Is it not an operator?

DataStream<CameraWithCube> cameraWithCubeDataStream = env
        .addSource(new CameraWithCubeSource(cameraFile, delay, servingSpeedFactor))
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getCam() : new Object())
        //.setParallelism(parallelTasks) //??? Why cannot I use setParallelism after keyBy-is it not an operator
        //.setMaxParallelism(parallelTasks) // https://stackoverflow.com/questions/44436401/some-puzzles-for-the-operator-parallelism-in-flink?rq=1
        .process(new ProcessFunction<CameraWithCube, CameraWithCube>() {
            @Override
            public void processElement(CameraWithCube cameraWithCube, Context context,
                                       Collector<CameraWithCube> collector) throws Exception {
                logger.info("before thread sleep");
                Thread.sleep(500);
                logger.info("after thread sleep");
                collector.collect(cameraWithCube);
            }
        })
        .setParallelism(parallelTasks) //???do I need to set this or will it take the parallelism from the earlier step ?
        .setMaxParallelism(parallelTasks)
        .keyBy((cameraWithCube) -> cameraWithCube.cameraKey != null ?
                cameraWithCube.cameraKey.getTs() : new Object());
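
A note on the keyBy calls above, as a rough sketch rather than Flink's real code: as far as I understand, keyBy only decides which parallel subtask of the next operator receives each record, by hashing the key. That would be consistent with setParallelism not being available directly after it. It also means the key has to hash the same way every time, which a fresh new Object() fallback does not. The plain-Java model below is hypothetical: CameraKey, NO_KEY, camOrSentinel, subtaskFor and mix are made-up names, and mix is only a stand-in for whatever hash Flink actually applies.

```java
/** Hypothetical stand-in for the thread's key type; only the fields used here. */
final class CameraKey {
    final String cam;
    final long ts;

    CameraKey(String cam, long ts) {
        this.cam = cam;
        this.ts = ts;
    }
}

final class KeyRoutingSketch {
    /** Constant fallback key (assumption): unlike new Object(), it always hashes the same. */
    static final String NO_KEY = "<no-camera>";

    /** Deterministic key selector: equal inputs always produce equal keys. */
    static String camOrSentinel(CameraKey key) {
        return key != null ? key.cam : NO_KEY;
    }

    /** Stand-in bit mixer; NOT the hash function Flink uses. */
    static int mix(int h) {
        h ^= (h >>> 16);
        h *= 0x85ebca6b;
        h ^= (h >>> 13);
        return h;
    }

    /**
     * Simplified routing model: hash the key into one of maxParallelism
     * key groups, then map that group onto one of `parallelism` subtasks.
     */
    static int subtaskFor(Object key, int parallelism, int maxParallelism) {
        int keyGroup = Math.floorMod(mix(key.hashCode()), maxParallelism);
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int parallelism = 4;
        int maxParallelism = 128;

        // Same camera always lands on the same subtask.
        String key = camOrSentinel(new CameraKey("cam1", 1L));
        System.out.println(key + " -> subtask " + subtaskFor(key, parallelism, maxParallelism));

        // Records without a key share one stable sentinel key.
        String missing = camOrSentinel(null);
        System.out.println(missing + " -> subtask " + subtaskFor(missing, parallelism, maxParallelism));

        // Fresh Objects use identity hash codes, so these two numbers will
        // usually differ from each other and from run to run.
        System.out.println("new Object() hashes: "
                + new Object().hashCode() + ", " + new Object().hashCode());
    }
}
```

In this model the parallelism belongs to the operator that consumes the keyed stream (process in the code above), which is where the thread's working version sets it.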


TIA,

Vijay


On Wed, May 16, 2018 at 1:41 PM Jörn Franke <jo...@gmail.com> wrote:

> Just some advice - do not use sleep to simulate a heavy task. Use real
> data or generated data to simulate. This sleep is garbage from a software
> quality point of view. Furthermore, it is often forgotten etc.

Re: How to sleep for 1 sec and then call keyBy for partitioning

Posted by Jörn Franke <jo...@gmail.com>.
Just some advice - do not use sleep to simulate a heavy task. Use real data or generated data to simulate it. This sleep is garbage from a software quality point of view. Furthermore, such a sleep is often forgotten and left in the code.
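
One way to read this advice, as a minimal sketch and not something from the thread itself: generate reproducible input data and run a CPU-bound computation over it, so the task actually consumes CPU instead of parking the thread. The names SimulatedLoad, generate and process are hypothetical, and the mixing arithmetic is arbitrary busy work, not a meaningful algorithm.

```java
import java.util.Random;

final class SimulatedLoad {
    /** Generates n pseudo-random values; the same seed gives the same data. */
    static long[] generate(long seed, int n) {
        Random rnd = new Random(seed);
        long[] data = new long[n];
        for (int i = 0; i < n; i++) {
            data[i] = rnd.nextLong();
        }
        return data;
    }

    /**
     * CPU-bound busy work: folds the data into a checksum `rounds` times.
     * Returning the checksum keeps the loop from being optimized away.
     */
    static long process(long[] data, int rounds) {
        long acc = 0L;
        for (int r = 0; r < rounds; r++) {
            for (long v : data) {
                acc ^= v + r;
                acc = Long.rotateLeft(acc, 7) * 0x9E3779B97F4A7C15L;
            }
        }
        return acc;
    }

    public static void main(String[] args) {
        long[] data = generate(42L, 100_000);

        long start = System.nanoTime();
        long checksum = process(data, 200);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        // Tune the data size and rounds until the cost resembles the real task.
        System.out.println("checksum=" + checksum + " took " + elapsedMs + " ms");
    }
}
```

A call such as process(data, rounds) could stand in for Thread.sleep inside processElement. Unlike a sleep, its cost scales with the input, and the checksum gives a value to assert on in tests.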
