You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@beam.apache.org by wang Wu <fa...@gmail.com> on 2020/07/29 07:16:18 UTC

KafkaUnboundedReader

Hi,
I am curious about this comment:

if (offset < expected) { // -- (a)
          // this can happen when compression is enabled in Kafka (seems to be fixed in 0.10)
          // should we check if the offset is way off from consumedOffset (say > 1M)?
          LOG.warn(
              "{}: ignoring already consumed offset {} for {}",
              this,
              offset,
              pState.topicPartition);
          continue;
        }

https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L167 <https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L167>

Does it mean that Beam KafkaIO may skip processing some Kafka messages if the lag in consuming Kafka messages > 1 M?
Why Kafka compression may result in this bug?
Is there anyway to prevent loss messages and enable at-least-once delivery?

Context: We enable at-least-once delivery semantics on our Beam code by this code:

input
    .getPipeline()
    .apply(
        "ReadFromKafka",
        KafkaIO.readBytes()
            .withBootstrapServers(getSource().getKafkaSourceConfig().getBootstrapServers())
            .withTopics(getTopics())
            .withConsumerConfigUpdates(
                ImmutableMap.of(
                    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false,
                    ConsumerConfig.GROUP_ID_CONFIG, groupId
                    ))
            .withReadCommitted()
            .commitOffsetsInFinalize())
However, we notice that if we send > 1 millions Kafka message and the batch processing can not keep up, it seems that Beam process less number of messages than we sent.

Regards
Dinh

Re: KafkaUnboundedReader

Posted by Maximilian Michels <mx...@apache.org>.
Hi Dinh,

The check only de-duplicates in case the consumer processes the same 
offset multiple times. It ensures the offset is always increasing.

If this has been fixed in Kafka, which the comment assumes, the 
condition will never be true.

Which Kafka version are you using?

-Max

On 29.07.20 09:16, wang Wu wrote:
> Hi,
> I am curious about this comment:
> 
> if (offset < expected) { // -- (a)
> 	// this can happen when compression is enabled in Kafka (seems to be 
> fixed in 0.10)
> 	// should we check if the offset is way off from consumedOffset (say > 1M)?
> 	LOG.warn(
> 	"{}: ignoring already consumed offset {} for {}",
> 	this,
> 	offset,
> 	pState.topicPartition);
> 	continue;
> 	}
> 
> 
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L167
> 
> Does it mean that Beam KafkaIO may skip processing some Kafka messages 
> if the lag in consuming Kafka messages > 1 M?
> Why Kafka compression may result in this bug?
> Is there anyway to prevent loss messages and enable at-least-once delivery?
> 
> Context: We enable at-least-once delivery semantics on our Beam code by 
> this code:
> 
> input
>      .getPipeline()
>      .apply(
>          "ReadFromKafka",
>          KafkaIO.readBytes()
>              .withBootstrapServers(getSource().getKafkaSourceConfig().getBootstrapServers())
>              .withTopics(getTopics())
>              .withConsumerConfigUpdates(
>                  ImmutableMap.of(
>                      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false,
>                      ConsumerConfig.GROUP_ID_CONFIG,groupId
> ))
>              .withReadCommitted()
>              .commitOffsetsInFinalize())
> 
> However, we notice that if we send > 1 millions Kafka message and the 
> batch processing can not keep up, it seems that Beam process less number 
> of messages than we sent.
> 
> Regards
> Dinh