Posted to user@beam.apache.org by Alexey Romanenko <ar...@gmail.com> on 2020/03/05 18:43:59 UTC

Re: Issue with KafkaIO for list of topics

+1 to what Rahul said before - KafkaIO can’t filter out Kafka topics at runtime, so you need to change your TimestampPolicy accordingly.

Please don’t hesitate to share the code of your new TimestampPolicy with us if you still have any questions/issues with this.
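
For example, the policy is plugged in through the timestamp policy factory. A minimal sketch (IdleAwareTimestampPolicy is a hypothetical class along the lines of Rahul's suggestion quoted below; brokers and topics are as in your pipeline):

KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(brokers)
        .withTopics(topics)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        // The factory receives the partition and the previous watermark (if any).
        .withTimestampPolicyFactory((partition, previousWatermark) ->
                new IdleAwareTimestampPolicy(previousWatermark))
        .commitOffsetsInFinalize();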

PS: I moved this discussion to user@, as it is the more appropriate mailing list for these questions.

> On 29 Feb 2020, at 12:36, Maulik Soneji <ma...@gojek.com> wrote:
> 
> Hello Rahul,
> 
> Thanks again for the detailed explanation.
> 
> I need some guidance on what values to set for maxDelay and previousWatermark in CustomTimestampPolicyWithLimitedDelay.
> 
> Currently, I am providing maxDelay as Duration.ZERO and previousWatermark as Optional.empty().
> With these values, I see that the getWatermark function always goes to the else block (code link) and always returns TIMESTAMP_MIN_VALUE.
> So in this case as well, the watermark is returned as TIMESTAMP_MIN_VALUE for zero-throughput topics.
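> 
> For reference, a simplified sketch of how I construct the policy (extractTimestamp stands in for my own helper):
> 
> new CustomTimestampPolicyWithLimitedDelay<byte[], byte[]>(
>         record -> extractTimestamp(record), // event time from the message payload
>         Duration.ZERO,                      // maxDelay
>         Optional.empty());                  // previousWatermark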
> 
> Please share your observations on how to tune the Timestamp Policy.
> 
> Thanks and regards,
> Maulik
> 
> 
> On Fri, Feb 28, 2020 at 8:46 PM rahul patwari <rahulpatwari8383@gmail.com> wrote:
> Hi Maulik,
> 
> Currently, I don't think it is possible to filter topics based on whether or not data is being produced to them.
> But the Watermark logic can be changed to make the pipeline work.
> 
> Since the timestamp of each record is the time when the event was pushed to Kafka, records will have monotonically increasing timestamps, except for out-of-order events.
> Instead of assigning the Watermark as BoundedWindow.TIMESTAMP_MIN_VALUE by default, we can assign [current_timestamp - some_delay] as the default, and do the same in the getWatermark() method; that way, the Watermark will advance even if the partition is idle.
> 
> Make sure that the Watermark is monotonically increasing, and choose the delay carefully in order to avoid discarding out-of-order events.
> 
> Refer to https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java for an example.
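> 
> A minimal sketch of such a policy (IdleAwareTimestampPolicy, the broker-timestamp extractor, and the one-minute delay are illustrative, not a tested implementation):
> 
> import java.util.Optional;
> import org.apache.beam.sdk.io.kafka.KafkaRecord;
> import org.apache.beam.sdk.io.kafka.TimestampPolicy;
> import org.joda.time.Duration;
> import org.joda.time.Instant;
> 
> public class IdleAwareTimestampPolicy extends TimestampPolicy<byte[], byte[]> {
>     // Illustrative delay: tune it to the worst out-of-order skew you expect.
>     private static final Duration MAX_DELAY = Duration.standardMinutes(1);
> 
>     public IdleAwareTimestampPolicy(Optional<Instant> previousWatermark) {
>         // No state needed: the watermark below follows the wall clock.
>     }
> 
>     @Override
>     public Instant getTimestampForRecord(PartitionContext ctx, KafkaRecord<byte[], byte[]> record) {
>         // Broker timestamp here; substitute your own event-time extractor.
>         return new Instant(record.getTimestamp());
>     }
> 
>     @Override
>     public Instant getWatermark(PartitionContext ctx) {
>         // (now - delay) instead of TIMESTAMP_MIN_VALUE: advances even when the
>         // partition is idle, and is monotonic because the wall clock is.
>         return Instant.now().minus(MAX_DELAY);
>     }
> }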
> 
> Regards,
> Rahul
> 
> 
> On Fri, Feb 28, 2020 at 6:54 PM Maulik Soneji <maulik.soneji@gojek.com> wrote:
> Hi Rahul,
> 
> Thank you very much for the detailed explanation.
> 
> Since we don't know which topics have zero throughput, is there a way we can filter out such topics in KafkaIO?
> 
> Since KafkaIO doesn't support passing a regex of topics to consume from, I am getting the list of topics from Kafka and passing it explicitly.
> 
> Is there a way to filter out such topics? It can also happen that a topic has no data for a few windows after the job starts and then receives data later, so this filter would need to be dynamic as well.
> 
> Please share some ideas on how we can make this work.
> 
> Community members, please share your thoughts as well on how we can achieve this.
> 
> Thanks and regards,
> Maulik
> 
> On Fri, Feb 28, 2020 at 3:03 PM rahul patwari <rahulpatwari8383@gmail.com> wrote:
> Hi Maulik,
> 
> This seems like an issue with the Watermark.
> According to https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L240,
> 
> If there are multiple partitions (or multiple topics), the Watermark is calculated for each partition, and the minimum across all partitions is taken as the current Watermark.
> Assuming that no message is pushed to the topic with 0 throughput, according to your logic for the watermark calculation, the watermark of each partition of this topic will be BoundedWindow.TIMESTAMP_MIN_VALUE (the smallest representable timestamp of an element - https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/model/pipeline/src/main/proto/beam_runner_api.proto#L44).
> 
> Since results are emitted from GroupByKey only when the Watermark crosses the end of the window, and the Watermark here is stuck at BoundedWindow.TIMESTAMP_MIN_VALUE, you are not seeing results from GroupByKey.
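> 
> In other words, the source watermark is (conceptually) the minimum over the partition watermarks, so a single idle partition pins the whole source. A simplified illustration, not the exact Beam code:
> 
> Instant sourceWatermark = BoundedWindow.TIMESTAMP_MAX_VALUE;
> for (Instant partitionWatermark : partitionWatermarks) {
>     // One partition stuck at TIMESTAMP_MIN_VALUE holds everything back.
>     if (partitionWatermark.isBefore(sourceWatermark)) {
>         sourceWatermark = partitionWatermark;
>     }
> }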
> 
> Regards,
> Rahul 
> 
> On Fri, Feb 28, 2020 at 12:39 PM Maulik Soneji <maulik.soneji@gojek.com> wrote:
> Observations:
> If we read using KafkaIO from a list of topics where one of the topics has zero throughput,
> and KafkaIO is followed by a GroupByKey stage, then:
> a. No data is output from the GroupByKey stage for any of the topics, not just the zero-throughput topic.
> 
> If all topics have some throughput coming in, it works fine and we get some output from the GroupByKey stage.
> 
> Is this an issue?
> 
> Points:
> a. The output from GroupByKey comes only when all topics have some throughput.
> b. This is a problem with KafkaIO + GroupByKey; in the case where I have FileIO + GroupByKey, the issue doesn't arise: GroupByKey outputs some data even if there is no data for one of the files.
> c. It is not a runner issue, since I ran it with both FlinkRunner and DataflowRunner.
> d. Even if the lag is different for each topic in the list, we still get some output from GroupByKey.
> 
> Debugging:
> While debugging this issue, I found that in the split function of KafkaUnboundedSource, we create a KafkaUnboundedSource whose partition list has one partition for each topic.
> 
> I am not sure if this is a watermark issue, but since the watermark for the topic with no throughput will not advance, this looks like the most likely cause to me.
> 
> Please help me figure out whether this is an issue or whether something is wrong with my pipeline.
> 
> Attaching detailed pipeline information for more details:
> 
> Context:
> I am currently using KafkaIO to read data from Kafka for a list of topics, with a custom timestamp policy.
> 
> Below is how I construct the KafkaIO reader:
> return KafkaIO.<byte[], byte[]>read()
>         .withBootstrapServers(brokers)
>         .withTopics(topics)
>         .withKeyDeserializer(ByteArrayDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withTimestampPolicyFactory((partition, previousWatermark) -> new EventTimestampPolicy(godataService, previousWatermark))
>         .commitOffsetsInFinalize();
> Pipeline Information:
> Pipeline consists of seven steps:
> a. Read from Kafka with a custom timestamp policy
> b. Convert KafkaRecord to Message object
> c. Window based on FixedWindows of 10 minutes, triggering AfterWatermark
> d. PCollection<Message> to PCollection<KV<String, Message>>, where the topic is the key
> e. GroupByKey.create() to get PCollection<KV<String, Iterable<Message>>>
> f. PCollection<KV<String, Iterable<Message>>> to PCollection<ComputedMetrics> for each topic
> g. Write output to Kafka
> 
> Detailed Pipeline Information
> a. Read data from Kafka to get KafkaRecord<byte[], byte[]>
> Here I am using my own timestamp policy, which looks like below:
> public EventTimestampPolicy(MyService myService, Optional<Instant> previousWatermark) {
>     this.myService = myService;
>     // Defaults to TIMESTAMP_MIN_VALUE when there is no previous watermark.
>     this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
> }
> 
> @Override
> public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<byte[], byte[]> record) {
>     Instant eventTimestamp;
>     try {
>         eventTimestamp = Deserializer.getEventTimestamp(record, myService);
>     } catch (InvalidProtocolBufferException e) {
>         statsClient.increment("io.proto.buffer.exception");
>         throw new RuntimeException(e);
>     }
>     this.currentWatermark = eventTimestamp;
>     return this.currentWatermark;
> }
> 
> @Override
> public Instant getWatermark(PartitionContext ctx) {
>     // Returns the timestamp of the last record seen; for an idle partition
>     // this never advances past the initial value.
>     return this.currentWatermark;
> }
> The event timestamp is one of the fields in the Kafka message. It is the time when the event was pushed to Kafka.
> b. DoFn to transform KafkaRecord<byte[], byte[]> to the Message class.
> The Message class contains properties like topic, partition, offset, and timestamp.
> c. Windowing on a 10-minute fixed window, triggering at AfterWatermark.pastEndOfWindow()
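> 
> The windowing transform looks roughly like this (the allowed lateness and pane accumulation shown are placeholder values, not necessarily what I run):
> 
> input.apply(Window.<Message>into(FixedWindows.of(Duration.standardMinutes(10)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes());
> 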
> d. PCollection<Message> to PCollection<KV<String, Message>>
> Here the key is the Kafka topic.
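> 
> Roughly (assuming Message exposes a getTopic() accessor):
> 
> windowed.apply(MapElements
>         .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Message.class)))
>         .via(message -> KV.of(message.getTopic(), message)));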
> 
> e. GroupByKey to get PCollection<KV<String, Iterable<Message>>>
> f. PCollection<KV<String, Iterable<Message>>> to PCollection<ComputedMetrics> for each topic
> g. Write output to Kafka