Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 15:26:23 UTC
[GitHub] [beam] damccorm opened a new issue, #17935: KafkaIO does not commit offsets to Kafka
damccorm opened a new issue, #17935:
URL: https://github.com/apache/beam/issues/17935
I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the `__consumer_offsets` topic).
I'm configuring the Kafka reader with
```
.updateConsumerProperties(ImmutableMap.of(
    ConsumerConfig.GROUP_ID_CONFIG, "my-group",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
    // doesn't work with the default value either (5000ms)
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10"
))
```
But the offsets are not stored in Kafka (nothing in `__consumer_offsets`, next job will restart at latest offset).
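For reference, the three `ConsumerConfig` constants above map to the plain Kafka property names `group.id`, `enable.auto.commit` and `auto.commit.interval.ms`. Here is a minimal standalone sketch of the same settings as a plain map. The class and method names (`ConsumerPropsSketch`, `autoCommitProps`) are made up for illustration and are not part of KafkaIO or Kafka.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a hypothetical helper, not part of KafkaIO or the Kafka client.
class ConsumerPropsSketch {

  /** Builds the auto-commit related consumer properties using the raw Kafka keys. */
  static Map<String, Object> autoCommitProps(String groupId, int intervalMs) {
    Map<String, Object> props = new LinkedHashMap<>();
    props.put("group.id", groupId);                 // ConsumerConfig.GROUP_ID_CONFIG
    props.put("enable.auto.commit", Boolean.TRUE);  // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
    // ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, passed as a string as in the snippet above
    props.put("auto.commit.interval.ms", String.valueOf(intervalMs));
    return props;
  }
}
```

Calling `ConsumerPropsSketch.autoCommitProps("my-group", 10)` should yield the same three entries as the `ImmutableMap` in my configuration.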
I can't find in the code where the offsets are supposed to be committed.
I tried to add a manual commit in the `consumerPollLoop()` method, and it works, offsets are committed:
```
private void consumerPollLoop() {
  // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
  while (!closed.get()) {
    try {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
      if (!records.isEmpty() && !closed.get()) {
        availableRecordsQueue.put(records); // blocks until dequeued.
        // Manual commit
        consumer.commitSync();
      }
    } catch (InterruptedException e) {
      LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
      break;
    } catch (WakeupException e) {
      break;
    }
  }
  LOG.info("{}: Returning from consumer pool loop", this);
}
```
Is this a bug in KafkaIO or am I misconfiguring something?
Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.
Edit: I found the correct method where KafkaIO is supposed to commit at the end of a batch. I'm currently testing it and will be able to open a pull request soon:
```
// KafkaCheckpointMark.java

/**
 * Optional consumer that will be used to commit offsets into Kafka
 * when finalizeCheckpoint() is called
 */
@Nullable
private final Consumer consumer;

public KafkaCheckpointMark(List<PartitionMark> partitions, @Nullable Consumer consumer) {
  this.partitions = partitions;
  this.consumer = consumer;
}

/**
 * Commit synchronously into Kafka offsets that have been passed downstream.
 */
@Override
public void finalizeCheckpoint() throws IOException {
  if (consumer == null) {
    LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
    return;
  }
  if (partitions.size() == 0) {
    LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
    return;
  }

  final Map<TopicPartition, OffsetAndMetadata> offsets = newHashMap();
  String committedOffsets = "";
  for (PartitionMark partition : partitions) {
    TopicPartition topicPartition = partition.getTopicPartition();
    offsets.put(topicPartition, new OffsetAndMetadata(partition.offset));
    committedOffsets += topicPartition.topic() + "-" + topicPartition.partition()
        + ":" + partition.offset + ",";
  }
  // Drop the trailing comma
  final String printableOffsets = committedOffsets.substring(0, committedOffsets.length() - 1);

  try {
    consumer.commitSync(offsets);
    LOG.info("finalizeCheckpoint(): committed Kafka offsets {}", printableOffsets);
  } catch (Exception e) {
    LOG.error("finalizeCheckpoint(): {} when trying to commit Kafka offsets [{}]",
        e.getClass().getSimpleName(),
        printableOffsets);
  }
}
```
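To show the commit-on-finalize idea in isolation, here is a rough standalone sketch that compiles without Beam or Kafka on the classpath. `PartitionMark` and `OffsetCommitter` in it are hypothetical stand-ins, not the real Beam or Kafka types, and the printable offset list is built with `Collectors.joining` rather than string concatenation. It only illustrates the control flow and is not the code I intend to submit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Sketch only: PartitionMark and OffsetCommitter are hypothetical stand-ins,
// not the Beam or Kafka classes of similar names.
class CheckpointCommitSketch {

  /** Last offset passed downstream for one topic partition. */
  static final class PartitionMark {
    final String topic;
    final int partition;
    final long offset;

    PartitionMark(String topic, int partition, long offset) {
      this.topic = topic;
      this.partition = partition;
      this.offset = offset;
    }
  }

  /** Stand-in for the offset-committing part of a Kafka consumer. */
  interface OffsetCommitter {
    void commit(Map<String, Long> offsets) throws Exception;
  }

  private final List<PartitionMark> partitions;
  private final OffsetCommitter committer; // may be null: then nothing is committed

  CheckpointCommitSketch(List<PartitionMark> partitions, OffsetCommitter committer) {
    this.partitions = new ArrayList<>(partitions);
    this.committer = committer;
  }

  /**
   * Commits the checkpointed offsets in one call.
   * Returns a printable "topic-partition:offset" list, or "" if nothing was committed.
   */
  String finalizeCheckpoint() {
    if (committer == null || partitions.isEmpty()) {
      return "";
    }
    // Keyed by "topic-partition"; LinkedHashMap keeps the checkpoint order.
    Map<String, Long> offsets = new LinkedHashMap<>();
    for (PartitionMark mark : partitions) {
      offsets.put(mark.topic + "-" + mark.partition, mark.offset);
    }
    String printable = offsets.entrySet().stream()
        .map(e -> e.getKey() + ":" + e.getValue())
        .collect(Collectors.joining(","));
    try {
      committer.commit(offsets);
      return printable;
    } catch (Exception e) {
      // As in the snippet above, a failed commit is reported but not rethrown.
      System.err.println("commit failed for [" + printable + "]: " + e);
      return "";
    }
  }
}
```

In a test, the committer can be a lambda that records the map it receives, which makes it easy to check that exactly one commit happens per finalized checkpoint.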
Imported from Jira [BEAM-990](https://issues.apache.org/jira/browse/BEAM-990). Original Jira may contain additional context.
Reported by: alban@perillat.org.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@beam.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
[GitHub] [beam] damccorm closed issue #17935: KafkaIO does not commit offsets to Kafka
Posted by GitBox <gi...@apache.org>.
damccorm closed issue #17935: KafkaIO does not commit offsets to Kafka
URL: https://github.com/apache/beam/issues/17935