Posted to user@beam.apache.org by Mohil Khare <mo...@prosimo.io> on 2020/08/05 19:59:29 UTC
Intermittent Slowness with kafkaIO read
Hello,
I am using the Beam Java SDK 2.19 on Dataflow. We have a system where a log
shipper continuously emits logs to Kafka and Beam reads the logs using KafkaIO.
Sometimes I see slowness on the KafkaIO read with one of the topics
(probably during peak traffic periods), where there is a 2-3 minute gap between
the record timestamp and the time when Beam reads the log. For instance:
2020-08-05 12:46:23.826 PDT
{"@timestamp":1596656684.274594,"time":"2020-08-05T19:44:44.274594282Z",
"data" : data}, offset: 2148857, timestamp: 1596656685005
If you convert the record timestamp (1596656685005), which is in epoch ms, to
PDT, you will see an approximately two-minute difference between it and 2020-08-05
12:46:23.826 PDT (the time when Beam actually read the data).
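
To double-check the gap, here is a quick standalone conversion (PDT is UTC-7):

import java.time.Duration;
import java.time.Instant;

public class LagCheck {
    public static void main(String[] args) {
        // Kafka record timestamp from the log line above, in epoch millis.
        Instant recordTs = Instant.ofEpochMilli(1596656685005L); // 2020-08-05T19:44:45.005Z = 12:44:45.005 PDT
        // Wall-clock time at which Beam read the record: 12:46:23.826 PDT.
        Instant readTs = Instant.parse("2020-08-05T19:46:23.826Z");
        // Prints PT1M38.821S, in line with the roughly two minutes described above.
        System.out.println(Duration.between(recordTs, readTs));
    }
}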
One way of achieving horizontal scaling here is to increase the number
of partitions on the Kafka broker. What can be done on the Beam side, i.e. the
KafkaIO side, to tackle this slowness? Any suggestions?
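
For reference, the kind of consumer tuning I could pass through
withConsumerConfigUpdates looks like this; a sketch only, and the values are
untested guesses rather than recommendations:

// Sketch: consumer settings that may raise read throughput.
// All values below are untested guesses, not recommendations.
Map<String, Object> tuning = new HashMap<>();
tuning.put("max.poll.records", 2000);             // default is 500
tuning.put("max.partition.fetch.bytes", 4 << 20); // default is 1 MiB
tuning.put("receive.buffer.bytes", 1 << 20);      // default is 64 KiB

KafkaIO.<String, byte[]>read()
    .withBootstrapServers(servers)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    .withConsumerConfigUpdates(tuning);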
Thanks and regards
Mohil
Re: Intermittent Slowness with kafkaIO read
Posted by Mohil Khare <mo...@prosimo.io>.
I also have a question: could wrong windowing be messing up the received
timestamps?
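
From what I can tell, KafkaIO assigns element timestamps in processing time by
default, so the timestamp the window sees is when Beam read the record, not
when the producer wrote it. If I want windows keyed to the producer timestamp,
something like the sketch below should work; the one-minute allowed delay is a
guess on my part:

// Sketch: window on the Kafka record's create time rather than processing
// time. The 1-minute bound on out-of-order records is a guess.
KafkaIO.<String, byte[]>read()
    .withBootstrapServers(servers)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    .withCreateTime(Duration.standardMinutes(1));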
Thanks and regards
Mohil
On Wed, Aug 5, 2020 at 1:19 PM Mohil Khare <mo...@prosimo.io> wrote:
> [quoted text trimmed; both quoted messages appear in full elsewhere in this thread]
Re: Intermittent Slowness with kafkaIO read
Posted by Mohil Khare <mo...@prosimo.io>.
Just to let you know, this is how I set up the KafkaIO read:
p.apply("Read_From_Kafka", KafkaIO.<String, byte[]>read()
        .withBootstrapServers(servers)
        .withTopics(Arrays.asList("topic1", "topic2"))
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withConsumerConfigUpdates(kafkaConsumerProperties)
        .withConsumerFactoryFn(consumerFactoryObj)
        .commitOffsetsInFinalize())
 // 10-second fixed windows, firing on every element, discarding fired panes.
 .apply("Applying_Fixed_Window",
        Window.<KafkaRecord<String, byte[]>>into(FixedWindows.of(Duration.standardSeconds(10)))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.standardSeconds(360))
            .discardingFiredPanes())
 .apply("Convert_KafkaRecord_To_PCollection<POJO>",
        ParDo.of(new ParseLogs()));
Where kafkaConsumerProperties is the following map:
kafkaConsumerProperties.put("security.protocol", "SSL");
kafkaConsumerProperties.put("auto.offset.reset", "latest");
kafkaConsumerProperties.put("group.id", “consumer1”);
kafkaConsumerProperties.put("default.api.timeout.ms", 180000);
And inside consumerFactoryObj I set up the SSL keystores.
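
For completeness, the factory is shaped roughly like this, if I have the
signature right; a sketch, with placeholder paths and passwords instead of my
real configuration:

// Sketch of an SSL consumer factory. Keystore paths and passwords are
// placeholders, not real values.
SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryObj =
    config -> {
        Map<String, Object> cfg = new HashMap<>(config);
        cfg.put("ssl.truststore.location", "/path/to/truststore.jks"); // placeholder
        cfg.put("ssl.truststore.password", "*****");                   // placeholder
        cfg.put("ssl.keystore.location", "/path/to/keystore.jks");     // placeholder
        cfg.put("ssl.keystore.password", "*****");                     // placeholder
        return new KafkaConsumer<>(cfg);
    };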
Thanks and Regards
Mohil
On Wed, Aug 5, 2020 at 12:59 PM Mohil Khare <mo...@prosimo.io> wrote:
> [quoted message trimmed; see the original post at the top of this thread]