Posted to dev@beam.apache.org by Maulik Soneji <ma...@gojek.com> on 2020/02/28 07:08:24 UTC

Issue with KafkaIO for list of topics

*Observations:*
If we read from a list of topics using KafkaIO, one of the topics has zero
throughput, and KafkaIO is followed by a GroupByKey stage, then:
a. The GroupByKey stage outputs no data for any of the topics, not just
the zero-throughput topic.

If all topics have some throughput, it works fine and we get some output
from the GroupByKey stage.

Is this an issue?

*Points:*
a. GroupByKey produces output only when all topics have some throughput.
b. This is a problem with KafkaIO + GroupByKey. When I use FileIO +
GroupByKey, the issue doesn't arise: GroupByKey outputs some data even if
there is no data for one of the files.
c. It is not a runner issue, since I see it with both FlinkRunner and
DataflowRunner.
d. Even if lag is different for each topic in the list, we still get some
output from GroupByKey.


*Debugging:*
While debugging this issue I found that the split function of
KafkaUnboundedSource creates KafkaUnboundedSource instances whose partition
list contains one partition for each topic.

I am not sure whether this is a watermark issue, but it looks like the most
likely cause to me, since the watermark for the topic with no throughput
will not advance.

*Please help me in figuring out whether this is an issue or if there is
something wrong with my pipeline.*

Detailed pipeline information follows:

*Context:*
I am currently using KafkaIO to read data from Kafka for a list of topics
with a custom timestamp policy.

Below is how I am constructing the KafkaIO reader:

return KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(brokers)
        .withTopics(topics)
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withTimestampPolicyFactory((partition, previousWatermark) ->
                new EventTimestampPolicy(godataService, previousWatermark))
        .commitOffsetsInFinalize();

*Pipeline Information:*
The pipeline consists of seven steps:
a. Read from Kafka with a custom timestamp policy
b. Convert KafkaRecord to Message object
c. Window into fixed windows of 10 minutes, triggering AfterWatermark
d. PCollection<Message> to PCollection<KV<String, Message>> where the
topic is the key
e. GroupByKey.create() to get PCollection<KV<String, Iterable<Message>>>
f. PCollection<KV<String, Iterable<Message>>> to
PCollection<ComputedMetrics> for each topic
g. Write output to Kafka

*Detailed Pipeline Information*
a. Read data from Kafka to get KafkaRecord<byte[], byte[]>
Here I am using my own timestamp policy, which looks like this:

public EventTimestampPolicy(MyService myService, Optional<Instant> previousWatermark) {
    this.myService = myService;
    this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
}

@Override
public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<byte[], byte[]> record) {
    Instant eventTimestamp;
    try {
        eventTimestamp = Deserializer.getEventTimestamp(record, myService);
    } catch (InvalidProtocolBufferException e) {
        statsClient.increment("io.proto.buffer.exception");
        throw new RuntimeException(e);
    }
    this.currentWatermark = eventTimestamp;
    return this.currentWatermark;
}

@Override
public Instant getWatermark(PartitionContext ctx) {
    return this.currentWatermark;
}

The event timestamp is one of the fields in the Kafka message. It is the
time when the event was pushed to Kafka.

b. DoFn to transform KafkaRecord<byte[], byte[]> to the Message class. The
Message class contains properties like topic, partition, offset and
timestamp.

c. Windowing into 10-minute fixed windows, triggering at
AfterWatermark.pastEndOfWindow()

d. PCollection<Message> to PCollection<KV<String, Message>>
Here the key is the Kafka topic.

e. GroupByKey to get PCollection<KV<String, Iterable<Message>>>

f. PCollection<KV<String, Iterable<Message>>> to
PCollection<ComputedMetrics> for each topic

g. Write output to kafka
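
Editorial sketch, not part of the original post: a minimal, library-free
model of why one idle topic could stall every window. It assumes the source
watermark is the minimum of the per-partition watermarks and that a window
can only fire once that watermark reaches the window end (a simplification).
The class and method names are hypothetical, and Instant.MIN merely stands
in for BoundedWindow.TIMESTAMP_MIN_VALUE.

```java
import java.time.Instant;
import java.util.Arrays;
import java.util.List;

final class MinWatermarkDemo {

    /** Assumed rule: the source watermark is the minimum over all partitions. */
    static Instant sourceWatermark(List<Instant> partitionWatermarks) {
        return partitionWatermarks.stream()
                .min(Instant::compareTo)
                .orElse(Instant.MIN);
    }

    /** Simplified trigger check: the window may fire once the watermark reaches its end. */
    static boolean windowCanFire(Instant windowEnd, Instant watermark) {
        return !watermark.isBefore(windowEnd);
    }

    public static void main(String[] args) {
        Instant windowEnd = Instant.parse("2020-02-28T07:10:00Z");
        Instant busyTopic = Instant.parse("2020-02-28T07:15:00Z");
        Instant idleTopic = Instant.MIN; // partition that never saw a record

        Instant combined = sourceWatermark(Arrays.asList(busyTopic, idleTopic));
        System.out.println("combined watermark: " + combined);
        System.out.println("window can fire: " + windowCanFire(windowEnd, combined));
    }
}
```

With one partition stuck at the minimum value, the combined watermark never
reaches the end of the 10-minute window, so nothing is emitted for any key.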

Re: Issue with KafkaIO for list of topics

Posted by Alexey Romanenko <ar...@gmail.com>.
+1 to what Rahul said before: KafkaIO can't filter out Kafka topics at runtime, so you need to change your TimestampPolicy accordingly.

Please don't hesitate to share the code of your new TimestampPolicy with us if you still have any questions or issues with this.

PS: I am moving this discussion to user@, as it is the more relevant mailing list for these questions.

> On 29 Feb 2020, at 12:36, Maulik Soneji <ma...@gojek.com> wrote:
> 
> Hello Rahul,
> 
> Thanks again for the detailed explanation.
> 
> I require some guidance on what values to be set for maxDelay and previousWatermark for CustomTimestampPolicyWithLimitedDelay.
> 
> Currently, I was providing maxDelay as Duration.ZERO and previousWatermark as Optional.empty().
> With these values I see that the getWatermark function always goes to else block(code link) and always returns TIMESTAMP_MIN_VALUE.
> So with this case as well, I see that the watermark is returned as TIMESTAMP_MIN_VALUE for zero throughput topics.
> 
> Please share your observations on how to tune the Timestamp Policy.
> 
> Thanks and regards,
> Maulik
> 
> 


Re: Issue with KafkaIO for list of topics

Posted by Maulik Soneji <ma...@gojek.com>.
Hello Rahul,

Thanks again for the detailed explanation.

I need some guidance on what values to set for maxDelay and
previousWatermark for CustomTimestampPolicyWithLimitedDelay.

Currently, I am providing maxDelay as Duration.ZERO and previousWatermark
as Optional.empty().
With these values, I see that the getWatermark function always goes to the
else block (code link) and always returns TIMESTAMP_MIN_VALUE.
So in this case as well, the watermark is returned as TIMESTAMP_MIN_VALUE
for zero-throughput topics.

Please share your observations on how to tune the Timestamp Policy.

Thanks and regards,
Maulik


On Fri, Feb 28, 2020 at 8:46 PM rahul patwari <ra...@gmail.com>
wrote:

> Hi Maulik,
>
> Currently, I don't think it is possible to filter topics based on whether
> data is being produced to the topic (or) not.
> But, the Watermark logic can be changed to make the Pipeline work.
>
> Since the timestamps of the records are the time when the events are
> pushed to Kafka, every record will have monotonically increasing timestamps
> except for out of order events.
> Instead of assigning the Watermark as BoundedWindow.TIMESTAMP_MIN_VALUE
> by default, we can assign [current_timestamp - some_delay] as default and
> the same can be done in getWatermark() method, in which case, even if the
> partition is idle, Watermark will advance.
>
> Make sure that the timestamp of the Watermark is monotonically increasing
> and choose the delay carefully in order to avoid discarding out of order
> events.
>
> Refer
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
> for an example.
>
> Regards,
> Rahul
>
>

Re: Issue with KafkaIO for list of topics

Posted by rahul patwari <ra...@gmail.com>.
Hi Maulik,

Currently, I don't think it is possible to filter topics based on whether
data is being produced to the topic or not.
However, the watermark logic can be changed to make the pipeline work.

Since the timestamps of the records are the times when the events were
pushed to Kafka, the records will have monotonically increasing timestamps,
except for out-of-order events.
Instead of assigning BoundedWindow.TIMESTAMP_MIN_VALUE as the default
watermark, we can assign [current_timestamp - some_delay] as the default and
do the same in the getWatermark() method, in which case the watermark will
advance even if the partition is idle.

Make sure that the watermark is monotonically increasing, and choose the
delay carefully to avoid discarding out-of-order events.

Refer to
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/CustomTimestampPolicyWithLimitedDelay.java
for an example.

Regards,
Rahul
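
Editorial sketch, not code from the thread: one possible shape for the
watermark logic described above, written without Beam dependencies so it can
be run on its own. Everything here is an assumption for illustration: the
class name, the maxDelay parameter, and the idle flag, which in a real
TimestampPolicy would have to be derived by the caller (for instance from
backlog information). Instant.MIN stands in for
BoundedWindow.TIMESTAMP_MIN_VALUE.

```java
import java.time.Duration;
import java.time.Instant;

final class IdleAwareWatermark {
    private final Duration maxDelay;  // allowance for out-of-order events
    private Instant maxEventTime;     // highest event time seen, null until the first record
    private Instant lastWatermark;    // last value handed out, never decreases

    IdleAwareWatermark(Duration maxDelay, Instant previousWatermark) {
        this.maxDelay = maxDelay;
        this.lastWatermark = previousWatermark;
    }

    /** Records the event time of a newly read record and returns it as the record timestamp. */
    Instant onRecord(Instant eventTime) {
        if (maxEventTime == null || eventTime.isAfter(maxEventTime)) {
            maxEventTime = eventTime;
        }
        return eventTime;
    }

    /** Returns a watermark that advances on idle partitions and never moves backwards. */
    Instant watermark(Instant now, boolean idle) {
        Instant candidate;
        if (idle) {
            candidate = now.minus(maxDelay);          // nothing left to read: follow the clock
        } else if (maxEventTime != null) {
            candidate = maxEventTime.minus(maxDelay); // follow the data, minus the delay
        } else {
            candidate = lastWatermark;                // unread backlog, no record seen yet
        }
        if (candidate.isAfter(lastWatermark)) {
            lastWatermark = candidate;                // keep the watermark monotonic
        }
        return lastWatermark;
    }
}
```

The delay is the trade-off mentioned above: a larger value tolerates more
out-of-order events but makes every window close later.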


On Fri, Feb 28, 2020 at 6:54 PM Maulik Soneji <ma...@gojek.com>
wrote:

> Hi Rahul,
>
> Thank you very much for the detailed explanation.
>
> Since we don't know which are the topics that have zero throughputs, is
> there a way in which we can filter out such topics in KafkaIO?
>
> Since KafkaIO doesn't support passing a regex to consume data from, I am
> getting a list of topics from kafka and passing it.
>
> Is there a way to filter out such topics? Also, it can happen that when
> the job has started the topic might have no data for a few windows and
> after that, it can get some data. This filter should be dynamic as well.
>
> Please share some ideas on how we can make this work.
>
> Community members, please share your thoughts as well on how we can
> achieve this.
>
> Thanks and regards,
> Maulik
>

Re: Issue with KafkaIO for list of topics

Posted by Maulik Soneji <ma...@gojek.com>.
Hi Rahul,

Thank you very much for the detailed explanation.

Since we don't know which topics have zero throughput, is there a way to
filter out such topics in KafkaIO?

Since KafkaIO doesn't support passing a regex for the topics to consume
from, I am getting the list of topics from Kafka and passing it in.

Is there a way to filter out such topics? Also, a topic might have no data
for a few windows after the job starts and then start receiving data, so
the filter would need to be dynamic as well.

Please share some ideas on how we can make this work.

Community members, please share your thoughts as well on how we can achieve
this.

Thanks and regards,
Maulik
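
Editorial sketch, not code from the thread: the workaround mentioned above
(fetching the topic names and passing a list) could select topics by regex
along these lines. The class name and the topic names are hypothetical, and
obtaining the names from the cluster is outside this sketch. The result is
fixed when the pipeline is built, so it does not provide the dynamic
filtering asked about.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class TopicFilter {

    /** Returns the topic names that fully match the regex, in sorted order. */
    static List<String> matching(Collection<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        return allTopics.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Hypothetical topic names standing in for the list fetched from Kafka.
        List<String> all = Arrays.asList("payments-log", "orders-log", "__consumer_offsets");
        System.out.println(matching(all, ".*-log"));
    }
}
```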

On Fri, Feb 28, 2020 at 3:03 PM rahul patwari <ra...@gmail.com>
wrote:

> Hi Maulik,
>
> This seems like an issue with the watermark.
> According to
> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L240
> ,
>
> if there are multiple partitions or multiple topics, a watermark is
> calculated for each partition, and the minimum of these is used as the
> current watermark.
> Assuming that no message is pushed to the topic with zero throughput, your
> watermark logic leaves the watermark of each partition of this topic at
> BoundedWindow.TIMESTAMP_MIN_VALUE
> (the smallest representable timestamp of an element -
> https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/model/pipeline/src/main/proto/beam_runner_api.proto#L44
> ).
>
> Since GroupByKey emits results only when the watermark passes the end of
> the window, and the watermark stays at BoundedWindow.TIMESTAMP_MIN_VALUE,
> you are not seeing any results from GroupByKey.
>
> Regards,
> Rahul
>
> On Fri, Feb 28, 2020 at 12:39 PM Maulik Soneji <ma...@gojek.com>
> wrote:
>
>> *Observations:*
>> If we read using KafkaIO for a list of topics where one of the topics has
>> zero throughputs,
>> and KafkaIO is followed by GroupByKey stage, then:
>> a. No data is output from GroupByKey stage for all the topics and not
>> just the zero throughput topic.
>>
>> If all topics have some throughput coming in, then it works fine and we
>> get some output from GroupByKey stage.
>>
>> Is this an issue?
>>
>> *Points:*
>> a. The output from GroupByKey is only when all topics have some throughput
>> b. This is a problem with KafkaIO + GroupByKey, for case where I have
>> FileIO + GroupByKey, this issue doesn't arise. GroupByKey outputs some data
>> even if there is no data for one of the files.
>> c. Not a runner issue, since I ran it with FlinkRunner and DataflowRunner
>> d. Even if lag is different for each topic on the list, we still get some
>> output from GroupByKey.
>>
>>
>> *Debugging:*
>> While debugging this issue I found that in split function of
>> KafkaUnboundedSource we create KafkaUnboundedSource where partition list is
>> one partition for each topic.
>>
>> I am not sure if this is some issue with watermark, since watermark for
>> the topic with no throughput will not advance. But this looks like the most
>> likely cause to me.
>>
>> *Please help me in figuring out whether this is an issue or if there is
>> something wrong with my pipeline.*
>>
>> Attaching detailed pipeline information for more details:
>>
>> *Context:*
>> I am currently using KafkaIO to read data from kafka for a list of topics
>> with a custom timestamp policy.
>>
>> Below is how I am constructing KafkaIO reader:
>>
>> return KafkaIO.<byte[], byte[]>read()
>>         .withBootstrapServers(brokers)
>>         .withTopics(topics)
>>         .withKeyDeserializer(ByteArrayDeserializer.class)
>>         .withValueDeserializer(ByteArrayDeserializer.class)
>>         .withTimestampPolicyFactory((partition, previousWatermark) -> new EventTimestampPolicy(godataService, previousWatermark))
>>         .commitOffsetsInFinalize();
>>
>> *Pipeline Information:
>> *Pipeline Consists of six steps:
>> a. Read From Kafka with custom timestamp policy
>> b. Convert KafkaRecord to Message object
>> c. Window based on FixedWindow of 10 minutes triggering AfterWatermark
>> d. PCollection<Message> to PCollection<KV<String, Message>> where Topic is Keye. GroupByKey.create() to get PCollection<KV<String, Iterable<Message>>f. PCollection<KV<String, Iterable<Message>> to PCollection<ComputedMetrics> for each topicg. Write output to kafka
>>
>> *Detailed Pipeline Information*
>> a. Read data from kafka to get KafkaRecord<byte[], byte[]>
>> Here I am using my own timestamp policy which looks like below:
>>
>> public EventTimestampPolicy(MyService myService, Optional<Instant> previousWatermark) {
>>     this.myService = myService;
>>     this.currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>> }
>>
>> @Override
>> public Instant getTimestampForRecord(PartitionContext context, KafkaRecord<byte[], byte[]> record) {
>>     Instant eventTimestamp;
>>     try {
>>         eventTimestamp = Deserializer.getEventTimestamp(record, myService);
>>     } catch (InvalidProtocolBufferException e) {
>>         statsClient.increment("io.proto.buffer.exception");
>>         throw new RuntimeException(e);
>>     }
>>     this.currentWatermark = eventTimestamp;
>>     return this.currentWatermark;
>> }
>>
>> @Override
>> public Instant getWatermark(PartitionContext ctx) {
>>     return this.currentWatermark;
>> }
>>
>> Event timestamp is one of the fields in the kafka message. It is the time
>> when the event was pushed to kafka.
>>
>> b. DoFn to transform KafkaRecord<byte[], byte[]> to Message class.The Message class contains properties like offset, topic, partition, offset and timestamp
>>
>> c. Windowing on 10 minute fixed window triggering at AfterWatermark.pastEndOfWindow()
>>
>> d. PCollection<Message> to PCollection<KV<String, Message>>
>> Here Key is the kafka topic.
>>
>> e. GroupByKey to get PCollection<KV<String, Iterable<Message>>
>>
>> f. PCollection<KV<String, Iterable<Message>> to PCollection<ComputedMetrics> for each topic
>>
>> g. Write output to kafka
>>
>>

Re: Issue with KafkaIO for list of topics

Posted by rahul patwari <ra...@gmail.com>.
Hi Maulik,

This seems like an issue with the watermark.
According to
https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L240,

If there are multiple partitions or multiple topics, a watermark is
calculated for each partition, and the minimum of these is taken as the
current watermark.
Assuming that no message is pushed to the topic with zero throughput, your
policy never updates the watermark for that topic's partitions (it only
changes in getTimestampForRecord), so each of them stays at its initial
value, BoundedWindow.TIMESTAMP_MIN_VALUE (the smallest representable
timestamp of an element:
https://github.com/apache/beam/blob/f0930f958d47042948c06041e074ef9f1b0872d9/model/pipeline/src/main/proto/beam_runner_api.proto#L44
).
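
To make the "minimum over partitions" point concrete, here is a small
stand-alone sketch. It is not Beam code: it uses java.time.Instant instead of
Beam's Joda-Time Instant, Instant.MIN as a stand-in for
BoundedWindow.TIMESTAMP_MIN_VALUE, and the class name, the helper
readerWatermark and the partition names are all made up for illustration.

```java
import java.time.Instant;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: mimics "take the minimum watermark over all partitions".
final class MinWatermarkSketch {

    // Hypothetical helper: the overall watermark is the smallest per-partition watermark.
    static Instant readerWatermark(Map<String, Instant> partitionWatermarks) {
        return partitionWatermarks.values().stream()
                .min(Comparator.naturalOrder())
                .orElse(Instant.MIN);
    }

    public static void main(String[] args) {
        Map<String, Instant> watermarks = new LinkedHashMap<>();
        // A partition that receives records: its watermark follows the event timestamps.
        watermarks.put("busy-topic-0", Instant.parse("2020-02-28T07:00:00Z"));
        // A partition that never received a record: still at the initial minimum
        // (Instant.MIN stands in for BoundedWindow.TIMESTAMP_MIN_VALUE here).
        watermarks.put("idle-topic-0", Instant.MIN);

        // The idle partition pins the overall watermark at the minimum.
        System.out.println(readerWatermark(watermarks).equals(Instant.MIN)); // prints true
    }
}
```

However far the busy partition advances, the result does not move until the
idle partition's watermark moves.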

GroupByKey emits the result for a window only once the watermark passes the
end of that window. Because the watermark stays at
BoundedWindow.TIMESTAMP_MIN_VALUE, no window is ever complete, and that is why
you are not seeing any results from GroupByKey.
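
One possible direction, offered only as a sketch and not as something I have
tested against your pipeline, is to let the watermark of an idle partition
follow the clock instead of waiting for a record. The class below is a
stand-alone illustration with made-up names (IdleAwareWatermarkSketch,
onRecord, watermark, maxDelay); it uses java.time rather than Joda-Time and
takes the backlog and the backlog check time as plain arguments. In Beam, I
believe TimestampPolicy.PartitionContext exposes getMessageBacklog() and
getBacklogCheckTime(), which could supply these two inputs, but please treat
that mapping as an assumption.

```java
import java.time.Duration;
import java.time.Instant;

// Illustration only, not Beam's API: a watermark that advances on idle partitions.
final class IdleAwareWatermarkSketch {
    private final Duration maxDelay;
    // Instant.MIN stands in for BoundedWindow.TIMESTAMP_MIN_VALUE.
    private Instant maxEventTimestamp = Instant.MIN;

    IdleAwareWatermarkSketch(Duration maxDelay) {
        this.maxDelay = maxDelay;
    }

    // Called once per record: remember the largest event timestamp seen so far,
    // so that the watermark never moves backwards on out-of-order records.
    Instant onRecord(Instant eventTimestamp) {
        if (eventTimestamp.isAfter(maxEventTimestamp)) {
            maxEventTimestamp = eventTimestamp;
        }
        return eventTimestamp;
    }

    // If nothing is waiting to be read, report (check time - maxDelay) when that
    // is later than the newest event timestamp; otherwise keep the event-based value.
    Instant watermark(long backlog, Instant backlogCheckTime) {
        Instant idleWatermark = backlogCheckTime.minus(maxDelay);
        if (backlog == 0 && idleWatermark.isAfter(maxEventTimestamp)) {
            return idleWatermark;
        }
        return maxEventTimestamp;
    }

    public static void main(String[] args) {
        IdleAwareWatermarkSketch policy = new IdleAwareWatermarkSketch(Duration.ofMinutes(1));
        Instant checkTime = Instant.parse("2020-02-28T07:10:00Z");
        // No record was ever read and the backlog is empty: the watermark still advances.
        System.out.println(policy.watermark(0, checkTime)); // prints 2020-02-28T07:09:00Z
    }
}
```

With something along these lines, the partitions of the zero-throughput topic
would no longer hold the minimum at BoundedWindow.TIMESTAMP_MIN_VALUE, at the
cost of treating records that arrive more than maxDelay late as late data.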

Regards,
Rahul
