You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by yutao sun <yu...@gmail.com> on 2016/02/02 19:31:26 UTC

Tumbling Windows with Processing Time

Hi Flink users,

I have a question about Tumbling Windows using Processing Time at Flink ver
0.10.1 :

In fact, I want to measure the throughput of my application, the idea is at
the last operator, by using a Tumbling processing Time windows with a size
of 1 second, I count the message received.

The problem is in case of 4 parallelisms, the number of windows should be
4/second, but I got 7 windows/second,  I wonder if is there any error the
windows is defined?

I copy my code here and thanks a lot for your help in advance.
[KAFKA partition : 4]


*val env = StreamExecutionEnvironment.getExecutionEnvironment*


*val parallelism = 4*

































*env.setParallelism(parallelism)env.getConfig.enableObjectReuse()env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)env.getConfig.setAutoWatermarkInterval(-1L)env.getConfig.disableTimestamps()env.addSource(
 new FlinkKafkaConsumer082[String](    "test_topic",    new
SimpleStringSchema,    properties for connection KAFKA  )) .rebalance
.map(do some thing) .map(payload => (payload, 1L))
.keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism)
.timeWindow(Time.of(1, TimeUnit.SECONDS)) .reduce((tuple0: (Payload,
Long), tuple1: (Payload, Long)) => (tuple._0, tuple._1 + 1L))
.addSink(   new FlinkKafkaProducer[(Payload, Long)](
KafkaBootstrapServers,    TARGET_TOPIC,    new
SerializationSchema[(Payload, Long), Array[Byte]] {      override def
serialize(element: (Payload, Long)): Array[Byte] = {
element._2.toString().getBytes      }    }  ))env.execute("test")*

Re: Tumbling Windows with Processing Time

Posted by yutao sun <yu...@gmail.com>.
Exactly, I have more than 4 keys because the "nenative modulo",  after
thange this line from

*.keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism)*

to



*.keyBy(mappedPayload => Math.abs(mappedPayload._1.id.hashcode % parallelism))*





*or just profit Flink's dataStream.partitionByHash(Field)*


*Thanks for your help!  Cheers :)*



2016-02-03 14:35 GMT+01:00 Aljoscha Krettek <al...@apache.org>:

> How long did you run the job? Could it be an artifact of the timing and it
> hasn’t yet averaged out.
> > On 03 Feb 2016, at 14:32, Aljoscha Krettek <al...@apache.org> wrote:
> >
> > There should be 4 windows because there are only 4 distinct keys, if I
> understand this line correctly:
> >
> > .keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism)
> >
> >> On 02 Feb 2016, at 19:31, yutao sun <yu...@gmail.com> wrote:
> >>
> >> .keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism)
> >
>
>

Re: Tumbling Windows with Processing Time

Posted by Aljoscha Krettek <al...@apache.org>.
How long did you run the job? Could it be an artifact of the timing and it hasn’t yet averaged out.
> On 03 Feb 2016, at 14:32, Aljoscha Krettek <al...@apache.org> wrote:
> 
> There should be 4 windows because there are only 4 distinct keys, if I understand this line correctly:
> 
> .keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism)
> 
>> On 02 Feb 2016, at 19:31, yutao sun <yu...@gmail.com> wrote:
>> 
>> .keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism)
> 


Re: Tumbling Windows with Processing Time

Posted by Aljoscha Krettek <al...@apache.org>.
There should be 4 windows because there are only 4 distinct keys, if I understand this line correctly:

 .keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism)

> On 02 Feb 2016, at 19:31, yutao sun <yu...@gmail.com> wrote:
> 
>  .keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism)


Re: Tumbling Windows with Processing Time

Posted by Stephan Ewen <se...@apache.org>.
Do you have 7 distinct keys? You get as many result tuples as you have
keys, because the window is per key.

On Wed, Feb 3, 2016 at 12:12 PM, yutao sun <yu...@gmail.com> wrote:

> Thanks for your help,  I retest by disable the object reuse and got the
> same result (please see the picture attached).
>
>
> ​
> ​
>
> 2016-02-03 10:51 GMT+01:00 Stephan Ewen <se...@apache.org>:
>
>> The definition looks correct.
>> Because the windows are by-key, you should get one window result per key
>> per second.
>>
>> Can you turn off object-reuse? That is a pretty experimental thing and
>> works with the batch operations quite well, but not so much with the
>> streaming windows, yet.
>> I would only enable object reuse after the program works well and
>> correctly without.
>>
>> Greetings,
>> Stephan
>>
>>
>> On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yu...@gmail.com> wrote:
>>
>>> Hi Flink users,
>>>
>>> I have a question about Tumbling Windows using Processing Time at Flink
>>> ver 0.10.1 :
>>>
>>> In fact, I want to measure the throughput of my application, the idea is
>>> at the last operator, by using a Tumbling processing Time windows with a
>>> size of 1 second, I count the message received.
>>>
>>> The problem is in case of 4 parallelisms, the number of windows should
>>> be 4/second, but I got 7 windows/second,  I wonder if is there any error
>>> the windows is defined?
>>>
>>> I copy my code here and thanks a lot for your help in advance.
>>> [KAFKA partition : 4]
>>>
>>>
>>> *val env = StreamExecutionEnvironment.getExecutionEnvironment*
>>>
>>>
>>> *val parallelism = 4*
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> *env.setParallelism(parallelism)env.getConfig.enableObjectReuse()env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)env.getConfig.setAutoWatermarkInterval(-1L)env.getConfig.disableTimestamps()env.addSource(  new FlinkKafkaConsumer082[String](    "test_topic",    new SimpleStringSchema,    properties for connection KAFKA  )) .rebalance .map(do some thing) .map(payload => (payload, 1L)) .keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism) .timeWindow(Time.of(1, TimeUnit.SECONDS)) .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) => (tuple._0, tuple._1 + 1L)) .addSink(   new FlinkKafkaProducer[(Payload, Long)](    KafkaBootstrapServers,    TARGET_TOPIC,    new SerializationSchema[(Payload, Long), Array[Byte]] {      override def serialize(element: (Payload, Long)): Array[Byte] = {        element._2.toString().getBytes      }    }  ))env.execute("test")*
>>>
>>>
>>>
>>>
>>
>

