Posted to commits@pinot.apache.org by "jadami10 (via GitHub)" <gi...@apache.org> on 2023/07/07 23:24:51 UTC

[GitHub] [pinot] jadami10 commented on a diff in pull request #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

jadami10 commented on code in PR #11040:
URL: https://github.com/apache/pinot/pull/11040#discussion_r1256601326


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java:
##########
@@ -61,12 +67,36 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
       consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
     }
     consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
-    _consumer = new KafkaConsumer<>(consumerProp);
+    _consumer = createConsumer(consumerProp);
     _topicPartition = new TopicPartition(_topic, _partition);
     _consumer.assign(Collections.singletonList(_topicPartition));
     _kafkaMetadataExtractor = KafkaMetadataExtractor.build(_config.isPopulateMetadata());
   }
 
+  private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    // Creation of the KafkaConsumer can fail for multiple reasons, including DNS issues.
+    // We arbitrarily chose 5 attempts with a 2-second sleep between attempts (at most 8 seconds of sleeping
+    // in total), which felt like a good balance between not waiting too long and not retrying too many times.
+    int maxTries = 5;
+    int tries = 0;
+    while (true) {
+      try {
+        return new KafkaConsumer<>(consumerProp);
+      } catch (KafkaException e) {
+        tries++;
+        if (tries >= maxTries) {
+          LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+          throw e;
+        }
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+        // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
+        // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
+        // stuck in ERROR state.
+        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);

Review Comment:
   @navina, any thoughts, since you're working a lot on ingestion? I opted for a constant backoff so as not to introduce too much lag into ingestion, but I could be convinced otherwise.
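
   To make the trade-off concrete, here is a minimal sketch (not part of the PR; `BackoffSketch` and its method names are hypothetical) comparing the worst-case sleep time of a constant backoff against a doubling backoff. It assumes the same parameters as the diff: at most 5 attempts and a 2-second base delay. Since the loop throws on the fifth failure, it sleeps at most 4 times.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; it is not code from the PR.
class BackoffSketch {

  // Returns the sleep durations between attempts.
  // maxTries attempts means at most (maxTries - 1) sleeps, because the
  // last failed attempt rethrows instead of sleeping.
  static List<Long> sleepsMillis(int maxTries, long baseMillis, boolean exponential) {
    List<Long> sleeps = new ArrayList<>();
    long delay = baseMillis;
    for (int attempt = 1; attempt < maxTries; attempt++) {
      sleeps.add(delay);
      if (exponential) {
        delay *= 2; // double the delay after every failed attempt
      }
    }
    return sleeps;
  }

  // Sums the sleep durations to get the worst-case added delay.
  static long totalMillis(List<Long> sleeps) {
    long total = 0;
    for (long sleep : sleeps) {
      total += sleep;
    }
    return total;
  }

  public static void main(String[] args) {
    List<Long> constant = sleepsMillis(5, 2000, false);
    List<Long> doubling = sleepsMillis(5, 2000, true);
    System.out.println("constant: " + constant + " total=" + totalMillis(constant) + " ms");
    System.out.println("doubling: " + doubling + " total=" + totalMillis(doubling) + " ms");
  }
}
```

   Under these assumptions the constant schedule adds at most 8 seconds before giving up, while the doubling schedule adds up to 30 seconds, which is the extra ingestion lag the constant backoff is meant to avoid.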



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

