You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2016/03/15 20:39:38 UTC
[1/3] storm git commit: STORM-971: Metric for messages lost due to
kafka retention
Repository: storm
Updated Branches:
refs/heads/master 3b6813838 -> 500ef20d5
STORM-971: Metric for messages lost due to kafka retention
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c761c5d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c761c5d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c761c5d
Branch: refs/heads/master
Commit: 8c761c5d6d01cdc50db27f144ca361067ffb3ba7
Parents: 6390d18
Author: Abhishek Agarwal <ab...@inmobi.com>
Authored: Sun Mar 13 00:37:14 2016 +0530
Committer: Abhishek Agarwal <ab...@inmobi.com>
Committed: Sun Mar 13 00:37:14 2016 +0530
----------------------------------------------------------------------
.../jvm/org/apache/storm/kafka/PartitionManager.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8c761c5d/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 9d78fdc..5c8fda8 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -44,6 +44,8 @@ public class PartitionManager {
private final ReducedMetric _fetchAPILatencyMean;
private final CountMetric _fetchAPICallCount;
private final CountMetric _fetchAPIMessageCount;
+ // Count of messages which could not be emitted or retried because they were deleted from kafka
+ private final CountMetric _lostMessageCount;
Long _emittedToOffset;
// _pending key = Kafka offset, value = time at which the message was first submitted to the topology
private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
@@ -117,6 +119,7 @@ public class PartitionManager {
_fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
_fetchAPICallCount = new CountMetric();
_fetchAPIMessageCount = new CountMetric();
+ _lostMessageCount = new CountMetric();
}
public Map getMetricsDataMap() {
@@ -125,6 +128,7 @@ public class PartitionManager {
ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
+ ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset());
return ret;
}
@@ -185,7 +189,7 @@ public class PartitionManager {
msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
} catch (TopicOffsetOutOfRangeException e) {
offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
- // fetch failed, so don't update the metrics
+ // fetch failed, so don't update the fetch metrics
//fix bug [STORM-643] : remove outdated failed offsets
if (!processingNewTuples) {
@@ -194,11 +198,17 @@ public class PartitionManager {
// offset, since they are anyway not there.
// These calls to broker API will be then saved.
Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset);
+
+ // Omitted messages have not been acked and may be lost
+ if (null != omitted) {
+ _lostMessageCount.incrBy(omitted.size());
+ }
LOG.warn("Removing the failed offsets that are out of range: {}", omitted);
}
if (offset > _emittedToOffset) {
+ _lostMessageCount.incrBy(offset - _emittedToOffset);
_emittedToOffset = offset;
LOG.warn("{} Using new offset: {}", _partition.partition, _emittedToOffset);
}
[3/3] storm git commit: add STORM-971 to changelog
Posted by pt...@apache.org.
add STORM-971 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/500ef20d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/500ef20d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/500ef20d
Branch: refs/heads/master
Commit: 500ef20d5a07ad56a22817af70810608046a3b42
Parents: b048ea2
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Mar 15 15:37:40 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Mar 15 15:37:40 2016 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/500ef20d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index a7eb1c3..f6fdb76 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -57,6 +57,7 @@
* STORM-1521: When using Kerberos login from keytab with multiple bolts/executors ticket is not renewed in hbase bolt.
## 1.0.0
+ * STORM-971: Metric for messages lost due to kafka retention
* STORM-1483: add storm-mongodb connector
* STORM-1608: Fix stateful topology acking behavior
* STORM-1609: Netty Client is not best effort delivery on failed Connection
[2/3] storm git commit: Merge branch 'lostcount' of
github.com:abhishekagarwal87/storm
Posted by pt...@apache.org.
Merge branch 'lostcount' of github.com:abhishekagarwal87/storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b048ea20
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b048ea20
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b048ea20
Branch: refs/heads/master
Commit: b048ea20bdfe5b1d72c68b43662279ee89990997
Parents: 3b68138 8c761c5
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Mar 15 15:36:34 2016 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Mar 15 15:36:34 2016 -0400
----------------------------------------------------------------------
.../jvm/org/apache/storm/kafka/PartitionManager.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------