Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/06/20 13:40:52 UTC

[GitHub] [hudi] veenaypatil commented on a change in pull request #3092: [HUDI-1910] [WIP] Commit Offset to Kafka after successful Hudi commit

veenaypatil commented on a change in pull request #3092:
URL: https://github.com/apache/hudi/pull/3092#discussion_r654937577



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
##########
@@ -297,7 +293,35 @@ public String getTopicName() {
     return topicName;
   }
 
-  public HashMap<String, Object> getKafkaParams() {
+  public Map<String, Object> getKafkaParams() {
+    return kafkaParams;
+  }
+
+  private static Map<String, Object> excludeHoodieConfigs(TypedProperties props) {
+    Map<String, Object> kafkaParams = new HashMap<>();
+    props.keySet().stream().filter(prop -> {
+      // In order to prevent printing unnecessary warn logs, here filter out the hoodie
+      // configuration items before passing to kafkaParams
+      return !prop.toString().startsWith("hoodie.");
+    }).forEach(prop -> {
+      kafkaParams.put(prop.toString(), props.get(prop.toString()));
+    });
     return kafkaParams;
   }
+
+  /**
+   * Commit offsets to Kafka only after hoodie commit is successful.
+   * @param checkpointStr checkpoint string containing offsets.
+   * @param props properties for Kafka consumer.
+   */
+  public static void commitOffsetToKafka(String checkpointStr, TypedProperties props) {
+    DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(ConsumerConfig.GROUP_ID_CONFIG));
+    Map<TopicPartition, Long> offsetMap = KafkaOffsetGen.CheckpointUtils.strToOffsets(checkpointStr);
+    Map<String, Object> kafkaParams = KafkaOffsetGen.excludeHoodieConfigs(props);
+    Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>(offsetMap.size());
+    try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+      offsetMap.forEach((topicPartition, offset) -> offsetAndMetadataMap.put(topicPartition, new OffsetAndMetadata(offset)));
+      consumer.commitSync(offsetAndMetadataMap);

Review comment:
       Somehow the commitAsync method does not work in the unit tests.
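
       For readers following the diff, the filtering step in excludeHoodieConfigs can be tried in isolation. The following is only a minimal sketch, not the code in the PR: it assumes a plain java.util.Properties as a stand-in for TypedProperties, and the class name ExcludeHoodieConfigsSketch and the key hoodie.example.key are hypothetical, chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical stand-alone sketch; java.util.Properties stands in for TypedProperties.
public class ExcludeHoodieConfigsSketch {

  // Copies every property whose key does not start with "hoodie." into a new map,
  // so that only non-Hudi settings are handed to the Kafka consumer.
  static Map<String, Object> excludeHoodieConfigs(Properties props) {
    Map<String, Object> kafkaParams = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      if (!key.startsWith("hoodie.")) {
        kafkaParams.put(key, props.getProperty(key));
      }
    }
    return kafkaParams;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "example-group");
    props.setProperty("hoodie.example.key", "example-value"); // illustrative key only

    Map<String, Object> kafkaParams = excludeHoodieConfigs(props);
    System.out.println(kafkaParams.containsKey("hoodie.example.key")); // prints false
    System.out.println(kafkaParams.size()); // prints 2
  }
}
```

       Only the two Kafka settings survive the filter, which is the behaviour the comment in the diff describes: the hoodie.* keys never reach the consumer, so it has no reason to warn about them.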



