Posted to commits@pinot.apache.org by xi...@apache.org on 2023/07/09 09:32:24 UTC

[pinot] branch master updated: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java (#253) (#11040)

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 10a18d55b9 retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java (#253) (#11040)
10a18d55b9 is described below

commit 10a18d55b9d27851fa2bba1f77eda3e9d600aa37
Author: Johan Adami <47...@users.noreply.github.com>
AuthorDate: Sun Jul 9 05:32:18 2023 -0400

    retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java (#253) (#11040)
    
    Co-authored-by: Johan Adami <ja...@stripe.com>
---
 .../KafkaPartitionLevelConnectionHandler.java      | 32 +++++++++++++++++++++-
 1 file changed, 31 insertions(+), 1 deletion(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
index ca6290d59a..00371acaba 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java
@@ -19,17 +19,22 @@
 package org.apache.pinot.plugin.stream.kafka20;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.Uninterruptibles;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -39,6 +44,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
  */
 public abstract class KafkaPartitionLevelConnectionHandler {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConnectionHandler.class);
   protected final KafkaPartitionLevelStreamConfig _config;
   protected final String _clientId;
   protected final int _partition;
@@ -61,12 +67,36 @@ public abstract class KafkaPartitionLevelConnectionHandler {
       consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
     }
     consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
-    _consumer = new KafkaConsumer<>(consumerProp);
+    _consumer = createConsumer(consumerProp);
     _topicPartition = new TopicPartition(_topic, _partition);
     _consumer.assign(Collections.singletonList(_topicPartition));
     _kafkaMetadataExtractor = KafkaMetadataExtractor.build(_config.isPopulateMetadata());
   }
 
+  private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
+    // We arbitrarily chose 5 attempts with a 2 second sleep between retries. Up to 8 seconds of total sleep
+    // felt like a good balance between not waiting too long to retry and not retrying too many times.
+    int maxTries = 5;
+    int tries = 0;
+    while (true) {
+      try {
+        return new KafkaConsumer<>(consumerProp);
+      } catch (KafkaException e) {
+        tries++;
+        if (tries >= maxTries) {
+          LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+          throw e;
+        }
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+        // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
+        // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
+        // stuck in ERROR state.
+        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+      }
+    }
+  }
+
   public void close()
       throws IOException {
     _consumer.close();
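
For reference, the retry pattern added above can be exercised outside of Kafka. The sketch below is a minimal, hypothetical illustration, not part of this commit or the Pinot codebase: it generalizes the createConsumer() loop into a helper over any Supplier, keeping the same fixed uninterruptible sleep between attempts. The RetrySupport class and withRetries() method names are invented for this example.

import com.google.common.util.concurrent.Uninterruptibles;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not in the Pinot codebase): generalizes the retry loop
// from KafkaPartitionLevelConnectionHandler.createConsumer().
public final class RetrySupport {

  private RetrySupport() {
  }

  public static <T> T withRetries(Supplier<T> supplier, int maxTries, long sleepSeconds) {
    int tries = 0;
    while (true) {
      try {
        return supplier.get();
      } catch (RuntimeException e) {
        tries++;
        if (tries >= maxTries) {
          // Out of attempts: propagate the last failure to the caller.
          throw e;
        }
        // Sleep uninterruptibly, mirroring the commit's rationale: a stray
        // thread interrupt should not abort creation and leave the table in ERROR state.
        Uninterruptibles.sleepUninterruptibly(sleepSeconds, TimeUnit.SECONDS);
      }
    }
  }
}

With such a helper, the body of createConsumer() would reduce to roughly
    return RetrySupport.withRetries(() -> new KafkaConsumer<>(consumerProp), 5, 2);
which works because KafkaException extends RuntimeException and is therefore caught by the generic handler above.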

