You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@storm.apache.org by sr...@apache.org on 2019/11/13 15:59:03 UTC

[storm] branch master updated: STORM-3529: Catch and Log Kafka RetriableException

This is an automated email from the ASF dual-hosted git repository.

srdo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new b17e548  STORM-3529: Catch and Log Kafka RetriableException
     new dd8ce7b  Merge pull request #3164 from OliverMD/STORM-3529
b17e548 is described below

commit b17e5483cdecf19b3c6b6f5aa8bc7bedb1a23340
Author: Oliver Downard <ol...@btinternet.com>
AuthorDate: Sun Nov 10 09:43:22 2019 +0000

    STORM-3529: Catch and Log Kafka RetriableException
---
 .../kafka/spout/metrics/KafkaOffsetMetric.java      | 14 ++++++++++++--
 .../kafka/spout/KafkaSpoutSingleTopicTest.java      | 21 ++++++++++++++++-----
 2 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index da84979..496e1d8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -24,6 +24,7 @@ import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.metric.api.IMetric;
 import org.slf4j.Logger;
@@ -76,8 +77,17 @@ public class KafkaOffsetMetric<K, V> implements IMetric {
         Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
         Set<TopicPartition> topicPartitions = offsetManagers.keySet();
 
-        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
-        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
+        Map<TopicPartition, Long> beginningOffsets;
+        Map<TopicPartition, Long> endOffsets;
+
+        try {
+            beginningOffsets = consumer.beginningOffsets(topicPartitions);
+            endOffsets = consumer.endOffsets(topicPartitions);
+        } catch (RetriableException e) {
+            LOG.warn("Failed to get offsets from Kafka! Will retry on next metrics tick.", e);
+            return null;
+        }
+
         //map to hold partition level and topic level metrics
         Map<String, Long> result = new HashMap<>();
 
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 512d274..d7f563f 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -21,17 +21,14 @@ package org.apache.storm.kafka.spout;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyListOf;
 import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.*;
 
 import java.util.HashSet;
 import java.util.List;
@@ -39,8 +36,10 @@ import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.tuple.Values;
 import org.apache.storm.utils.Time;
@@ -428,4 +427,16 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
         assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue(), 10);
         assertEquals(offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue(), 0);
     }
+
+    @Test
+    public void testOffsetMetricsReturnsNullWhenRetriableExceptionThrown() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        // Ensure a timeout exception results in the return value being null
+        when(getKafkaConsumer().beginningOffsets(anyCollection())).thenThrow(TimeoutException.class);
+
+        Map<String, Long> offsetMetric  = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertNull(offsetMetric);
+    }
 }