You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/10/07 21:49:22 UTC
git commit: SAMZA-28; fixing bug in blocking envelope map metrics.
Updated Branches:
refs/heads/master 79a4376d0 -> ef95569f8
SAMZA-28; fixing bug in blocking envelope map metrics.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ef95569f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ef95569f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ef95569f
Branch: refs/heads/master
Commit: ef95569f8cec240cb0214012e5f6b15b8f4d4f6f
Parents: 79a4376
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Mon Oct 7 12:49:12 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Oct 7 12:49:12 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/samza/metrics/Gauge.java | 4 ++
.../apache/samza/util/BlockingEnvelopeMap.java | 60 +++++++++++---------
2 files changed, 37 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ef95569f/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
index 14db2d3..3335c15 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
@@ -30,6 +30,10 @@ public class Gauge<T> implements Metric {
this.ref = new AtomicReference<T>(value);
}
+ public boolean compareAndSet(T expected, T n) {
+ return ref.compareAndSet(expected, n);
+ }
+
public T set(T n) {
return ref.getAndSet(n);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ef95569f/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index c1dd279..d4db126 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -22,6 +22,7 @@ package org.apache.samza.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -75,7 +76,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
}
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
- this.metrics = new BlockingEnvelopeMapMetrics(metricsRegistry);
+ this.metrics = new BlockingEnvelopeMapMetrics(this.getClass().getName(), metricsRegistry);
this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
this.clock = clock;
@@ -116,8 +117,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
}
}
- metrics.decBufferedMessageCount(systemStreamPartition, systemStreamPartitionMessages.size());
-
// Now block if blocking is allowed and we have no messages.
if (systemStreamPartitionMessages.size() == 0) {
// How long we can legally block (if timeout > 0)
@@ -130,7 +129,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
if (envelope != null) {
systemStreamPartitionMessages.add(envelope);
- metrics.decBufferedMessageCount(systemStreamPartition, 1);
}
}
} else if (timeout > 0 && timeRemaining > 0) {
@@ -139,7 +137,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
if (envelope != null) {
systemStreamPartitionMessages.add(envelope);
- metrics.decBufferedMessageCount(systemStreamPartition, 1);
}
}
}
@@ -152,7 +149,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
bufferedMessages.get(systemStreamPartition).add(envelope);
- metrics.incBufferedMessageCount(systemStreamPartition, 1);
}
protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) {
@@ -161,8 +157,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
for (IncomingMessageEnvelope envelope : envelopes) {
queue.add(envelope);
}
-
- metrics.incBufferedMessageCount(systemStreamPartition, envelopes.size());
}
public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
@@ -186,41 +180,32 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
return getNumMessagesInQueue(systemStreamPartition) == 0 && isAtHead != null && isAtHead.equals(true);
}
- public static final class BlockingEnvelopeMapMetrics {
- private static final String GROUP = "samza.consumers";
-
+ public class BlockingEnvelopeMapMetrics {
+ private final String group;
private final MetricsRegistry metricsRegistry;
- private final ConcurrentHashMap<SystemStreamPartition, Counter> bufferedMessageCountMap;
private final ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>> noMoreMessageGaugeMap;
private final ConcurrentHashMap<SystemStreamPartition, Counter> pollCountMap;
private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap;
private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap;
private final Counter pollCount;
- public BlockingEnvelopeMapMetrics(MetricsRegistry metricsRegistry) {
+ public BlockingEnvelopeMapMetrics(String group, MetricsRegistry metricsRegistry) {
+ this.group = group;
this.metricsRegistry = metricsRegistry;
- this.bufferedMessageCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
this.noMoreMessageGaugeMap = new ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>();
this.pollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
this.blockingPollTimeoutCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
- this.pollCount = metricsRegistry.newCounter(GROUP, "PollCount");
+ this.pollCount = metricsRegistry.newCounter(group, "poll-count");
}
public void initMetrics(SystemStreamPartition systemStreamPartition) {
- this.bufferedMessageCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "BufferedMessageCount-" + systemStreamPartition));
- this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, metricsRegistry.<Boolean> newGauge(GROUP, "NoMoreMessages-" + systemStreamPartition, false));
- this.pollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "PollCount-" + systemStreamPartition));
- this.blockingPollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "BlockingPollCount-" + systemStreamPartition));
- this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "BlockingPollTimeoutCount-" + systemStreamPartition));
- }
+ this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, metricsRegistry.<Boolean> newGauge(group, "no-more-messages-" + systemStreamPartition, false));
+ this.pollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "poll-count-" + systemStreamPartition));
+ this.blockingPollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-count-" + systemStreamPartition));
+ this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));
- public void incBufferedMessageCount(SystemStreamPartition systemStreamPartition, int count) {
- this.bufferedMessageCountMap.get(systemStreamPartition).inc(count);
- }
-
- public void decBufferedMessageCount(SystemStreamPartition systemStreamPartition, int count) {
- this.bufferedMessageCountMap.get(systemStreamPartition).dec(count);
+ metricsRegistry.<Integer> newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
}
public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
@@ -243,4 +228,25 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
this.pollCount.inc();
}
}
+
+ public class BufferGauge extends Gauge<Integer> {
+ private final SystemStreamPartition systemStreamPartition;
+
+ public BufferGauge(SystemStreamPartition systemStreamPartition, String name) {
+ super(name, 0);
+
+ this.systemStreamPartition = systemStreamPartition;
+ }
+
+ @Override
+ public Integer getValue() {
+ Queue<IncomingMessageEnvelope> envelopes = bufferedMessages.get(systemStreamPartition);
+
+ if (envelopes == null) {
+ return 0;
+ }
+
+ return envelopes.size();
+ }
+ }
}