Posted to user@flink.apache.org by Robert Schmidtke <ro...@gmail.com> on 2016/05/02 19:02:01 UTC

Measuring latency in a DataStream

Hi everyone,

I have implemented a way to measure latency in a DataStream (I hope): I'm
consuming a Kafka topic and I'm union'ing the resulting stream with a
custom source that emits a (machine-local) timestamp every 1000ms (using
currentTimeMillis). On the consuming end I'm distinguishing between the
Kafka events and the timestamps. When encountering a timestamp, I take the
difference of the processing machine's local time and the timestamp found
in the stream, expecting a positive difference (with the processing
machine's timestamp being larger than the timestamp found in the stream).
However, the opposite is the case. Now I am wondering about when events are
actually processed.
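
The setup described above can be sketched in plain Java without any Flink dependency. This is only an illustration under assumptions, not the code from the linked repository: the class MarkerLatencySketch, the Element type and its factory methods are hypothetical names, and the unioned stream is modelled as a simple array.

```java
// Sketch only: plain Java, no Flink dependency. All names are hypothetical.
final class MarkerLatencySketch {

    /** One element of the unioned stream: either a Kafka record or a timestamp marker. */
    static final class Element {
        final boolean isMarker;
        final long markerTimestampMs; // only meaningful when isMarker is true
        final String payload;         // only meaningful when isMarker is false

        private Element(boolean isMarker, long markerTimestampMs, String payload) {
            this.isMarker = isMarker;
            this.markerTimestampMs = markerTimestampMs;
            this.payload = payload;
        }

        /** Marker carrying the emitting machine's local clock reading. */
        static Element marker(long emittedAtMs) {
            return new Element(true, emittedAtMs, null);
        }

        /** Ordinary record, carrying no timestamp at all. */
        static Element record(String payload) {
            return new Element(false, 0L, payload);
        }
    }

    /** Difference between the consumer's clock reading and the marker's timestamp. */
    static long latencyMs(Element marker, long consumerNowMs) {
        if (!marker.isMarker) {
            throw new IllegalArgumentException("not a marker");
        }
        return consumerNowMs - marker.markerTimestampMs;
    }

    public static void main(String[] args) {
        Element[] window = {
            Element.record("a"),
            Element.marker(System.currentTimeMillis()),
            Element.record("b")
        };
        for (Element e : window) {
            // Only markers contribute a latency sample; records are skipped here.
            if (e.isMarker) {
                System.out.println("latency ms: " + latencyMs(e, System.currentTimeMillis()));
            }
        }
    }
}
```

The difference is only meaningful if both clock readings come from clocks that agree; when they are taken on different machines, any clock offset is added directly to the result.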

When I union the stream from Kafka with my custom source and batch the
result in 10s windows (which is what I do), I expect 10 timestamps per
window, with ascending values and a gap of roughly 1000ms between them:
https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/RunBenchWithInit.scala#L68

On the receiving end I again take the currentTimeMillis in my fold
function, expecting the resulting value to be larger (most of the time)
than the timestamps encountered in the stream:
https://github.com/robert-schmidtke/HiBench/blob/flink-streaming/src/streambench/flinkbench/src/main/scala/com/intel/hibench/streambench/flink/microbench/NumericCalcJob.scala#L53

The system clocks are in sync up to 1ms.

Maybe I am not clear about when certain timestamps are created (i.e. when
the UDFs are invoked) or how windows are processed. Any advice is greatly
appreciated, also alternative approaches to calculating latency.

I'm on Flink 0.10.2 by the way.

Thanks in advance for the help!

Robert

-- 
My GPG Key ID: 336E2680

Re: Measuring latency in a DataStream

Posted by Robert Schmidtke <ro...@gmail.com>.
After fixing the clock issue on the application level, the latency is as
expected. Thanks again!

Robert

-- 
My GPG Key ID: 336E2680

Re: Measuring latency in a DataStream

Posted by Robert Schmidtke <ro...@gmail.com>.
Hi Igor, thanks for your reply.

As for your first point I'm not sure I understand correctly. I'm ingesting
records at a rate of about 50k records per second, and those records are
fairly small. If I add a time stamp to each of them, I will have a lot more
data, which is not exactly what I want. Instead I wanted to add something
like a watermark once every second and only have a time stamp on this one,
and calculate the latency from it.

For your second point: in fact, the clocks are up to 8s apart -.-" Not sure
how I missed this yesterday. As I'm not an admin of the machine, I will
request that NTP be set up.
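
A small worked example of why a clock offset of this size can turn the measured difference negative. It is a sketch under assumptions: the class and method names are hypothetical, the 200ms true latency is an invented figure, and which machine's clock runs ahead is assumed rather than known.

```java
// Sketch only: all names and the example numbers are hypothetical.
final class ClockSkewSketch {

    /**
     * What the consumer computes when it subtracts the producer's timestamp
     * from its own clock reading. Each offset is that machine's clock error
     * relative to true time (positive means the clock runs ahead).
     */
    static long observedLatencyMs(long trueLatencyMs, long producerOffsetMs, long consumerOffsetMs) {
        long trueEmitMs = 1_000_000L; // arbitrary true emission time, cancels out below
        long stampedByProducerMs = trueEmitMs + producerOffsetMs;
        long readByConsumerMs = trueEmitMs + trueLatencyMs + consumerOffsetMs;
        return readByConsumerMs - stampedByProducerMs;
    }

    public static void main(String[] args) {
        // Assumed: producer clock 8s ahead, consumer clock exact, true latency 200ms.
        System.out.println(observedLatencyMs(200L, 8_000L, 0L)); // prints -7800
        // With synchronized clocks the observed value equals the true latency.
        System.out.println(observedLatencyMs(200L, 0L, 0L));     // prints 200
    }
}
```

In other words, the observed value is the true latency plus the consumer's offset minus the producer's offset, so an 8s offset in the wrong direction dominates any realistic latency.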

Thanks!
Robert



-- 
My GPG Key ID: 336E2680

Re: Measuring latency in a DataStream

Posted by Igor Berman <ig...@gmail.com>.
1. Why are you doing a join instead of something like
System.currentTimeMillis()? In the end you have a tuple of your data with
a timestamp anyway, so why not just wrap your data in a Tuple2 that
carries the creation timestamp as additional info?

2. Are you sure that the consumer/producer machines' clocks are in sync?
You can use NTP for this.
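
The first suggestion could look roughly like the following in plain Java. This is a sketch under assumptions, not Flink API: Stamped stands in for a Tuple2 of value and creation timestamp, and all names are hypothetical.

```java
// Sketch only: Stamped is a hypothetical stand-in for a (value, timestamp) tuple.
final class StampedRecordSketch {

    /** A record wrapped together with the clock reading taken when it was created. */
    static final class Stamped<T> {
        final T value;
        final long createdAtMs;

        Stamped(T value, long createdAtMs) {
            this.value = value;
            this.createdAtMs = createdAtMs;
        }
    }

    /** Wraps a value with the local clock reading at creation time. */
    static <T> Stamped<T> stamp(T value) {
        return new Stamped<>(value, System.currentTimeMillis());
    }

    /** Per-record latency as seen at the point where nowMs was read. */
    static long latencyMs(Stamped<?> record, long nowMs) {
        return nowMs - record.createdAtMs;
    }

    public static void main(String[] args) {
        Stamped<String> record = stamp("some event");
        System.out.println("latency ms: " + latencyMs(record, System.currentTimeMillis()));
    }
}
```

Each wrapped record carries one extra long (8 bytes), and the result is still subject to the same clock-synchronization requirement as in point 2 whenever the two clock readings are taken on different machines.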
