Posted to dev@beam.apache.org by Ahmet Altay <al...@google.com> on 2021/02/01 20:22:45 UTC

Re: KafkaIO withStartReadTime hard failure

+Boyuan Zhang <bo...@google.com> - in case she knows the answer.

On Wed, Jan 27, 2021 at 9:30 AM Alexey Romanenko <ar...@gmail.com>
wrote:

> + Raghu Angadi
>
> Raghu, I guess you did a review of the PR with this feature.
> Could you shed some light on this, please?
>
> Alexey
>
> On 25 Jan 2021, at 13:59, Dan Mason <Da...@king.com> wrote:
>
> Hi Beam Community,
>
> While preparing a few dataflow streaming pipelines, with Kafka as a
> source, I have come across a bit of an issue. Some of the topics I am
> reading from have very low throughput, but I hope to utilise the
> withStartReadTime option to help control the offset at start up.
>
> The issue I am facing is related to the hard failure which arises when
> there is no data present to consume after setting the withStartReadTime
> option as documented here [1]. Draining is blocked while this hard error is
> occurring, and it triggers false alerts in the monitoring we use to detect
> failing jobs.
> The use of multiple topics is also problematic as the job will not read
> from any topic as long as any one is producing this error.
>
> I would like to understand why this was made such a hard error when it
> seems like a situation pipelines can easily be in. Would it be possible to
> reduce it to a softer error, so that features such as draining and
> multiple topics keep working on these jobs?
>
> Thanks for any help understanding this issue,
>
> Dan
>
> [1]
> https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withStartReadTime-org.joda.time.Instant-
>
>
>

Re: KafkaIO withStartReadTime hard failure

Posted by Boyuan Zhang <bo...@google.com>.
I happened to look into this before.

The hard failure comes from ConsumerSpEL.offsetForTime[1]. The function
throws a RuntimeException when there is no offset for the given start
time[2]. The reason it is a hard failure for KafkaIO (UnboundedSource) is
that KafkaIO needs to resolve the given start time to an offset and position
the consumer at that offset before reading. If there is no such offset,
KafkaIO cannot start reading, so it fails immediately.

But it should not be a hard failure when using ReadFromKafkaDoFn[3], because
ReadFromKafkaDoFn can self-checkpoint when no records are available. I
filed https://issues.apache.org/jira/browse/BEAM-11734 to track this
improvement.
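
To make the difference concrete, here is a minimal sketch of the two
behaviours. It is a toy model, not the Beam or Kafka code: the class and
method names are hypothetical, a partition is modelled as a sorted map from
record timestamp to offset, and timestamps are assumed to increase with
offset. The "softer" variant, which falls back to the end offset and waits
for new records, is only an illustration of the idea and not what BEAM-11734
will necessarily implement.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.OptionalLong;
import java.util.TreeMap;

/** Toy model of start-time resolution; hypothetical names, not Beam or Kafka APIs. */
final class StartOffsetSketch {

  /**
   * Returns the offset of the first record whose timestamp is at or after
   * startMillis, or empty when no such record exists. This is the situation
   * in which the real lookup yields no offset for a low-throughput topic.
   */
  static OptionalLong firstOffsetAtOrAfter(
      NavigableMap<Long, Long> timestampToOffset, long startMillis) {
    Map.Entry<Long, Long> entry = timestampToOffset.ceilingEntry(startMillis);
    return entry == null ? OptionalLong.empty() : OptionalLong.of(entry.getValue());
  }

  /** Hard failure: no matching offset means the reader cannot start at all. */
  static long resolveOrFail(NavigableMap<Long, Long> partition, long startMillis) {
    return firstOffsetAtOrAfter(partition, startMillis)
        .orElseThrow(
            () -> new RuntimeException("No offset found for start time " + startMillis));
  }

  /**
   * Softer behaviour (an assumption for illustration): if nothing matches yet,
   * start from the current end offset so records that arrive later are read.
   */
  static long resolveOrWaitAtEnd(
      NavigableMap<Long, Long> partition, long startMillis, long endOffset) {
    return firstOffsetAtOrAfter(partition, startMillis).orElse(endOffset);
  }

  public static void main(String[] args) {
    NavigableMap<Long, Long> partition = new TreeMap<>();
    partition.put(1_000L, 0L); // record at t=1000 ms has offset 0
    partition.put(2_000L, 1L); // record at t=2000 ms has offset 1
    long endOffset = 2L;       // next offset to be written

    // A start time inside the data resolves normally in both variants.
    System.out.println(resolveOrFail(partition, 1_500L));

    // A start time after the last record: the soft variant waits at the end.
    System.out.println(resolveOrWaitAtEnd(partition, 3_000L, endOffset));

    // The hard variant throws for the same input.
    try {
      resolveOrFail(partition, 3_000L);
    } catch (RuntimeException e) {
      System.out.println("hard failure: " + e.getMessage());
    }
  }
}
```

With several topics, the hard variant fails the whole read as soon as one
partition has no record after the start time, which matches what Dan
describes.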

[1]
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
[2]
https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-