Re: Tumbling Windows with Processing Time

Posted by yutao sun <yu...@gmail.com>.
Thanks for your help,  I retest by disable the object reuse and got the
same result (please see the picture attached).


​
​

2016-02-03 10:51 GMT+01:00 Stephan Ewen <se...@apache.org>:

> The definition looks correct.
> Because the windows are by-key, you should get one window result per key
> per second.
>
> Can you turn off object-reuse? That is a pretty experimental thing and
> works with the batch operations quite well, but not so much with the
> streaming windows, yet.
> I would only enable object reuse after the program works well and
> correctly without.
>
> Greetings,
> Stephan
>
>
> On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yu...@gmail.com> wrote:
>
>> Hi Flink users,
>>
>> I have a question about Tumbling Windows using Processing Time at Flink
>> ver 0.10.1 :
>>
>> In fact, I want to measure the throughput of my application, the idea is
>> at the last operator, by using a Tumbling processing Time windows with a
>> size of 1 second, I count the message received.
>>
>> The problem is in case of 4 parallelisms, the number of windows should be
>> 4/second, but I got 7 windows/second,  I wonder if is there any error the
>> windows is defined?
>>
>> I copy my code here and thanks a lot for your help in advance.
>> [KAFKA partition : 4]
>>
>>
>> *val env = StreamExecutionEnvironment.getExecutionEnvironment*
>>
>>
>> *val parallelism = 4*
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> *env.setParallelism(parallelism)env.getConfig.enableObjectReuse()env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)env.getConfig.setAutoWatermarkInterval(-1L)env.getConfig.disableTimestamps()env.addSource(  new FlinkKafkaConsumer082[String](    "test_topic",    new SimpleStringSchema,    properties for connection KAFKA  )) .rebalance .map(do some thing) .map(payload => (payload, 1L)) .keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism) .timeWindow(Time.of(1, TimeUnit.SECONDS)) .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) => (tuple._0, tuple._1 + 1L)) .addSink(   new FlinkKafkaProducer[(Payload, Long)](    KafkaBootstrapServers,    TARGET_TOPIC,    new SerializationSchema[(Payload, Long), Array[Byte]] {      override def serialize(element: (Payload, Long)): Array[Byte] = {        element._2.toString().getBytes      }    }  ))env.execute("test")*
>>
>>
>>
>>
>

Re: Tumbling Windows with Processing Time

Posted by Stephan Ewen <se...@apache.org>.
The definition looks correct.
Because the windows are by-key, you should get one window result per key
per second.

Can you turn off object-reuse? That is a pretty experimental thing and
works with the batch operations quite well, but not so much with the
streaming windows, yet.
I would only enable object reuse after the program works well and correctly
without.

Greetings,
Stephan


On Tue, Feb 2, 2016 at 7:31 PM, yutao sun <yu...@gmail.com> wrote:

> Hi Flink users,
>
> I have a question about Tumbling Windows using Processing Time at Flink
> ver 0.10.1 :
>
> In fact, I want to measure the throughput of my application, the idea is
> at the last operator, by using a Tumbling processing Time windows with a
> size of 1 second, I count the message received.
>
> The problem is in case of 4 parallelisms, the number of windows should be
> 4/second, but I got 7 windows/second,  I wonder if is there any error the
> windows is defined?
>
> I copy my code here and thanks a lot for your help in advance.
> [KAFKA partition : 4]
>
>
> *val env = StreamExecutionEnvironment.getExecutionEnvironment*
>
>
> *val parallelism = 4*
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> *env.setParallelism(parallelism)env.getConfig.enableObjectReuse()env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)env.getConfig.setAutoWatermarkInterval(-1L)env.getConfig.disableTimestamps()env.addSource(  new FlinkKafkaConsumer082[String](    "test_topic",    new SimpleStringSchema,    properties for connection KAFKA  )) .rebalance .map(do some thing) .map(payload => (payload, 1L)) .keyBy(mappedPayload => mappedPayload._1.id.hashcode % parallelism) .timeWindow(Time.of(1, TimeUnit.SECONDS)) .reduce((tuple0: (Payload, Long), tuple1: (Payload, Long)) => (tuple._0, tuple._1 + 1L)) .addSink(   new FlinkKafkaProducer[(Payload, Long)](    KafkaBootstrapServers,    TARGET_TOPIC,    new SerializationSchema[(Payload, Long), Array[Byte]] {      override def serialize(element: (Payload, Long)): Array[Byte] = {        element._2.toString().getBytes      }    }  ))env.execute("test")*
>
>
>
>