You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/11/13 15:59:03 UTC
[storm] branch master updated: STORM-3529: Catch and Log Kafka RetriableException
This is an automated email from the ASF dual-hosted git repository.
srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new b17e548 STORM-3529: Catch and Log Kafka RetriableException
new dd8ce7b Merge pull request #3164 from OliverMD/STORM-3529
b17e548 is described below
commit b17e5483cdecf19b3c6b6f5aa8bc7bedb1a23340
Author: Oliver Downard <ol...@btinternet.com>
AuthorDate: Sun Nov 10 09:43:22 2019 +0000
STORM-3529: Catch and Log Kafka RetriableException
---
.../kafka/spout/metrics/KafkaOffsetMetric.java | 14 ++++++++++++--
.../kafka/spout/KafkaSpoutSingleTopicTest.java | 21 ++++++++++++++++-----
2 files changed, 28 insertions(+), 7 deletions(-)
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index da84979..496e1d8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -24,6 +24,7 @@ import java.util.Set;
import java.util.function.Supplier;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.metric.api.IMetric;
import org.slf4j.Logger;
@@ -76,8 +77,17 @@ public class KafkaOffsetMetric<K, V> implements IMetric {
Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
Set<TopicPartition> topicPartitions = offsetManagers.keySet();
- Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
- Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+ Map<TopicPartition, Long> beginningOffsets;
+ Map<TopicPartition, Long> endOffsets;
+
+ try {
+ beginningOffsets = consumer.beginningOffsets(topicPartitions);
+ endOffsets = consumer.endOffsets(topicPartitions);
+ } catch (RetriableException e) {
+ LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e);
+ return null;
+ }
+
//map to hold partition level and topic level metrics
Map<String, Long> result = new HashMap<>();
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 512d274..d7f563f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -21,17 +21,14 @@ package org.apache.storm.kafka.spout;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.anyListOf;
import static org.mockito.ArgumentMatchers.anyObject;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
import java.util.HashSet;
import java.util.List;
@@ -39,8 +36,10 @@ import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Time;
@@ -428,4 +427,16 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10);
assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0);
}
+
+ @Test
+ public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception {
+ final int messageCount = 10;
+ prepareSpout(messageCount);
+
+ // Kafka's TimeoutException is a RetriableException: when beginningOffsets throws it, the offset metric must log and return null instead of propagating
+ when(getKafkaConsumer().beginningOffsets(anyCollection())).thenThrow(TimeoutException.class);
+
+ Map<String, Long> offsetMetric = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+ assertNull(offsetMetric);
+ }
}