You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/03/15 20:39:38 UTC

[1/3] storm git commit: STORM-971: Metric for messages lost due to kafka retention

Repository: storm
Updated Branches:
  refs/heads/master 3b6813838 -> 500ef20d5


STORM-971: Metric for messages lost due to kafka retention


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c761c5d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c761c5d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c761c5d

Branch: refs/heads/master
Commit: 8c761c5d6d01cdc50db27f144ca361067ffb3ba7
Parents: 6390d18
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Sun Mar 13 00:37:14 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Sun Mar 13 00:37:14 2016 +0530

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/PartitionManager.java    | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8c761c5d/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 9d78fdc..5c8fda8 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -44,6 +44,8 @@ public class PartitionManager {
     private final ReducedMetric _fetchAPILatencyMean;
     private final CountMetric _fetchAPICallCount;
     private final CountMetric _fetchAPIMessageCount;
+    // Count of messages which could not be emitted or retried because they were deleted from kafka
+    private final CountMetric _lostMessageCount;
     Long _emittedToOffset;
     // _pending key = Kafka offset, value = time at which the message was first submitted to the topology
     private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
@@ -117,6 +119,7 @@ public class PartitionManager {
         _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
         _fetchAPICallCount = new CountMetric();
         _fetchAPIMessageCount = new CountMetric();
+        _lostMessageCount = new CountMetric();
     }
 
     public Map getMetricsDataMap() {
@@ -125,6 +128,7 @@ public class PartitionManager {
         ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
         ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
         ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
+        ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset());
         return ret;
     }
 
@@ -185,7 +189,7 @@ public class PartitionManager {
             msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
         } catch (TopicOffsetOutOfRangeException e) {
             offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
-            // fetch failed, so don't update the metrics
+            // fetch failed, so don't update the fetch metrics
             
             //fix bug [STORM-643] : remove outdated failed offsets
             if (!processingNewTuples) {
@@ -194,11 +198,17 @@ public class PartitionManager {
                 // offset, since they are anyway not there.
                 // These calls to broker API will be then saved.
                 Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset);
+
+                // Omitted messages have not been acked and may be lost
+                if (null != omitted) {
+                    _lostMessageCount.incrBy(omitted.size());
+                }
                 
                 LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
             }
 
             if (offset > _emittedToOffset) {
+                _lostMessageCount.incrBy(offset - _emittedToOffset);
                 _emittedToOffset = offset;
                 LOG.warn("{} Using new offset: {}", _partition.partition, _emittedToOffset);
             }


[3/3] storm git commit: add STORM-971 to changelog

Posted by pt...@apache.org.
add STORM-971 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/500ef20d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/500ef20d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/500ef20d

Branch: refs/heads/master
Commit: 500ef20d5a07ad56a22817af70810608046a3b42
Parents: b048ea2
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Mar 15 15:37:40 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Mar 15 15:37:40 2016 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/500ef20d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a7eb1c3..f6fdb76 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -57,6 +57,7 @@
  * STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
 
 ## 1.0.0
+ * STORM-971: Metric for messages lost due to kafka retention
  * STORM-1483: add storm-mongodb connector
  * STORM-1608: Fix stateful topology acking behavior
  * STORM-1609: Netty Client is not best effort delivery on failed Connection


[2/3] storm git commit: Merge branch 'lostcount' of github.com:abhishekagarwal87/storm

Posted by pt...@apache.org.
Merge branch 'lostcount' of github.com:abhishekagarwal87/storm


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b048ea20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b048ea20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b048ea20

Branch: refs/heads/master
Commit: b048ea20bdfe5b1d72c68b43662279ee89990997
Parents: 3b68138 8c761c5
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Mar 15 15:36:34 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Mar 15 15:36:34 2016 -0400

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/PartitionManager.java    | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------