Posted to commits@beam.apache.org by "Alban Perillat-Merceroz (JIRA)" <ji...@apache.org> on 2016/11/16 09:58:58 UTC
[jira] [Created] (BEAM-990) KafkaIO does not commit offsets to Kafka
Alban Perillat-Merceroz created BEAM-990:
--------------------------------------------
Summary: KafkaIO does not commit offsets to Kafka
Key: BEAM-990
URL: https://issues.apache.org/jira/browse/BEAM-990
Project: Beam
Issue Type: Bug
Reporter: Alban Perillat-Merceroz
I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the {{__consumer_offsets}} topic).
I'm configuring the Kafka reader with
{code:java}
.updateConsumerProperties(ImmutableMap.of(
    ConsumerConfig.GROUP_ID_CONFIG, "my-group",
    ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
    ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with default value either (5000ms)
))
{code}
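For reference, the three {{ConsumerConfig}} constants above resolve to the plain Kafka property names {{group.id}}, {{enable.auto.commit}} and {{auto.commit.interval.ms}}. The following is only a minimal sketch of the same settings built with {{java.util}} alone; the class {{ConsumerPropsSketch}} and the method {{autoCommitProps}} are hypothetical names used for illustration and are not part of KafkaIO or the Kafka client.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of KafkaIO or the Kafka client.
class ConsumerPropsSketch {

    // Builds the auto-commit settings from the issue using the literal Kafka property names.
    static Map<String, Object> autoCommitProps(String groupId, int intervalMs) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("group.id", groupId);                 // ConsumerConfig.GROUP_ID_CONFIG
        props.put("enable.auto.commit", Boolean.TRUE);  // ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
        // ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, passed as a string as in the issue
        props.put("auto.commit.interval.ms", String.valueOf(intervalMs));
        return props;
    }

    public static void main(String[] args) {
        // Print each property as key=value, in insertion order.
        autoCommitProps("my-group", 10)
            .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

A map like this has the {{Map<String, Object>}} shape that a consumer-properties override would take; whether the reader actually honors these settings is the question this issue raises.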
But the offsets are not stored in Kafka: nothing appears in {{__consumer_offsets}}, and the next job restarts at the latest offset.
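The restart behaviour is what a plain Kafka consumer does when its group has no committed offset: it falls back to the {{auto.offset.reset}} policy, whose default is {{latest}}. Below is a small model of that decision, assuming standard consumer semantics; {{StartOffsetSketch}} and {{startOffset}} are hypothetical names, and the code does not call the Kafka client or reflect KafkaIO's own checkpoint handling.

```java
import java.util.OptionalLong;

// Hypothetical model for illustration only; it does not use the Kafka client.
class StartOffsetSketch {

    // Returns the offset a consumer would start from for one partition:
    // the committed offset if the group has one, otherwise the reset policy decides.
    static long startOffset(OptionalLong committed, String resetPolicy,
                            long earliest, long latest) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "earliest":
                return earliest;
            case "latest":
                return latest;
            default:
                // Stands in for the error a real consumer raises with auto.offset.reset=none.
                throw new IllegalStateException(
                    "no committed offset and auto.offset.reset=" + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // No commit stored for the group: start at the end of the partition.
        System.out.println(startOffset(OptionalLong.empty(), "latest", 0L, 500L));  // prints 500
        // A commit at offset 120 exists: resume from it.
        System.out.println(startOffset(OptionalLong.of(120L), "latest", 0L, 500L)); // prints 120
    }
}
```

With nothing written to {{__consumer_offsets}}, the first case applies on every restart, which matches the behaviour described above.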
I can't find in the code where the offsets are supposed to be committed.
I tried adding a manual commit in the {{consumerPollLoop()}} method, and it works: offsets are committed.
{code:java}
private void consumerPollLoop() {
  // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
  while (!closed.get()) {
    try {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
      if (!records.isEmpty() && !closed.get()) {
        availableRecordsQueue.put(records); // blocks until dequeued.
        // Manual commit
        consumer.commitSync();
      }
    } catch (InterruptedException e) {
      LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
      break;
    } catch (WakeupException e) {
      break;
    }
  }
  LOG.info("{}: Returning from consumer pool loop", this);
}
{code}
Is this a bug in KafkaIO or am I misconfiguring something?
Disclaimer: I'm currently using KafkaIO in Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.