You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ti...@apache.org on 2022/01/21 22:13:44 UTC
[pinot] branch master updated: Classify timeout exception as TransientException so that the caller can retry on it. (#8043)
This is an automated email from the ASF dual-hosted git repository.
tingchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5701117 Classify timeout exception as TransientException so that the caller can retry on it. (#8043)
5701117 is described below
commit 5701117fed61476e2da2e4428216e02f7d5bf217
Author: Ting Chen <ti...@uber.com>
AuthorDate: Fri Jan 21 14:13:30 2022 -0800
Classify timeout exception as TransientException so that the caller can retry on it. (#8043)
---
.../kafka20/KafkaStreamMetadataProvider.java | 31 +++++++++++++++-------
.../spi/stream/PartitionGroupMetadataFetcher.java | 2 +-
2 files changed, 22 insertions(+), 11 deletions(-)
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index 519b3d0..5af4d7d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -22,11 +22,13 @@ import com.google.common.base.Preconditions;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.stream.TransientConsumerException;
public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHandler
@@ -42,23 +44,32 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
@Override
public int fetchPartitionCount(long timeoutMillis) {
- return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+ try {
+ return _consumer.partitionsFor(_topic, Duration.ofMillis(timeoutMillis)).size();
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
+ }
}
@Override
public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
long offset;
- if (offsetCriteria.isLargest()) {
- offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- } else if (offsetCriteria.isSmallest()) {
- offset = _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
- .get(_topicPartition);
- } else {
- throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+ try {
+ if (offsetCriteria.isLargest()) {
+ offset = _consumer.endOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(_topicPartition);
+ } else if (offsetCriteria.isSmallest()) {
+ offset =
+ _consumer.beginningOffsets(Collections.singletonList(_topicPartition), Duration.ofMillis(timeoutMillis))
+ .get(_topicPartition);
+ } else {
+ throw new IllegalArgumentException("Unknown initial offset value " + offsetCriteria);
+ }
+ return new LongMsgOffset(offset);
+ } catch (TimeoutException e) {
+ throw new TransientConsumerException(e);
}
- return new LongMsgOffset(offset);
}
@Override
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 58cbb3f..6ffdbe0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -74,7 +74,7 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
}
return Boolean.TRUE;
} catch (TransientConsumerException e) {
- LOGGER.warn("Could not get partition count for topic {}", _topicName, e);
+ LOGGER.warn("Transient Exception: Could not get partition count for topic {}", _topicName, e);
_exception = e;
return Boolean.FALSE;
} catch (Exception e) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org