Posted to user@beam.apache.org by Talat Uyarer <tu...@paloaltonetworks.com> on 2020/06/09 21:05:27 UTC

KafkaIO Read Latency

Hi,
I added some metrics to a step right after KafkaIO. When I compare the read
time difference between the producer and KafkaIO, it is 800ms at P99. However,
the difference between that step's bundle opening and closing is 18 seconds
at P99. The step itself does not do anything special. Do you have any idea
why the bundle latency is so high? Where should I check or tune in KafkaIO?

Additional information: I read from one topic. That topic has 15 partitions.
The producer writes in a round-robin fashion.

Thanks

Re: KafkaIO Read Latency

Posted by Alexey Romanenko <ar...@gmail.com>.
Thank you for the details.

I expect that the StartBundle and FinishBundle methods are called in your DoFn (KafkaMessageExtractor). Since that DoFn runs before your Window transform, I don’t think the windowing should affect it.

Did you count how many records are processed in one bundle?
What actually happens inside your @ProcessElement method?

It would be helpful if you could share the code of the class where you do your measurement, if possible.
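
A minimal sketch of the measurement being discussed, in plain Java so it runs without a Beam dependency. `BundleStats` is a hypothetical helper, not a Beam class; the assumption is that its methods would be called from the DoFn's @StartBundle, @ProcessElement, and @FinishBundle methods. It records the element count alongside the elapsed time, so a long bundle that processed many records can be told apart from a long bundle that mostly waited.

```java
import java.util.function.LongSupplier;

/** Hypothetical helper (not part of Beam): tracks duration and size of one bundle. */
class BundleStats {
    private final LongSupplier nanoClock; // injected so tests can use a fake clock
    private long startNanos;
    private long elements;

    BundleStats(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    /** Intended to be called from the DoFn's @StartBundle method. */
    void start() {
        startNanos = nanoClock.getAsLong();
        elements = 0;
    }

    /** Intended to be called once per element from @ProcessElement. */
    void recordElement() {
        elements++;
    }

    /** Intended to be called from @FinishBundle; returns elapsed milliseconds. */
    long finishMillis() {
        return (nanoClock.getAsLong() - startNanos) / 1_000_000L;
    }

    /** Number of elements seen since the last start(). */
    long elementCount() {
        return elements;
    }

    public static void main(String[] args) {
        BundleStats stats = new BundleStats(System::nanoTime);
        stats.start();
        for (int i = 0; i < 400; i++) {
            stats.recordElement();
        }
        System.out.println("bundle ms=" + stats.finishMillis()
            + " elements=" + stats.elementCount());
    }
}
```

In a real DoFn one would probably keep one instance per DoFn instance and report both values through the pipeline's metrics, rather than printing them.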




Re: KafkaIO Read Latency

Posted by Talat Uyarer <tu...@paloaltonetworks.com>.
Hi,

I record the time when StartBundle is called and again when FinishBundle is
called, then report the difference between the two as bundle latency. I put
this metric on a step (KafkaMessageExtractor) which is right after the
KafkaIO step. I don't know if this is related, but my pipeline has a
windowing function and GroupIntoBatches. The window duration is 10 seconds
and the batch size is 400. My current traffic is 8kps. I changed the window
duration to 5 seconds and to 20 seconds, but it did not make much
difference.

KafkaIO -> KafkaMessageExtractor -> Windowing Function -> Sink

.apply(Window.<KV<String, byte[]>>into(
    FixedWindows.of(Duration.standardSeconds(windowDurationSeconds)))
    .triggering(Repeatedly.forever(AfterFirst.of(
        AfterPane.elementCountAtLeast((int) batchSize),
        AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardSeconds(windowDurationSeconds))))
    )
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes())
.apply(GroupIntoBatches.ofSize(batchSize))
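
For a rough sense of what this trigger allows, the sketch below estimates how long a pane can wait after its first element before AfterFirst fires, that is, whichever comes first of the element-count condition or the processing-time delay. It is an illustration only: `TriggerEstimate` and its parameters are hypothetical, "8kps" is assumed to mean 8,000 records per second, the rate is assumed steady, and the rate that matters is per key and window, which this thread does not state.

```java
/** Hypothetical back-of-the-envelope estimate; not a Beam API. */
class TriggerEstimate {

    /**
     * Approximate milliseconds from the first element of a pane until either
     * the count condition or the processing-time delay is met, assuming a
     * steady arrival rate for that key and window.
     */
    static double millisUntilFire(long batchSize, double elementsPerSecond, long delaySeconds) {
        double delayMillis = delaySeconds * 1000.0;
        if (elementsPerSecond <= 0) {
            // No steady traffic: only the processing-time delay can fire.
            return delayMillis;
        }
        double fillMillis = batchSize * 1000.0 / elementsPerSecond;
        return Math.min(fillMillis, delayMillis);
    }

    public static void main(String[] args) {
        // Values from this thread: batch size 400, 10-second delay.
        // 8000.0 assumes "8kps" means 8,000 records per second on a single key.
        System.out.println(millisUntilFire(400, 8000.0, 10));
        // A much slower key, where the delay is reached first.
        System.out.println(millisUntilFire(400, 10.0, 10));
    }
}
```

Under these assumptions a single busy key meets the count condition long before the delay expires, while with many keys the per-key rate drops and the delay can dominate.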


Thanks



Re: KafkaIO Read Latency

Posted by Alexey Romanenko <ar...@gmail.com>.
Hi Talat,

Could you elaborate on what you mean by “opening and closing bundle”?

Sometimes starting a KafkaReader can take time, since it seeks the start offset for each assigned partition, but this happens only once at pipeline start-up and mostly depends on network conditions.
