You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2022/01/21 22:13:44 UTC

[pinot] branch master updated: Classify timeout exception as TransientException so that the caller can retry on it. (#8043)

This is an automated email from the ASF dual-hosted git repository.

tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5701117  Classify timeout exception as TransientException so that the caller can retry on it. (#8043)
5701117 is described below

commit 5701117fed61476e2da2e4428216e02f7d5bf217
Author: Ting Chen <ti...@uber.com>
AuthorDate: Fri Jan 21 14:13:30 2022 -0800

    Classify timeout exception as TransientException so that the caller can retry on it. (#8043)
---
 .../kafka20/KafkaStreamMetadataProvider.java       | 31 +++++++++++++++-------
 .../spi/stream/PartitionGroupMetadataFetcher.java  |  2 +-
 2 files changed, 22 insertions(+), 11 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 519b3d0..5af4d7d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -22,11 +22,13 @@ import com.google.common.base.Preconditions;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.OffsetCriteria;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.TransientConsumerException;
 
 
 public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler
@@ -42,23 +44,32 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
 
   @Override
   public int fetchPartitionCount(long timeoutMillis) {
-    return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+    try {
+      return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
   }
 
   @Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
     Preconditions.checkNotNull(offsetCriteria);
     long offset;
-    if (offsetCriteria.isLargest()) {
-      offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-          .get(_topicPartition);
-    } else if (offsetCriteria.isSmallest()) {
-      offset = _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
-          .get(_topicPartition);
-    } else {
-      throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+    try {
+      if (offsetCriteria.isLargest()) {
+        offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+            .get(_topicPartition);
+      } else if (offsetCriteria.isSmallest()) {
+        offset =
+            _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+                .get(_topicPartition);
+      } else {
+        throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+      }
+      return new LongMsgOffset(offset);
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
     }
-    return new LongMsgOffset(offset);
   }
 
   @Override
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 58cbb3f..6ffdbe0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -74,7 +74,7 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
       }
       return Boolean.TRUE;
     } catch (TransientConsumerException e) {
-      LOGGER.warn("Could not get partition count for topic {}", _topicName, e);
+      LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicName, e);
       _exception = e;
       return Boolean.FALSE;
     } catch (Exception e) {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org