Posted to user@beam.apache.org by rahul patwari <ra...@gmail.com> on 2019/01/04 07:22:01 UTC

Kafka LogAppendTimePolicy not throwing Exception when log.message.timestamp.type = CreateTime for the Kafka topic

Hi,

We are using KafkaIO.read() with LogAppendTimePolicy. When the topic is
idle at the beginning of the pipeline, IllegalStateException is NOT thrown
even when log.message.timestamp.type = CreateTime.

This happens because of the statement
else if (currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) in the
getTimestampForRecord() method in the TimestampPolicyFactory interface.

Because the topic is idle at the beginning of the pipeline, currentWatermark
is advanced (backlog == 0). As a result,
currentWatermark.equals(BoundedWindow.TIMESTAMP_MIN_VALUE) is false, and
each record's timestamp is taken to be currentWatermark.
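
The sequence described above can be reproduced with a small, Beam-free
model. This is only a sketch under stated assumptions: TsType,
ModelLogAppendTimePolicy and IdleTopicDemo are hypothetical stand-ins,
timestamps are plain longs, and Long.MIN_VALUE plays the role of
BoundedWindow.TIMESTAMP_MIN_VALUE. The idle-advance rule in getWatermark()
is assumed from the "backlog == 0" remark, not copied from Beam.

```java
// Hypothetical stand-in for the Kafka record timestamp type.
enum TsType { CREATE_TIME, LOG_APPEND_TIME }

// Simplified model of the policy; not the actual Beam class.
class ModelLogAppendTimePolicy {
  // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE.
  static final long MIN = Long.MIN_VALUE;
  long currentWatermark = MIN;

  // Mirrors the 'else if' check quoted in the message.
  long getTimestampForRecord(TsType type, long recordTimestamp) {
    if (type == TsType.LOG_APPEND_TIME) {
      currentWatermark = recordTimestamp;
    } else if (currentWatermark == MIN) {
      throw new IllegalStateException("timestamp type is " + type);
    }
    return currentWatermark;
  }

  // Assumed behavior: with no backlog, the watermark advances to the time
  // the backlog was checked, even though no record has been read yet.
  long getWatermark(long backlog, long backlogCheckTime) {
    if (backlog == 0 && backlogCheckTime > currentWatermark) {
      currentWatermark = backlogCheckTime;
    }
    return currentWatermark;
  }
}

class IdleTopicDemo {
  public static void main(String[] args) {
    ModelLogAppendTimePolicy policy = new ModelLogAppendTimePolicy();
    policy.getWatermark(0, 1_000L); // idle topic: watermark leaves MIN
    // The CreateTime record is accepted and stamped with the watermark.
    System.out.println(policy.getTimestampForRecord(TsType.CREATE_TIME, 42L));
  }
}
```

In this model the demo prints 1000 rather than throwing, because the
watermark no longer equals the minimum value when the first record arrives.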

If we change else if() to else, IllegalStateException is thrown when the
first record from the Kafka topic is read, which is expected.
Is there any specific reason behind using else if() instead of else?

Thanks and Regards,
Rahul

Re: Kafka LogAppendTimePolicy not throwing Exception when log.message.timestamp.type = CreateTime for the Kafka topic

Posted by Raghu Angadi <ra...@google.com>.
On Fri, Jan 4, 2019 at 12:54 PM rahul patwari <ra...@gmail.com>
wrote:

> Hi Raghu,
> Thanks for the response.
>
> I used withTopics() and withCreateTime() to read records from multiple
> topics, "topic1" with message.timestamp.type=CreateTime and "topic2"
> with message.timestamp.type=LogAppendTime.
> And I got the Exception: java.lang.IllegalArgumentException: Kafka
> record's timestamp is not 'CREATE_TIME' (topic: topic2, partition 0, offset
> 0, timestamp type 'LogAppendTime').
>

If you are reading with 'withCreateTime()', all the records from all the
topics are expected to have a CREATE_TIME timestamp. I don't think it makes
much sense for the built-in timestamp factories to support mixed special
cases like this. Here are a couple of options, depending on your exact use
case:

   - Read topic1 and topic2 as two separate PCollections (i.e. two
   instances of KafkaIO.read()).
   - Or implement a timestamp factory that handles the mixed case the way
   you prefer (as you described below). It is pretty simple to implement
   one. You can start from one of the built-in factories.


> So, my understanding is that only when withLogAppendTime() is used do we
> not want users to get stuck if some topics have multiple timestamp types.
> In this case, where two topics have two different timestamp types, the
> watermark is calculated based only on the records that belong to the
> topic whose timestamp type is LogAppendTime. I think we can calculate
> the watermark more accurately if we also consider the records in the topic
> with timestamp type CreateTime. So, instead of directly returning
> currentWatermark, we can always update currentWatermark with the record's
> timestamp (either CreateTime or LogAppendTime) and then return
> currentWatermark.
>
> I would like to contribute if a fix is needed.
>
> Regards,
> Rahul
>

Re: Kafka LogAppendTimePolicy not throwing Exception when log.message.timestamp.type = CreateTime for the Kafka topic

Posted by rahul patwari <ra...@gmail.com>.
Hi Raghu,
Thanks for the response.

I used withTopics() and withCreateTime() to read records from multiple
topics, "topic1" with message.timestamp.type=CreateTime and "topic2"
with message.timestamp.type=LogAppendTime.
And I got the Exception: java.lang.IllegalArgumentException: Kafka record's
timestamp is not 'CREATE_TIME' (topic: topic2, partition 0, offset 0,
timestamp type 'LogAppendTime').

So, my understanding is that only when withLogAppendTime() is used do we
not want users to get stuck if some topics have multiple timestamp types.
In this case, where two topics have two different timestamp types, the
watermark is calculated based only on the records that belong to the
topic whose timestamp type is LogAppendTime. I think we can calculate
the watermark more accurately if we also consider the records in the topic
with timestamp type CreateTime. So, instead of directly returning
currentWatermark, we can always update currentWatermark with the record's
timestamp (either CreateTime or LogAppendTime) and then return
currentWatermark.
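
A minimal sketch of the proposal above, written without Beam so that it
stands alone. TsType, MixedTypePolicy and MixedTypeDemo are hypothetical
names, and timestamps are plain longs. The use of Math.max is an added
assumption, not part of the proposal: it keeps the watermark from moving
backwards if CreateTime timestamps arrive out of order.

```java
// Hypothetical stand-in for the Kafka record timestamp type.
enum TsType { CREATE_TIME, LOG_APPEND_TIME }

// Simplified model of a policy that accepts both timestamp types.
class MixedTypePolicy {
  long currentWatermark = Long.MIN_VALUE;

  // The type is deliberately ignored: every record's timestamp updates the
  // watermark, whether it is CreateTime or LogAppendTime.
  long getTimestampForRecord(TsType type, long recordTimestamp) {
    // Assumption: never let the watermark regress.
    currentWatermark = Math.max(currentWatermark, recordTimestamp);
    return currentWatermark;
  }
}

class MixedTypeDemo {
  public static void main(String[] args) {
    MixedTypePolicy policy = new MixedTypePolicy();
    System.out.println(policy.getTimestampForRecord(TsType.LOG_APPEND_TIME, 100L));
    System.out.println(policy.getTimestampForRecord(TsType.CREATE_TIME, 150L));
    // An older CreateTime record does not pull the watermark back.
    System.out.println(policy.getTimestampForRecord(TsType.CREATE_TIME, 120L));
  }
}
```

One consequence of this sketch is that a late CreateTime record is stamped
with the current watermark instead of its own timestamp. Whether that is
acceptable depends on the use case.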

I would like to contribute if a fix is needed.

Regards,
Rahul

On Sat, Jan 5, 2019 at 12:40 AM Raghu Angadi <ra...@google.com> wrote:

> The intention was to assert on 'timestamp_type' for the first record only.
> I was not entirely sure whether there are situations in Kafka where the
> timestamp type could differ, or the timestamp itself could be missing, for
> some records. The assertion on the first record was just a sanity check for
> a common misconfiguration. The way this policy checks for the first record
> is incorrect in the case of idle partitions, since the watermark advances
> even without any records being read. This is the issue you encountered.
> When the timestamp type does not match, the record's timestamp is not used
> for the watermark.
>
> As you suggested, a simpler fix might be to require every record's
> timestamp_type to be LOG_APPEND_TIME (i.e. replace 'else if' with 'else').
> Is that safe? We don't want users to get stuck if some topics are expected
> to have multiple timestamp types.
>
> Raghu.

Re: Kafka LogAppendTimePolicy not throwing Exception when log.message.timestamp.type = CreateTime for the Kafka topic

Posted by Raghu Angadi <ra...@google.com>.
The intention was to assert on 'timestamp_type' for the first record only. I
was not entirely sure whether there are situations in Kafka where the
timestamp type could differ, or the timestamp itself could be missing, for
some records. The assertion on the first record was just a sanity check for
a common misconfiguration. The way this policy checks for the first record
is incorrect in the case of idle partitions, since the watermark advances
even without any records being read. This is the issue you encountered.
When the timestamp type does not match, the record's timestamp is not used
for the watermark.
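
One way the stated intention (assert on the first record only) could be
expressed without relying on the watermark's value is an explicit flag.
This is a Beam-free sketch, not something proposed in the thread: TsType,
FirstRecordCheckPolicy, FirstRecordCheckDemo and the sawRecord flag are all
hypothetical, timestamps are plain longs, and the idle-advance rule in
getWatermark() is an assumption.

```java
// Hypothetical stand-in for the Kafka record timestamp type.
enum TsType { CREATE_TIME, LOG_APPEND_TIME }

// Simplified model; not the actual Beam class.
class FirstRecordCheckPolicy {
  long currentWatermark = Long.MIN_VALUE;
  // Hypothetical flag: true once any record has been seen, so the check
  // does not depend on whether the watermark has already advanced.
  boolean sawRecord = false;

  long getTimestampForRecord(TsType type, long recordTimestamp) {
    if (type == TsType.LOG_APPEND_TIME) {
      currentWatermark = recordTimestamp;
    } else if (!sawRecord) {
      throw new IllegalStateException(
          "First record has timestamp type " + type + ", expected LOG_APPEND_TIME");
    }
    sawRecord = true;
    return currentWatermark;
  }

  // Assumed behavior: an idle partition (no backlog) advances the watermark.
  long getWatermark(long backlog, long backlogCheckTime) {
    if (backlog == 0 && backlogCheckTime > currentWatermark) {
      currentWatermark = backlogCheckTime;
    }
    return currentWatermark;
  }
}

class FirstRecordCheckDemo {
  public static void main(String[] args) {
    FirstRecordCheckPolicy policy = new FirstRecordCheckPolicy();
    policy.getWatermark(0, 1_000L); // idle topic advances the watermark
    try {
      policy.getTimestampForRecord(TsType.CREATE_TIME, 42L);
    } catch (IllegalStateException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

In this model the first CreateTime record is rejected even after an idle
period, while later mismatched records are still tolerated.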

As you suggested, a simpler fix might be to require every record's
timestamp_type to be LOG_APPEND_TIME (i.e. replace 'else if' with 'else').
Is that safe? We don't want users to get stuck if some topics are expected
to have multiple timestamp types.
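
To make the trade-off concrete, here is a Beam-free sketch of the stricter
variant, in which every record must be LOG_APPEND_TIME. TsType,
StrictLogAppendTimePolicy and StrictDemo are hypothetical names and
timestamps are plain longs. It is a model of the idea, not the actual
Beam code.

```java
// Hypothetical stand-in for the Kafka record timestamp type.
enum TsType { CREATE_TIME, LOG_APPEND_TIME }

// Simplified model of the 'else' variant: no first-record special case.
class StrictLogAppendTimePolicy {
  long currentWatermark = Long.MIN_VALUE;

  long getTimestampForRecord(TsType type, long recordTimestamp) {
    if (type != TsType.LOG_APPEND_TIME) {
      // Every mismatched record fails, not only the first one.
      throw new IllegalStateException(
          "Record has timestamp type " + type + ", expected LOG_APPEND_TIME");
    }
    currentWatermark = recordTimestamp;
    return currentWatermark;
  }
}

class StrictDemo {
  public static void main(String[] args) {
    StrictLogAppendTimePolicy policy = new StrictLogAppendTimePolicy();
    System.out.println(policy.getTimestampForRecord(TsType.LOG_APPEND_TIME, 100L));
    try {
      // A later CreateTime record, e.g. from a second topic, also fails.
      policy.getTimestampForRecord(TsType.CREATE_TIME, 50L);
    } catch (IllegalStateException e) {
      System.out.println("reader would fail here: " + e.getMessage());
    }
  }
}
```

The second call shows the "stuck" scenario: a reader that mixes timestamp
types fails on the first mismatched record, wherever it occurs in the
stream.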

Raghu.