Posted to user@beam.apache.org by wang Wu <fa...@gmail.com> on 2020/07/29 07:16:18 UTC
KafkaUnboundedReader
Hi,
I am curious about this comment:
    if (offset < expected) { // -- (a)
      // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
      // should we check if the offset is way off from consumedOffset (say > 1M)?
      LOG.warn(
          "{}: ignoring already consumed offset {} for {}",
          this,
          offset,
          pState.topicPartition);
      continue;
    }
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L167
Does this mean that Beam KafkaIO may skip some Kafka messages if the consumer lag exceeds 1M messages?
Why would Kafka compression result in this bug?
Is there any way to prevent message loss and get at-least-once delivery?
Context: we enable at-least-once delivery semantics in our Beam pipeline with this code:
    input
        .getPipeline()
        .apply(
            "ReadFromKafka",
            KafkaIO.readBytes()
                .withBootstrapServers(getSource().getKafkaSourceConfig().getBootstrapServers())
                .withTopics(getTopics())
                .withConsumerConfigUpdates(
                    ImmutableMap.of(
                        ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
                        ConsumerConfig.GROUP_ID_CONFIG, groupId))
                .withReadCommitted()
                .commitOffsetsInFinalize())
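A minimal sketch of the commit-after-processing pattern that at-least-once delivery relies on, in plain Java with no Beam or Kafka dependency. The names (`CommitAfterFinalizeSketch`, `runBundle`) are hypothetical and this is not KafkaIO's implementation; it only shows why committing an offset after the work is finalized can produce duplicates but not gaps. As we understand the KafkaIO javadoc, resumption inside a running job is driven by the runner's checkpoints, and the offsets committed by `commitOffsetsInFinalize()` mainly matter when the pipeline is restarted from scratch.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of "commit the offset only after the work is finalized".
 * Not KafkaIO code: one partition, offsets as plain longs, crashes simulated by a flag.
 */
class CommitAfterFinalizeSketch {

    // Offset a restarted consumer would resume from (the last committed position).
    private long committedOffset = 0;
    // Every delivery in order, including re-deliveries after a crash.
    private final List<Long> processed = new ArrayList<>();

    /** Processes offsets [committedOffset, end) and commits `end` only if no crash occurs. */
    void runBundle(long end, boolean crashBeforeFinalize) {
        for (long offset = committedOffset; offset < end; offset++) {
            processed.add(offset);
        }
        if (crashBeforeFinalize) {
            return; // nothing committed: the next run starts again from committedOffset
        }
        committedOffset = end; // plays the role of committing during finalization
    }

    List<Long> processed() {
        return processed;
    }

    long committedOffset() {
        return committedOffset;
    }

    public static void main(String[] args) {
        CommitAfterFinalizeSketch sketch = new CommitAfterFinalizeSketch();
        sketch.runBundle(3, false); // processes 0..2, commits 3
        sketch.runBundle(6, true);  // processes 3..5, crashes before committing
        sketch.runBundle(6, false); // resumes at 3, processes 3..5 again, commits 6
        System.out.println(sketch.processed());
    }
}
```

Running `main` prints `[0, 1, 2, 3, 4, 5, 3, 4, 5]`: offsets 3 to 5 are delivered twice, and no offset is skipped.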
However, we notice that when we send more than 1 million Kafka messages and the batch processing cannot keep up, Beam seems to process fewer messages than we sent.
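One way to check whether records are really missing, rather than counted differently, is to compare the offsets processed for a partition against the expected range. The helper below is a hypothetical diagnostic (`OffsetGapCheck` is not a Beam or Kafka API) and assumes you can collect the processed offsets of one partition, for example from the record metadata. A gap in offsets is not always a lost message: compacted topics, and transactional topics read with `read_committed` (transaction markers occupy offsets), legitimately skip offsets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/**
 * Hypothetical diagnostic, not a Beam or Kafka API: lists offsets in
 * [firstExpected, lastExpected] that never appear among the processed offsets
 * of ONE partition. Duplicates in the input are ignored.
 */
class OffsetGapCheck {

    static List<Long> missingOffsets(long firstExpected, long lastExpected, Iterable<Long> seen) {
        TreeSet<Long> distinct = new TreeSet<>();
        for (Long offset : seen) {
            distinct.add(offset);
        }
        List<Long> missing = new ArrayList<>();
        for (long offset = firstExpected; offset <= lastExpected; offset++) {
            if (!distinct.contains(offset)) {
                missing.add(offset);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Offset 1 was processed twice (a duplicate); 2 and 4 were never processed.
        System.out.println(missingOffsets(0, 5, List.of(0L, 1L, 1L, 3L, 5L)));
    }
}
```

Here `main` prints `[2, 4]`; the repeated offset 1 counts as a duplicate, not a gap.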
Regards
Dinh
Re: KafkaUnboundedReader
Posted by Maximilian Michels <mx...@apache.org>.
Hi Dinh,
The check only de-duplicates in case the consumer processes the same
offset multiple times. It ensures the offset is always increasing.
If this has been fixed in Kafka, which the comment assumes, the
condition will never be true.
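The check can be modeled in a few lines of plain Java. This is a simplified, hypothetical model of the quoted `offset < expected` branch for a single partition, not Beam's code. The example fetch assumes the older Kafka behavior the source comment alludes to, where fetching from an offset inside a compressed batch could return the whole batch, including offsets before the requested one. Only offsets below the next expected offset are dropped, so a large consumer lag by itself does not trigger the skip; the "say > 1M" in the comment is an open question, not an implemented threshold.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of the "offset < expected" check: one partition,
 * offsets as plain longs. Offsets above `expected` (gaps) are accepted as-is.
 */
class OffsetDedupSketch {

    /** Returns the offsets that would be emitted downstream, in order. */
    static List<Long> emit(long startOffset, List<Long> fetchedOffsets) {
        List<Long> emitted = new ArrayList<>();
        long expected = startOffset; // next offset not yet consumed
        for (long offset : fetchedOffsets) {
            if (offset < expected) {
                // Already consumed (e.g. re-delivered head of a compressed batch): skip it.
                continue;
            }
            emitted.add(offset);
            expected = offset + 1;
        }
        return emitted;
    }

    public static void main(String[] args) {
        // The consumer asked for offset 5, but the fetch returned the whole batch 3..7.
        System.out.println(emit(5, List.of(3L, 4L, 5L, 6L, 7L)));
    }
}
```

Here `main` prints `[5, 6, 7]`: offsets 3 and 4 were already consumed and are skipped, and nothing from offset 5 onward is lost.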
Which Kafka version are you using?
-Max