You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2018/01/03 18:04:42 UTC
[1/2] storm git commit: STORM-2867: Add consumer lag metrics to
KafkaSpout
Repository: storm
Updated Branches:
refs/heads/master d644e29a8 -> 69e596ac6
STORM-2867: Add consumer lag metrics to KafkaSpout
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9b2b2dbd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9b2b2dbd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9b2b2dbd
Branch: refs/heads/master
Commit: 9b2b2dbd087aea6f5081442338c90e24f9799e4f
Parents: d644e29
Author: Manikumar Reddy O <ma...@gmail.com>
Authored: Wed Dec 20 20:48:51 2017 +0530
Committer: Manikumar Reddy <ma...@gmail.com>
Committed: Wed Jan 3 18:31:59 2018 +0530
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 27 ++++
.../storm/kafka/spout/KafkaSpoutConfig.java | 22 ++-
.../kafka/spout/internal/OffsetManager.java | 13 +-
.../kafka/spout/metrics/KafkaOffsetMetric.java | 141 +++++++++++++++++++
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 10 ++
.../kafka/spout/KafkaSpoutSingleTopicTest.java | 33 +++++
pom.xml | 2 +-
7 files changed, 245 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9fe654f..c52309e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -39,6 +39,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang.Validate;
@@ -56,6 +57,7 @@ import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.storm.kafka.spout.internal.OffsetManager;
import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -104,6 +106,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private transient TopologyContext context;
// Metadata information to commit to Kafka. It is unique per spout per topology.
private transient String commitMetadata;
+ private transient KafkaOffsetMetric kafkaOffsetMetric;
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
@@ -142,10 +145,29 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
setCommitMetadata(context);
tupleListener.open(conf, context);
+ if (canRegisterMetrics()) {
+ registerMetric();
+ }
LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
}
+ private void registerMetric() {
+ LOG.info("Registering Spout Metrics");
+ kafkaOffsetMetric = new KafkaOffsetMetric(() -> offsetManagers, () -> kafkaConsumer);
+ context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
+ }
+
+    /**
+     * Checks, via reflection, whether the kafka-clients library on the classpath declares
+     * {@code KafkaConsumer.beginningOffsets(Collection)}.
+     *
+     * @return {@code true} if the method is declared; {@code false} (after logging a warning) if it is not
+     */
+    private boolean canRegisterMetrics() {
+        try {
+            KafkaConsumer.class.getDeclaredMethod("beginningOffsets", Collection.class);
+            return true;
+        } catch (NoSuchMethodException e) {
+            LOG.warn("Minimum required kafka-clients library version to enable metrics is 0.10.1.0. Disabling spout metrics.");
+            return false;
+        }
+    }
+
private void setCommitMetadata(TopologyContext context) {
try {
commitMetadata = JSON_MAPPER.writeValueAsString(new CommitMetadata(
@@ -707,4 +729,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
return !this.pollablePartitions.isEmpty();
}
}
+
+ @VisibleForTesting
+ KafkaOffsetMetric getKafkaOffsetMetric() {
+ return kafkaOffsetMetric;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 1c99f24..a063790 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.regex.Pattern;
@@ -70,6 +69,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
public static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class);
+ public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60;
+
+
// Kafka consumer configuration
private final Map<String, Object> kafkaProps;
private final Subscription subscription;
@@ -86,6 +88,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
private final boolean emitNullTuples;
private final ProcessingGuarantee processingGuarantee;
private final boolean tupleTrackingEnforced;
+ private final int metricsTimeBucketSizeInSecs;
/**
* Creates a new KafkaSpoutConfig using a Builder.
@@ -107,6 +110,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
this.emitNullTuples = builder.emitNullTuples;
this.processingGuarantee = builder.processingGuarantee;
this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
+ this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
}
/**
@@ -177,6 +181,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
private boolean emitNullTuples = false;
private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
private boolean tupleTrackingEnforced = false;
+ private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
public Builder(String bootstrapServers, String... topics) {
this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
@@ -395,6 +400,15 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return this;
}
+ /**
+ * The time period that metrics data is bucketed into.
+ * @param metricsTimeBucketSizeInSecs time in seconds
+ */
+ public Builder<K, V> setMetricsTimeBucketSizeInSecs(int metricsTimeBucketSizeInSecs) {
+ this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;
+ return this;
+ }
+
public KafkaSpoutConfig<K, V> build() {
return new KafkaSpoutConfig<>(this);
}
@@ -533,6 +547,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
return emitNullTuples;
}
+ public int getMetricsTimeBucketSizeInSecs() {
+ return metricsTimeBucketSizeInSecs;
+ }
+
@Override
public String toString() {
return "KafkaSpoutConfig{"
@@ -545,6 +563,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
+ ", translator=" + translator
+ ", retryService=" + retryService
+ ", tupleListener=" + tupleListener
+ + ", processingGuarantee=" + processingGuarantee
+ + ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs
+ '}';
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index ec6f2a1..c9f9541 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -46,6 +46,7 @@ public class OffsetManager {
private long committedOffset;
// True if this OffsetManager has made at least one commit to Kafka
private boolean committed;
+ private long latestEmittedOffset;
/**
* Creates a new OffsetManager.
@@ -63,7 +64,8 @@ public class OffsetManager {
}
public void addToEmitMsgs(long offset) {
- this.emittedOffsets.add(offset); // O(Log N)
+ this.emittedOffsets.add(offset); // O(Log N)
+ this.latestEmittedOffset = Math.max(latestEmittedOffset, offset);
}
public int getNumUncommittedOffsets() {
@@ -215,6 +217,14 @@ public class OffsetManager {
return emittedOffsets.contains(offset);
}
+ public long getLatestEmittedOffset() {
+ return latestEmittedOffset;
+ }
+
+ public long getCommittedOffset() {
+ return committedOffset;
+ }
+
@Override
public final String toString() {
return "OffsetManager{"
@@ -222,6 +232,7 @@ public class OffsetManager {
+ ", committedOffset=" + committedOffset
+ ", emittedOffsets=" + emittedOffsets
+ ", ackedMsgs=" + ackedMsgs
+ + ", latestEmittedOffset=" + latestEmittedOffset
+ '}';
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
new file mode 100644
index 0000000..d6ed209
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metrics;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class is used to compute the partition and topic level offset metrics.
+ * <p>
+ * Partition level metrics are:
+ * topicName/partition_{number}/earliestTimeOffset //gives beginning offset of the partition
+ * topicName/partition_{number}/latestTimeOffset //gives end offset of the partition
+ * topicName/partition_{number}/latestEmittedOffset //gives latest emitted offset of the partition from the spout
+ * topicName/partition_{number}/latestCompletedOffset //gives latest committed offset of the partition from the spout
+ * topicName/partition_{number}/spoutLag // the delta between the latest Offset and latestCompletedOffset
+ * topicName/partition_{number}/recordsInPartition // total number of records in the partition
+ * </p>
+ * <p>
+ * Topic level metrics are:
+ * topicName/totalEarliestTimeOffset //gives the total beginning offset of all the associated partitions of this spout
+ * topicName/totalLatestTimeOffset //gives the total end offset of all the associated partitions of this spout
+ * topicName/totalLatestEmittedOffset //gives the total latest emitted offset of all the associated partitions of this spout
+ * topicName/totalLatestCompletedOffset //gives the total latest committed offset of all the associated partitions of this spout
+ * topicName/totalSpoutLag // total spout lag of all the associated partitions of this spout
+ * topicName/totalRecordsInPartitions //total number of records in all the associated partitions of this spout
+ * </p>
+ */
+public class KafkaOffsetMetric implements IMetric {
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+ private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+ private final Supplier<KafkaConsumer> consumerSupplier;
+
+ public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier, Supplier<KafkaConsumer> consumerSupplier) {
+ this.offsetManagerSupplier = offsetManagerSupplier;
+ this.consumerSupplier = consumerSupplier;
+ }
+
+ @Override
+ public Object getValueAndReset() {
+
+ Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
+ KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+ if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+ LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+ return null;
+ }
+
+ Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
+ Set<TopicPartition> topicPartitions = offsetManagers.keySet();
+
+ Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+ Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
+ //map to hold partition level and topic level metrics
+ Map<String, Long> result = new HashMap<>();
+
+ for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
+ TopicPartition topicPartition = entry.getKey();
+ OffsetManager offsetManager = entry.getValue();
+
+ long latestTimeOffset = endOffsets.get(topicPartition);
+ long earliestTimeOffset = beginningOffsets.get(topicPartition);
+
+ long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
+ long latestCompletedOffset = offsetManager.getCommittedOffset();
+ long spoutLag = latestTimeOffset - latestCompletedOffset;
+ long recordsInPartition = latestTimeOffset - earliestTimeOffset;
+
+ String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
+ result.put(metricPath + "/" + "spoutLag", spoutLag);
+ result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+ result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+ result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+ result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
+ result.put(metricPath + "/" + "recordsInPartition", recordsInPartition);
+
+ TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
+ if (topicMetrics == null) {
+ topicMetrics = new TopicMetrics();
+ topicMetricsMap.put(topicPartition.topic(), topicMetrics);
+ }
+
+ topicMetrics.totalSpoutLag += spoutLag;
+ topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
+ topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+ topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
+ topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
+ topicMetrics.totalRecordsInPartitions += recordsInPartition;
+ }
+
+ for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
+ String topic = e.getKey();
+ TopicMetrics topicMetrics = e.getValue();
+ result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
+ result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
+ result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
+ result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
+ result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
+ result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions);
+ }
+
+ LOG.debug("Metrics Tick: value : {}", result);
+ return result;
+ }
+
+ private class TopicMetrics {
+ long totalSpoutLag = 0;
+ long totalEarliestTimeOffset = 0;
+ long totalLatestTimeOffset = 0;
+ long totalLatestEmittedOffset = 0;
+ long totalLatestCompletedOffset = 0;
+ long totalRecordsInPartitions = 0;
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
index a3fc984..9885cf6 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -45,6 +45,7 @@ public class KafkaSpoutConfigTest {
expected.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
expected.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
assertEquals(expected, conf.getKafkaProps());
+ assertEquals(KafkaSpoutConfig.DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS, conf.getMetricsTimeBucketSizeInSecs());
}
@Test
@@ -75,4 +76,13 @@ public class KafkaSpoutConfigTest {
assertThat("Should allow users to pick a different auto offset reset policy than the one recommended for the at-least-once processing guarantee",
conf.getKafkaProps().get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), is("none"));
}
+
+ @Test
+ public void testMetricsTimeBucketSizeInSecs() {
+ KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic")
+ .setMetricsTimeBucketSizeInSecs(100)
+ .build();
+
+ assertEquals(100, conf.getMetricsTimeBucketSizeInSecs());
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 1470332..7842349 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -47,6 +47,7 @@ import org.apache.storm.utils.Time;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
+import static org.junit.Assert.assertEquals;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -344,4 +345,36 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
spout.nextTuple();
verify(collectorMock).emit(anyString(), anyList(), any(KafkaSpoutMessageId.class));
}
+
+    @Test
+    public void testOffsetMetrics() throws Exception {
+        final int messageCount = 10;
+        // All topic level metric keys share this prefix.
+        final String topicPrefix = SingleTopicKafkaSpoutConfiguration.TOPIC + "/";
+        prepareSpout(messageCount);
+
+        // Snapshot taken before any tuple has been emitted.
+        Map<String, Long> metricsBeforeEmit = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertEquals(0, metricsBeforeEmit.get(topicPrefix + "totalEarliestTimeOffset").longValue());
+        // the offset of the last available message + 1.
+        assertEquals(10, metricsBeforeEmit.get(topicPrefix + "totalLatestTimeOffset").longValue());
+        assertEquals(10, metricsBeforeEmit.get(topicPrefix + "totalRecordsInPartitions").longValue());
+        assertEquals(0, metricsBeforeEmit.get(topicPrefix + "totalLatestEmittedOffset").longValue());
+        assertEquals(0, metricsBeforeEmit.get(topicPrefix + "totalLatestCompletedOffset").longValue());
+        //totalSpoutLag = totalLatestTimeOffset-totalLatestCompletedOffset
+        assertEquals(10, metricsBeforeEmit.get(topicPrefix + "totalSpoutLag").longValue());
+
+        //Emit all messages and check that they are emitted. Ack the messages too
+        for (int offset = 0; offset < messageCount; offset++) {
+            nextTuple_verifyEmitted_ack_resetCollector(offset);
+        }
+
+        commitAndVerifyAllMessagesCommitted(messageCount);
+
+        // Snapshot taken after every message has been emitted, acked and committed.
+        Map<String, Long> metricsAfterCommit = (Map<String, Long>) spout.getKafkaOffsetMetric().getValueAndReset();
+        assertEquals(0, metricsAfterCommit.get(topicPrefix + "totalEarliestTimeOffset").longValue());
+        assertEquals(10, metricsAfterCommit.get(topicPrefix + "totalLatestTimeOffset").longValue());
+        //latest offset
+        assertEquals(9, metricsAfterCommit.get(topicPrefix + "totalLatestEmittedOffset").longValue());
+        // offset where processing will resume upon spout restart
+        assertEquals(10, metricsAfterCommit.get(topicPrefix + "totalLatestCompletedOffset").longValue());
+        assertEquals(0, metricsAfterCommit.get(topicPrefix + "totalSpoutLag").longValue());
+    }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/9b2b2dbd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 543c0d0..65b1cca 100644
--- a/pom.xml
+++ b/pom.xml
@@ -304,7 +304,7 @@
<storm.kafka.artifact.id>kafka_2.10</storm.kafka.artifact.id>
<!-- kafka version used by new storm-kafka-client spout code -->
- <storm.kafka.client.version>0.10.0.0</storm.kafka.client.version>
+ <storm.kafka.client.version>0.10.1.0</storm.kafka.client.version>
<!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
[2/2] storm git commit: Merge branch 'kafka-metrics-apache-master' of
https://github.com/omkreddy/storm into asfgit-master
Posted by sr...@apache.org.
Merge branch 'kafka-metrics-apache-master' of https://github.com/omkreddy/storm into asfgit-master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69e596ac
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69e596ac
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69e596ac
Branch: refs/heads/master
Commit: 69e596ac60ac152c854c238e98783c57e432d86c
Parents: d644e29 9b2b2db
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Jan 3 19:02:11 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jan 3 19:02:11 2018 +0100
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 27 ++++
.../storm/kafka/spout/KafkaSpoutConfig.java | 22 ++-
.../kafka/spout/internal/OffsetManager.java | 13 +-
.../kafka/spout/metrics/KafkaOffsetMetric.java | 141 +++++++++++++++++++
.../storm/kafka/spout/KafkaSpoutConfigTest.java | 10 ++
.../kafka/spout/KafkaSpoutSingleTopicTest.java | 33 +++++
pom.xml | 2 +-
7 files changed, 245 insertions(+), 3 deletions(-)
----------------------------------------------------------------------