Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/03 15:26:23 UTC

[GitHub] [beam] damccorm opened a new issue, #17935: KafkaIO does not commit offsets to Kafka

damccorm opened a new issue, #17935:
URL: https://github.com/apache/beam/issues/17935

   I use KafkaIO as a source, and I would like consumed offsets to be stored in Kafka (in the `__consumer_offsets` topic).
   
   I'm configuring the Kafka reader with 
   ```
   .updateConsumerProperties(ImmutableMap.of(
       ConsumerConfig.GROUP_ID_CONFIG, "my-group",
       ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
       ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10" // doesn't work with default value either (5000ms)
   ))
   ```
   
   
   But the offsets are not stored in Kafka: nothing appears in `__consumer_offsets`, and the next job restarts at the latest offset.
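
   For reference, the reader is wired up roughly like this (a sketch only: the bootstrap servers, topic, and surrounding pipeline are placeholders, and the exact builder methods may differ slightly between the Dataflow backport and Beam's KafkaIO):

   ```
   PCollection<KV<byte[], byte[]>> records = pipeline.apply(
       KafkaIO.read()
           .withBootstrapServers("broker-1:9092")            // placeholder
           .withTopics(ImmutableList.of("my-topic"))         // placeholder
           .updateConsumerProperties(ImmutableMap.of(
               ConsumerConfig.GROUP_ID_CONFIG, "my-group",
               ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, java.lang.Boolean.TRUE,
               ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10"))
           .withoutMetadata());                              // drop Kafka metadata, keep KV<byte[], byte[]>
   ```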
   
   I can't find in the code where the offsets are supposed to be committed.
   
   I tried adding a manual commit in the `consumerPollLoop()` method, and with that change the offsets are committed:
   
   ```
   private void consumerPollLoop() {
       // Read in a loop and enqueue the batch of records, if any, to availableRecordsQueue
       while (!closed.get()) {
           try {
               ConsumerRecords<byte[], byte[]> records = consumer.poll(KAFKA_POLL_TIMEOUT.getMillis());
               if (!records.isEmpty() && !closed.get()) {
                   availableRecordsQueue.put(records); // blocks until dequeued.
                   // Manual commit
                   consumer.commitSync();
               }
           } catch (InterruptedException e) {
               LOG.warn("{}: consumer thread is interrupted", this, e); // not expected
               break;
           } catch (WakeupException e) {
               break;
           }
       }

       LOG.info("{}: Returning from consumer pool loop", this);
   }
   ```
   
   
   Is this a bug in KafkaIO or am I misconfiguring something?
   
   Disclaimer: I'm currently using KafkaIO on Dataflow, via the backport in the Dataflow SDK (https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java), but I'm confident the code is similar for this case.
   
   Edit: I found the method where KafkaIO is supposed to commit offsets at the end of a batch: `KafkaCheckpointMark.finalizeCheckpoint()`. I'm currently testing a fix there and will open a pull request soon:
   
   ```
   // KafkaCheckpointMark.java

       /**
        * Optional consumer that will be used to commit offsets into Kafka
        * when finalizeCheckpoint() is called.
        */
       @Nullable
       private final Consumer consumer;

       public KafkaCheckpointMark(List<PartitionMark> partitions, @Nullable Consumer consumer) {
           this.partitions = partitions;
           this.consumer = consumer;
       }

       /**
        * Commit synchronously into Kafka offsets that have been passed downstream.
        */
       @Override
       public void finalizeCheckpoint() throws IOException {
           if (consumer == null) {
               LOG.warn("finalizeCheckpoint(): no consumer provided, will not commit anything.");
               return;
           }
           if (partitions.size() == 0) {
               LOG.info("finalizeCheckpoint(): nothing to commit to Kafka.");
               return;
           }

           final Map<TopicPartition, OffsetAndMetadata> offsets = newHashMap();
           String committedOffsets = "";
           for (PartitionMark partition : partitions) {
               TopicPartition topicPartition = partition.getTopicPartition();
               offsets.put(topicPartition, new OffsetAndMetadata(partition.offset));
               committedOffsets += topicPartition.topic() + "-" + topicPartition.partition() + ":" + partition.offset + ",";
           }

           final String printableOffsets = committedOffsets.substring(0, committedOffsets.length() - 1);
           try {
               consumer.commitSync(offsets);
               LOG.info("finalizeCheckpoint(): committed Kafka offsets {}", printableOffsets);
           } catch (Exception e) {
               LOG.error("finalizeCheckpoint(): {} when trying to commit Kafka offsets [{}]",
                       e.getClass().getSimpleName(), printableOffsets);
           }
       }
   ```
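
   The reader side would then hand its consumer to the checkpoint mark when the mark is created, along the lines of the sketch below (the field and helper names here are illustrative, not the exact ones in KafkaIO):

   ```
   // In the unbounded Kafka reader (sketch; partitionStates and its fields are illustrative names)
   @Override
   public KafkaCheckpointMark getCheckpointMark() {
       List<KafkaCheckpointMark.PartitionMark> marks = new ArrayList<>();
       for (PartitionState p : partitionStates) {
           marks.add(new KafkaCheckpointMark.PartitionMark(p.topicPartition, p.consumedOffset));
       }
       // Passing the consumer is what lets finalizeCheckpoint() commit the offsets back to Kafka.
       return new KafkaCheckpointMark(marks, consumer);
   }
   ```

   Committing inside finalizeCheckpoint() ties the Kafka commit to the runner finalizing the checkpoint, so offsets are only committed once the corresponding records have been durably passed downstream, rather than eagerly from the poll loop.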
   
   
   Imported from Jira [BEAM-990](https://issues.apache.org/jira/browse/BEAM-990). Original Jira may contain additional context.
   Reported by: alban@perillat.org.



[GitHub] [beam] damccorm closed issue #17935: KafkaIO does not commit offsets to Kafka

Posted by GitBox <gi...@apache.org>.
damccorm closed issue #17935: KafkaIO does not commit offsets to Kafka
URL: https://github.com/apache/beam/issues/17935


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org