Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2020/08/05 19:59:29 UTC

Intermittent Slowness with kafkaIO read

Hello,

I am using Beam Java SDK 2.19 on Dataflow. We have a system where a log
shipper continuously emits logs to Kafka and Beam reads the logs using KafkaIO.

Sometimes I see slowness on the KafkaIO read with one of the topics
(probably during peak traffic periods), where there is a 2-3 minute gap between
the record timestamp and the time when Beam reads the log. For instance:

2020-08-05 12:46:23.826 PDT

 {"@timestamp":1596656684.274594,"time":"2020-08-05T19:44:44.274594282Z",
"data": data}, offset: 2148857, timestamp: 1596656685005



If you convert the record timestamp (1596656685005), which is in epoch ms, to
PDT, you will see an approximately 2-minute difference between it and 2020-08-05
12:46:23.826 PDT (the time when Beam actually read the data).
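
For reference, a quick java.time sketch of that conversion (PDT is the
America/Los_Angeles zone):

import java.time.Instant;
import java.time.ZoneId;

public class TimestampCheck {
  public static void main(String[] args) {
    // Record timestamp from the log line above, in epoch millis.
    System.out.println(
        Instant.ofEpochMilli(1596656685005L).atZone(ZoneId.of("America/Los_Angeles")));
    // Prints 2020-08-05T12:44:45.005-07:00[America/Los_Angeles],
    // i.e. roughly 1 min 39 s before the 12:46:23.826 PDT read time.
  }
}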

One way of achieving horizontal scaling here is to increase the number
of partitions on the Kafka broker. What can be done on the Beam side,
i.e. the KafkaIO side, to tackle this slowness? Any suggestions?
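
On the Kafka side, I believe the partition count can be grown with the
AdminClient; a rough sketch, where the broker address, topic name, and
target count are all placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewPartitions;

public class GrowPartitions {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.put("bootstrap.servers", "broker:9092"); // placeholder
    try (AdminClient admin = AdminClient.create(props)) {
      // Grow "topic1" to 12 partitions; existing records are not rebalanced.
      admin.createPartitions(
          Collections.singletonMap("topic1", NewPartitions.increaseTo(12)))
          .all().get();
    }
  }
}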

Thanks and regards
Mohil

Re: Intermittent Slowness with kafkaIO read

Posted by Mohil Khare <mo...@prosimo.io>.
I also have a question: could wrong windowing be messing up the
received timestamps?
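
One thing I may try, assuming our producers set create-time timestamps
on the records: have KafkaIO derive element timestamps from the record
timestamp instead of processing time. A sketch of just the read (the
2-minute watermark lag is a placeholder):

KafkaIO.<String, byte[]>read()
    .withBootstrapServers(servers)
    .withTopics(Arrays.asList("topic1", "topic2"))
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    // Use the record's create-time timestamp as event time, allowing the
    // watermark to lag up to 2 minutes behind the newest record.
    .withCreateTime(Duration.standardMinutes(2))
    .commitOffsetsInFinalize();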

Thanks and regards
Mohil


Re: Intermittent Slowness with kafkaIO read

Posted by Mohil Khare <mo...@prosimo.io>.
Just to let you know, this is how I set up the KafkaIO read:

p.apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(servers)
        .withTopics(Arrays.asList("topic1", "topic2"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        // Commit consumed offsets back to Kafka when a checkpoint is finalized.
        .commitOffsetsInFinalize())
 .apply("Applying_Fixed_Window",
     Window.<KafkaRecord<String, byte[]>>into(
             FixedWindows.of(Duration.standardSeconds(10)))
         // Fire a pane for every element; discard elements already fired.
         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
         .withAllowedLateness(Duration.standardSeconds(360))
         .discardingFiredPanes())
 .apply("Convert_KafkaRecord_To_PCollection<POJO>", ParDo.of(new ParseLogs()));



Where kafkaConsumerProperties is the following map:

Map<String, Object> kafkaConsumerProperties = new HashMap<>();
kafkaConsumerProperties.put("security.protocol", "SSL");
kafkaConsumerProperties.put("auto.offset.reset", "latest");
kafkaConsumerProperties.put("group.id", "consumer1");
kafkaConsumerProperties.put("default.api.timeout.ms", 180000);


And inside consumerFactoryObj I set up the SSL keystores.
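
A rough sketch of that factory fn, where the keystore/truststore paths
and passwords are placeholders:

SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryObj =
    config -> {
      // Copy the config KafkaIO passes in, then add the SSL settings.
      Map<String, Object> cfg = new HashMap<>(config);
      cfg.put("ssl.keystore.location", "/path/to/keystore.jks");     // placeholder
      cfg.put("ssl.keystore.password", "keystore-password");         // placeholder
      cfg.put("ssl.truststore.location", "/path/to/truststore.jks"); // placeholder
      cfg.put("ssl.truststore.password", "truststore-password");     // placeholder
      return new KafkaConsumer<>(cfg);
    };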


Thanks and Regards

Mohil
