Posted to commits@pinot.apache.org by "jadami10 (via GitHub)" <gi...@apache.org> on 2023/07/06 14:46:49 UTC

[GitHub] [pinot] jadami10 opened a new pull request, #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

jadami10 opened a new pull request, #11040:
URL: https://github.com/apache/pinot/pull/11040

   This is an issue we saw internally. A DNS lookup failure caused KafkaConsumer creation to fail, which in turn put the segment into the ERROR state. We could add retries anywhere along the long call chain up to `org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager#makeStreamConsumer`, but at that level it isn't clear that Kafka is the stream being used. So we added the retry at the boundary between Pinot and Kafka code.
   
   We've been running this internally for ~2 weeks, and it continues to catch and retry these errors without producing error segments.
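The retry added here follows a common bounded-retry pattern: call the factory, catch the expected exception type, sleep a fixed interval, and give up after a maximum number of attempts. The sketch below is a hypothetical, JDK-only generalization of that loop. `BoundedRetry`, `retry`, and its parameters are illustrative names, not Pinot or Kafka APIs, and `sleepUninterruptibly` here is a hand-written stand-in for Guava's `Uninterruptibles.sleepUninterruptibly`.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper, not a Pinot or Kafka API: bounded retry with a fixed sleep,
// generalizing the loop added in KafkaPartitionLevelConnectionHandler#createConsumer.
final class BoundedRetry {
  private BoundedRetry() {
  }

  static <T> T retry(Supplier<T> factory, Class<? extends RuntimeException> retryOn, int maxTries,
      long sleepMillis) {
    if (maxTries < 1) {
      throw new IllegalArgumentException("maxTries must be at least 1");
    }
    int tries = 0;
    while (true) {
      try {
        return factory.get();
      } catch (RuntimeException e) {
        if (!retryOn.isInstance(e)) {
          throw e; // not a retryable failure: propagate immediately
        }
        tries++;
        if (tries >= maxTries) {
          throw e; // out of attempts: surface the last failure
        }
        sleepUninterruptibly(sleepMillis);
      }
    }
  }

  // JDK-only stand-in for Guava's Uninterruptibles.sleepUninterruptibly: keeps sleeping until the
  // full interval has passed, then re-sets the interrupt flag if an interrupt arrived meanwhile.
  static void sleepUninterruptibly(long millis) {
    boolean interrupted = false;
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(millis);
    long end = System.nanoTime() + remainingNanos;
    try {
      while (true) {
        try {
          TimeUnit.NANOSECONDS.sleep(remainingNanos);
          return;
        } catch (InterruptedException e) {
          interrupted = true;
          remainingNanos = end - System.nanoTime();
        }
      }
    } finally {
      if (interrupted) {
        Thread.currentThread().interrupt();
      }
    }
  }
}
```

With the Kafka client on the classpath, a call along the lines of `BoundedRetry.retry(() -> new KafkaConsumer<>(consumerProp), KafkaException.class, 5, 2000)` would approximate the PR's behavior (5 attempts, 2-second sleep); that particular usage is an untested assumption.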
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[GitHub] [pinot] xiangfu0 merged pull request #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

Posted by "xiangfu0 (via GitHub)" <gi...@apache.org>.
xiangfu0 merged PR #11040:
URL: https://github.com/apache/pinot/pull/11040




[GitHub] [pinot] xiangfu0 commented on pull request #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

Posted by "xiangfu0 (via GitHub)" <gi...@apache.org>.
xiangfu0 commented on PR #11040:
URL: https://github.com/apache/pinot/pull/11040#issuecomment-1627660377

   Thanks for catching and fixing this issue!
   




[GitHub] [pinot] navina commented on a diff in pull request #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

Posted by "navina (via GitHub)" <gi...@apache.org>.
navina commented on code in PR #11040:
URL: https://github.com/apache/pinot/pull/11040#discussion_r1257083702


##########
pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConnectionHandler.java:
##########
@@ -61,12 +67,36 @@ public KafkaPartitionLevelConnectionHandler(String clientId, StreamConfig stream
       consumerProp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, _config.getKafkaIsolationLevel());
     }
     consumerProp.put(ConsumerConfig.CLIENT_ID_CONFIG, _clientId);
-    _consumer = new KafkaConsumer<>(consumerProp);
+    _consumer = createConsumer(consumerProp);
     _topicPartition = new TopicPartition(_topic, _partition);
     _consumer.assign(Collections.singletonList(_topicPartition));
     _kafkaMetadataExtractor = KafkaMetadataExtractor.build(_config.isPopulateMetadata());
   }
 
+  private Consumer<String, Bytes> createConsumer(Properties consumerProp) {
+    // Creation of the KafkaConsumer can fail for multiple reasons including DNS issues.
+    // We arbitrarily chose 5 attempts with a 2-second sleep between attempts (at most 8 seconds of sleep),
+    // which felt like a good balance of not waiting too long for a retry, but also not retrying too many times.
+    int maxTries = 5;
+    int tries = 0;
+    while (true) {
+      try {
+        return new KafkaConsumer<>(consumerProp);
+      } catch (KafkaException e) {
+        tries++;
+        if (tries >= maxTries) {
+          LOGGER.error("Caught exception while creating Kafka consumer, giving up", e);
+          throw e;
+        }
+        LOGGER.warn("Caught exception while creating Kafka consumer, retrying {}/{}", tries, maxTries, e);
+        // We are choosing to sleepUninterruptibly here because other parts of the Kafka consumer code do this
+        // as well. We don't want random interrupts to cause us to fail to create the consumer and have the table
+        // stuck in ERROR state.
+        Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);

Review Comment:
   @jadami10 Since this runs while the segment data manager is being instantiated, exponential backoff might block the Helix thread that handles the state transition (OFFLINE -> CONSUMING), which is usually where the table and segment data managers get created. So I would recommend just using a backoff with a maximum upper bound. The drawback of a constant backoff is the possibility of thrashing the source system, but I don't think that will be an issue in this case.
   
   btw, can you share the exception/stacktrace you see in the absence of this retry loop? 
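To put numbers on the blocking concern: with `maxTries = 5`, the loop in the diff makes at most five attempts and therefore sleeps at most four times, so the constant 2-second backoff holds the state-transition thread for about 8 seconds of sleep in the worst case, plus whatever time the failing constructor calls take. A doubling schedule starting at 2 seconds would sleep 2 + 4 + 8 + 16 = 30 seconds, and capping each sleep at 4 seconds gives 2 + 4 + 4 + 4 = 14 seconds. The hypothetical `RetryBudget` class below (not Pinot code) just computes these worst-case totals.

```java
// Hypothetical illustration, not Pinot code: worst-case time spent sleeping in a retry loop that
// makes maxTries attempts, and therefore sleeps (maxTries - 1) times.
final class RetryBudget {
  private RetryBudget() {
  }

  // Constant backoff: every sleep has the same length.
  static long constantTotalMillis(int maxTries, long sleepMillis) {
    return (long) Math.max(0, maxTries - 1) * sleepMillis;
  }

  // Exponential backoff: the delay doubles after each failure but never exceeds capMillis.
  static long cappedExponentialTotalMillis(int maxTries, long baseMillis, long capMillis) {
    long total = 0;
    long delay = Math.min(baseMillis, capMillis);
    for (int sleep = 0; sleep < maxTries - 1; sleep++) {
      total += delay;
      // Jump straight to the cap instead of doubling past it, which also avoids overflow.
      delay = delay > capMillis / 2 ? capMillis : delay * 2;
    }
    return total;
  }
}
```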





[GitHub] [pinot] codecov-commenter commented on pull request #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

Posted by "codecov-commenter (via GitHub)" <gi...@apache.org>.
codecov-commenter commented on PR #11040:
URL: https://github.com/apache/pinot/pull/11040#issuecomment-1623906648

   ## [Codecov](https://app.codecov.io/gh/apache/pinot/pull/11040?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report
   > Merging [#11040](https://app.codecov.io/gh/apache/pinot/pull/11040?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (23a0ddb) into [master](https://app.codecov.io/gh/apache/pinot/commit/878f61357b6868d98a3eb8c84ec1dfb414c44b6f?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) (878f613) will **decrease** coverage by `0.12%`.
   > The diff coverage is `0.00%`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #11040      +/-   ##
   ==========================================
   - Coverage    0.11%    0.00%   -0.12%     
   ==========================================
     Files        2200     2184      -16     
     Lines      118822   118401     -421     
     Branches    18002    17948      -54     
   ==========================================
   - Hits          137        0     -137     
   + Misses     118665   118401     -264     
   + Partials       20        0      -20     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | integration1temurin11 | `0.00% <0.00%> (ø)` | |
   | integration1temurin17 | `?` | |
   | integration1temurin20 | `0.00% <0.00%> (ø)` | |
   | integration2temurin11 | `?` | |
   | integration2temurin17 | `?` | |
   | integration2temurin20 | `0.00% <0.00%> (ø)` | |
   | unittests1temurin11 | `0.00% <ø> (ø)` | |
   | unittests1temurin17 | `0.00% <ø> (ø)` | |
   | unittests1temurin20 | `0.00% <ø> (ø)` | |
   | unittests2temurin11 | `?` | |
   | unittests2temurin17 | `?` | |
   | unittests2temurin20 | `?` | |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://app.codecov.io/gh/apache/pinot/pull/11040?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) | Coverage Δ | |
   |---|---|---|
   | [.../kafka20/KafkaPartitionLevelConnectionHandler.java](https://app.codecov.io/gh/apache/pinot/pull/11040?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache#diff-cGlub3QtcGx1Z2lucy9waW5vdC1zdHJlYW0taW5nZXN0aW9uL3Bpbm90LWthZmthLTIuMC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcGlub3QvcGx1Z2luL3N0cmVhbS9rYWZrYTIwL0thZmthUGFydGl0aW9uTGV2ZWxDb25uZWN0aW9uSGFuZGxlci5qYXZh) | `0.00% <0.00%> (ø)` | |
   
   ... and [16 files with indirect coverage changes](https://app.codecov.io/gh/apache/pinot/pull/11040/indirect-changes?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache)
   
   




[GitHub] [pinot] xiangfu0 commented on a diff in pull request #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

Posted by "xiangfu0 (via GitHub)" <gi...@apache.org>.
xiangfu0 commented on code in PR #11040:
URL: https://github.com/apache/pinot/pull/11040#discussion_r1257448004


Review Comment:
   Agreed that this might eventually cause a ZK/Helix timeout. I think a 2-second retry should be fine.





[GitHub] [pinot] xiangfu0 commented on a diff in pull request #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

Posted by "xiangfu0 (via GitHub)" <gi...@apache.org>.
xiangfu0 commented on code in PR #11040:
URL: https://github.com/apache/pinot/pull/11040#discussion_r1256556454


Review Comment:
   Shall we use exponential backoff?
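For comparison, a capped exponential backoff could look like the sketch below. It is a hypothetical helper, not something Pinot ships: the ceiling for retry n is `base * 2^(n-1)` clamped to `cap`, and the actual delay is drawn uniformly from `[0, ceiling]` (the "full jitter" variant), which spreads out retries from many consumers restarting at once. The base, the cap, and the use of jitter are all assumptions.

```java
import java.util.Random;

// Hypothetical sketch, not a Pinot API: capped exponential backoff with "full jitter".
final class ExponentialBackoff {
  private ExponentialBackoff() {
  }

  // Ceiling for the delay before retry number `retry` (1-based): baseMillis * 2^(retry - 1),
  // clamped to capMillis. Doubling stops at the cap, so the arithmetic cannot overflow.
  static long ceilingMillis(int retry, long baseMillis, long capMillis) {
    long ceiling = Math.min(baseMillis, capMillis);
    for (int i = 1; i < retry && ceiling < capMillis; i++) {
      ceiling = ceiling > capMillis / 2 ? capMillis : ceiling * 2;
    }
    return ceiling;
  }

  // Full jitter: pick a delay uniformly from [0, ceiling] so that many consumers retrying at the
  // same time do not all hit DNS or the brokers in lockstep.
  static long jitteredMillis(int retry, long baseMillis, long capMillis, Random random) {
    long ceiling = ceilingMillis(retry, baseMillis, capMillis);
    // Clamp guards against floating-point rounding pushing the result past the ceiling.
    return Math.min(ceiling, (long) (random.nextDouble() * (ceiling + 1)));
  }
}
```

Substituting something like `jitteredMillis(tries, 2000, 30000, random)` for the fixed 2-second sleep would turn the PR's loop into this scheme, at the cost of the longer worst-case blocking of the Helix thread discussed in this thread.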





[GitHub] [pinot] jadami10 commented on a diff in pull request #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

Posted by "jadami10 (via GitHub)" <gi...@apache.org>.
jadami10 commented on code in PR #11040:
URL: https://github.com/apache/pinot/pull/11040#discussion_r1258265019


Review Comment:
   Here's the stack trace. We're still seeing it daily, but it resolves after 1-2 retries. We're separately investigating why DNS resolution suddenly started failing.
   
   ```
   [2023-07-10 13:14:13.087928] WARN [ClientUtils] [HelixTaskExecutor-message_handle_thread_16:25] Couldn't resolve server <kafka broker address> from bootstrap.servers as DNS resolution failed for <kafka broker address>
   [2023-07-10 13:14:13.088247] WARN [KafkaPartitionLevelConnectionHandler] [HelixTaskExecutor-message_handle_thread_16:25] Caught exception while creating Kafka consumer, retrying 1/5
   [2023-07-10 13:14:13.088268] org.apache.pinot.shaded.org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
   [2023-07-10 13:14:13.088284] 	at org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088303] 	at org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:665) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088322] 	at org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:646) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088337] 	at org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:626) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088354] 	at org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.createConsumer(KafkaPartitionLevelConnectionHandler.java:84) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088371] 	at org.apache.pinot.plugin.stream.kafka20.KafkaPartitionLevelConnectionHandler.<init>(KafkaPartitionLevelConnectionHandler.java:70) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088393] 	at org.apache.pinot.plugin.stream.kafka20.KafkaStreamMetadataProvider.<init>(KafkaStreamMetadataProvider.java:54) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088411] 	at org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory.createPartitionMetadataProvider(KafkaConsumerFactory.java:43) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088426] 	at org.apache.pinot.spi.stream.StreamMetadataProvider.computePartitionGroupMetadata(StreamMetadataProvider.java:83) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088442] 	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.setPartitionParameters(LLRealtimeSegmentDataManager.java:1532) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088476] 	at org.apache.pinot.core.data.manager.realtime.LLRealtimeSegmentDataManager.<init>(LLRealtimeSegmentDataManager.java:1442) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088493] 	at org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager.addSegment(RealtimeTableDataManager.java:445) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088508] 	at org.apache.pinot.server.starter.helix.HelixInstanceDataManager.addRealtimeSegment(HelixInstanceDataManager.java:219) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088528] 	at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline(SegmentOnlineOfflineStateModelFactory.java:168) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088553] 	at org.apache.pinot.server.starter.helix.SegmentOnlineOfflineStateModelFactory$SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline(SegmentOnlineOfflineStateModelFactory.java:83) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088561] 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:?]
   [2023-07-10 13:14:13.088570] 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:?]
   [2023-07-10 13:14:13.088579] 	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
   [2023-07-10 13:14:13.088585] 	at java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
   [2023-07-10 13:14:13.088602] 	at org.apache.helix.messaging.handling.HelixStateTransitionHandler.invoke(HelixStateTransitionHandler.java:350) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088620] 	at org.apache.helix.messaging.handling.HelixStateTransitionHandler.handleMessage(HelixStateTransitionHandler.java:278) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088636] 	at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:97) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088650] 	at org.apache.helix.messaging.handling.HelixTask.call(HelixTask.java:49) ~[pinot-all-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-jar-with-dependencies.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088657] 	at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
   [2023-07-10 13:14:13.088666] 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
   [2023-07-10 13:14:13.088675] 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
   [2023-07-10 13:14:13.088681] 	at java.lang.Thread.run(Thread.java:829) [?:?]
   [2023-07-10 13:14:13.088691] Caused by: org.apache.pinot.shaded.org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
   [2023-07-10 13:14:13.088710] 	at org.apache.pinot.shaded.org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:89) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088724] 	at org.apache.pinot.shaded.org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088738] 	at org.apache.pinot.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:731) ~[pinot-kafka-2.0-0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-shaded.jar:0.13.0-2023-06-13-9ac8b03432-SNAPSHOT-9ac8b0343239a62a1fa450ae2d7d546da6b225da]
   [2023-07-10 13:14:13.088743] 	... 26 more
   ```
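The trace shows why catching `KafkaException` is enough here: the `KafkaException` ("Failed to construct kafka consumer") wraps a `ConfigException` ("No resolvable bootstrap urls given in bootstrap.servers"), which is how the DNS failure surfaces. If one wanted to log only that underlying reason, or to retry only when a specific cause is present, a cause-chain walk like the hypothetical `Causes` helper below would do it. It uses only JDK types; passing Kafka's `ConfigException.class` as the `type` argument is an assumed usage that is not exercised here.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper, not a Pinot or Kafka API: inspects a Throwable's cause chain.
final class Causes {
  private Causes() {
  }

  // Returns the deepest cause, stopping early if the chain loops back on itself.
  static Throwable rootCause(Throwable t) {
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    Throwable current = t;
    while (current.getCause() != null && seen.add(current)) {
      current = current.getCause();
    }
    return current;
  }

  // True if t itself, or anything in its cause chain, is an instance of type.
  static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    for (Throwable current = t; current != null && seen.add(current); current = current.getCause()) {
      if (type.isInstance(current)) {
        return true;
      }
    }
    return false;
  }
}
```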





[GitHub] [pinot] jadami10 commented on a diff in pull request #11040: retry KafkaConsumer creation in KafkaPartitionLevelConnectionHandler.java

Posted by "jadami10 (via GitHub)" <gi...@apache.org>.
jadami10 commented on code in PR #11040:
URL: https://github.com/apache/pinot/pull/11040#discussion_r1256601326


Review Comment:
   @navina, any thoughts, since you're working a lot on ingestion? I opted for a constant backoff so as not to introduce too much ingestion lag, but I could be convinced otherwise.
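An alternative that addresses both concerns in this thread (bounded blocking of the Helix thread and limited ingestion lag) is to bound the retry by total elapsed time instead of by attempt count. This is not what the PR merged. The sketch below is a hypothetical `DeadlineRetry` helper with an injected clock and sleeper so it can be tested without real waiting; the 10-second budget used in the note after it is an assumption.

```java
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical helper, not what the PR merged: retries until a total time budget is used up.
final class DeadlineRetry {
  private DeadlineRetry() {
  }

  static <T> T retryUntil(Supplier<T> factory, long budgetMillis, long sleepMillis,
      LongSupplier nowMillis, LongConsumer sleeper) {
    long deadline = nowMillis.getAsLong() + budgetMillis;
    while (true) {
      try {
        return factory.get();
      } catch (RuntimeException e) {
        long remaining = deadline - nowMillis.getAsLong();
        if (remaining <= 0) {
          throw e; // budget exhausted: surface the last failure
        }
        // Never sleep past the deadline, so the caller is blocked for roughly budgetMillis
        // plus the duration of the final attempt.
        sleeper.accept(Math.min(sleepMillis, remaining));
      }
    }
  }
}
```

In production the clock would be something monotonic, such as `() -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime())`, and the sleeper an uninterruptible sleep. With a 10-second budget and 2-second sleeps, the helper makes at most six attempts.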


