You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/01/03 18:04:42 UTC

[1/2] storm git commit: STORM-2867: Add consumer lag metrics to KafkaSpout

Repository: storm
Updated Branches:
  refs/heads/master d644e29a8 -> 69e596ac6


 STORM-2867: Add consumer lag metrics to KafkaSpout


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9b2b2dbd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9b2b2dbd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9b2b2dbd

Branch: refs/heads/master
Commit: 9b2b2dbd087aea6f5081442338c90e24f9799e4f
Parents: d644e29
Author: Manikumar Reddy O <ma...@gmail.com>
Authored: Wed Dec 20 20:48:51 2017 +0530
Committer: Manikumar Reddy <ma...@gmail.com>
Committed: Wed Jan 3 18:31:59 2018 +0530

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  27 ++++
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  22 ++-
 .../kafka/spout/internal/OffsetManager.java     |  13 +-
 .../kafka/spout/metrics/KafkaOffsetMetric.java  | 141 +++++++++++++++++++
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  10 ++
 .../kafka/spout/KafkaSpoutSingleTopicTest.java  |  33 +++++
 pom.xml                                         |   2 +-
 7 files changed, 245 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9fe654f..c52309e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -39,6 +39,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang.Validate;
@@ -56,6 +57,7 @@ import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -104,6 +106,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient TopologyContext context;
     // Metadata information to commit to Kafka. It is unique per spout per topology.
     private transient String commitMetadata;
+    private transient KafkaOffsetMetric kafkaOffsetMetric;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
@@ -142,10 +145,29 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         setCommitMetadata(context);
 
         tupleListener.open(conf, context);
+        if (canRegisterMetrics()) {
+            registerMetric();
+        }
 
         LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
     }
 
+    private void registerMetric() {
+        LOG.info("Registering Spout Metrics");
+        kafkaOffsetMetric = new KafkaOffsetMetric(() -> offsetManagers, () -> kafkaConsumer);
+        context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
+    }
+
+    private boolean canRegisterMetrics() {
+        try {
+            KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class);
+        } catch (NoSuchMethodException e) {
+            LOG.warn("Minimum required kafka-clients library version to enable metrics is 0.10.1.0. Disabling spout metrics.");
+            return false;
+        }
+        return true;
+    }
+
     private void setCommitMetadata(TopologyContext context) {
         try {
             commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
@@ -707,4 +729,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             return !this.pollablePartitions.isEmpty();
         }
     }
+
+    @VisibleForTesting
+    KafkaOffsetMetric getKafkaOffsetMetric() {
+        return  kafkaOffsetMetric;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 1c99f24..a063790 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
@@ -70,6 +69,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
     public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class);
 
+    public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60;
+
+
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
     private final Subscription subscription;
@@ -86,6 +88,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final boolean emitNullTuples;
     private final ProcessingGuarantee processingGuarantee;
     private final boolean tupleTrackingEnforced;
+    private final int metricsTimeBucketSizeInSecs;
 
     /**
      * Creates a new KafkaSpoutConfig using a Builder.
@@ -107,6 +110,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.emitNullTuples = builder.emitNullTuples;
         this.processingGuarantee = builder.processingGuarantee;
         this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
+        this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
     }
 
     /**
@@ -177,6 +181,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private boolean emitNullTuples = false;
         private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
         private boolean tupleTrackingEnforced = false;
+        private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
 
         public Builder(String bootstrapServers, String... topics) {
             this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
@@ -395,6 +400,15 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
+        /**
+         * The time period that metrics data is bucketed into.
+         * @param metricsTimeBucketSizeInSecs time in seconds
+         */
+        public Builder<K, V> setMetricsTimeBucketSizeInSecs(int metricsTimeBucketSizeInSecs) {
+            this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;
+            return this;
+        }
+
         public KafkaSpoutConfig<K, V> build() {
             return new KafkaSpoutConfig<>(this);
         }
@@ -533,6 +547,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return emitNullTuples;
     }
 
+    public int getMetricsTimeBucketSizeInSecs() {
+        return metricsTimeBucketSizeInSecs;
+    }
+
     @Override
     public String toString() {
         return "KafkaSpoutConfig{"
@@ -545,6 +563,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             + ", translator=" + translator
             + ", retryService=" + retryService
             + ", tupleListener=" + tupleListener
+            + ", processingGuarantee=" + processingGuarantee
+            + ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs
             + '}';
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index ec6f2a1..c9f9541 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -46,6 +46,7 @@ public class OffsetManager {
     private long committedOffset;
     // True if this OffsetManager has made at least one commit to Kafka
     private boolean committed;
+    private long latestEmittedOffset;
 
     /**
      * Creates a new OffsetManager.
@@ -63,7 +64,8 @@ public class OffsetManager {
     }
 
     public void addToEmitMsgs(long offset) {
-        this.emittedOffsets.add(offset);                  // O(Log N)
+        this.emittedOffsets.add(offset);  // O(Log N)
+        this.latestEmittedOffset = Math.max(latestEmittedOffset, offset);
     }
     
     public int getNumUncommittedOffsets() {
@@ -215,6 +217,14 @@ public class OffsetManager {
         return emittedOffsets.contains(offset);
     }
 
+    public long getLatestEmittedOffset() {
+        return latestEmittedOffset;
+    }
+
+    public long getCommittedOffset() {
+        return committedOffset;
+    }
+
     @Override
     public final String toString() {
         return "OffsetManager{"
@@ -222,6 +232,7 @@ public class OffsetManager {
             + ", committedOffset=" + committedOffset
             + ", emittedOffsets=" + emittedOffsets
             + ", ackedMsgs=" + ackedMsgs
+            + ", latestEmittedOffset=" + latestEmittedOffset
             + '}';
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
new file mode 100644
index 0000000..d6ed209
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to compute the partition and topic level offset metrics.
+ * <p>
+ * Partition level metrics are:
+ * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition
+ * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition
+ * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout
+ * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout
+ * topicName/partition_{number}/spoutLag // the delta between the latest Offset and latestCompletedOffset
+ * topicName/partition_{number}/recordsInPartition // total number of records in the partition
+ * </p>
+ * <p>
+ * Topic level metrics are:
+ * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout
+ * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout
+ * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout
+ * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout
+ * topicName/spoutLag // total spout lag of all the associated partitions of this spout
+ * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
+ * </p>
+ */
+public class KafkaOffsetMetric implements IMetric {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+    private final Supplier<KafkaConsumer> consumerSupplier;
+
+    public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
+        this.offsetManagerSupplier = offsetManagerSupplier;
+        this.consumerSupplier = consumerSupplier;
+    }
+
+    @Override
+    public Object getValueAndReset() {
+
+        Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
+        KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+        if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+            LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+            return null;
+        }
+
+        Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
+        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
+
+        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
+        //map to hold partition level and topic level metrics
+        Map<String, Long> result = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            OffsetManager offsetManager = entry.getValue();
+
+            long latestTimeOffset = endOffsets.get(topicPartition);
+            long earliestTimeOffset = beginningOffsets.get(topicPartition);
+
+            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
+            long latestCompletedOffset = offsetManager.getCommittedOffset();
+            long spoutLag = latestTimeOffset - latestCompletedOffset;
+            long recordsInPartition =  latestTimeOffset - earliestTimeOffset;
+
+            String metricPath = topicPartition.topic()  + "/partition_" + topicPartition.partition();
+            result.put(metricPath + "/" + "spoutLag", spoutLag);
+            result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+            result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+            result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+            result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
+            result.put(metricPath + "/" + "recordsInPartition", recordsInPartition);
+
+            TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
+            if (topicMetrics == null) {
+                topicMetrics = new TopicMetrics();
+                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
+            }
+
+            topicMetrics.totalSpoutLag += spoutLag;
+            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
+            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
+            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
+            topicMetrics.totalRecordsInPartitions += recordsInPartition;
+        }
+
+        for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
+            String topic = e.getKey();
+            TopicMetrics topicMetrics = e.getValue();
+            result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
+            result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
+            result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
+            result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
+            result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
+            result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions);
+        }
+
+        LOG.debug("Metrics Tick: value : {}", result);
+        return result;
+    }
+
+    private class TopicMetrics {
+        long totalSpoutLag = 0;
+        long totalEarliestTimeOffset = 0;
+        long totalLatestTimeOffset = 0;
+        long totalLatestEmittedOffset = 0;
+        long totalLatestCompletedOffset = 0;
+        long totalRecordsInPartitions = 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index a3fc984..9885cf6 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -45,6 +45,7 @@ public class KafkaSpoutConfigTest {
         expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         assertEquals(expected, conf.getKafkaProps());
+        assertEquals(KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS, conf.getMetricsTimeBucketSizeInSecs());
     }
 
     @Test
@@ -75,4 +76,13 @@ public class KafkaSpoutConfigTest {
         assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee",
             conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none"));
     }
+
+    @Test
+    public void testMetricsTimeBucketSizeInSecs() {
+        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+             .setMetricsTimeBucketSizeInSecs(100)
+            .build();
+
+        assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 1470332..7842349 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -47,6 +47,7 @@ import org.apache.storm.utils.Time;
 import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertEquals;
 
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -344,4 +345,36 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
         spout.nextTuple();
         verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
     }
+
+    @Test
+    public void testOffsetMetrics() throws Exception {
+        final int messageCount = 10;
+        prepareSpout(messageCount);
+
+        Map<String, Long> offsetMetric  = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+        // the offset of the last available message + 1.
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalRecordsInPartitions").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+        //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+
+        //Emit all messages and check that they are emitted. Ack the messages too
+        for (int i = 0; i < messageCount; i++) {
+            nextTuple_verifyEmitted_ack_resetCollector(i);
+        }
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+
+        offsetMetric  = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalEarliestTimeOffset").longValue());
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestTimeOffset").longValue());
+        //latest offset
+        assertEquals(9, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestEmittedOffset").longValue());
+        // offset where processing will resume upon spout restart
+        assertEquals(10, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalLatestCompletedOffset").longValue());
+        assertEquals(0, offsetMetric.get(SingleTopicKafkaSpoutConfiguration.TOPIC+"/totalSpoutLag").longValue());
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 543c0d0..65b1cca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -304,7 +304,7 @@
         <storm.kafka.artifact.id>kafka_2.10</storm.kafka.artifact.id>
 
         <!-- kafka version used by new storm-kafka-client spout code -->
-        <storm.kafka.client.version>0.10.0.0</storm.kafka.client.version>
+        <storm.kafka.client.version>0.10.1.0</storm.kafka.client.version>
 
 
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->


[2/2] storm git commit: Merge branch 'kafka-metrics-apache-master' of https://github.com/omkreddy/storm into asfgit-master

Posted by sr...@apache.org.
Merge branch 'kafka-metrics-apache-master' of https://github.com/omkreddy/storm into asfgit-master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69e596ac
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69e596ac
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69e596ac

Branch: refs/heads/master
Commit: 69e596ac60ac152c854c238e98783c57e432d86c
Parents: d644e29 9b2b2db
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Jan 3 19:02:11 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jan 3 19:02:11 2018 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  27 ++++
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  22 ++-
 .../kafka/spout/internal/OffsetManager.java     |  13 +-
 .../kafka/spout/metrics/KafkaOffsetMetric.java  | 141 +++++++++++++++++++
 .../storm/kafka/spout/KafkaSpoutConfigTest.java |  10 ++
 .../kafka/spout/KafkaSpoutSingleTopicTest.java  |  33 +++++
 pom.xml                                         |   2 +-
 7 files changed, 245 insertions(+), 3 deletions(-)
----------------------------------------------------------------------