You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain-text copy.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/10/07 21:49:22 UTC

git commit: SAMZA-28; fixing bug in blocking envelope map metrics.

Updated Branches:
  refs/heads/master 79a4376d0 -> ef95569f8


SAMZA-28; fixing bug in blocking envelope map metrics.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/ef95569f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/ef95569f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/ef95569f

Branch: refs/heads/master
Commit: ef95569f8cec240cb0214012e5f6b15b8f4d4f6f
Parents: 79a4376
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Mon Oct 7 12:49:12 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Oct 7 12:49:12 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/metrics/Gauge.java    |  4 ++
 .../apache/samza/util/BlockingEnvelopeMap.java  | 60 +++++++++++---------
 2 files changed, 37 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ef95569f/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
index 14db2d3..3335c15 100644
--- a/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
+++ b/samza-api/src/main/java/org/apache/samza/metrics/Gauge.java
@@ -30,6 +30,10 @@ public class Gauge<T> implements Metric {
     this.ref = new AtomicReference<T>(value);
   }
 
+  public boolean compareAndSet(T expected, T n) {
+    return ref.compareAndSet(expected, n);
+  }
+
   public T set(T n) {
     return ref.getAndSet(n);
   }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/ef95569f/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index c1dd279..d4db126 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -22,6 +22,7 @@ package org.apache.samza.util;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -75,7 +76,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   }
 
   public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
-    this.metrics = new BlockingEnvelopeMapMetrics(metricsRegistry);
+    this.metrics = new BlockingEnvelopeMapMetrics(this.getClass().getName(), metricsRegistry);
     this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
     this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
     this.clock = clock;
@@ -116,8 +117,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
         }
       }
 
-      metrics.decBufferedMessageCount(systemStreamPartition, systemStreamPartitionMessages.size());
-
       // Now block if blocking is allowed and we have no messages.
       if (systemStreamPartitionMessages.size() == 0) {
         // How long we can legally block (if timeout > 0)
@@ -130,7 +129,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
 
             if (envelope != null) {
               systemStreamPartitionMessages.add(envelope);
-              metrics.decBufferedMessageCount(systemStreamPartition, 1);
             }
           }
         } else if (timeout > 0 && timeRemaining > 0) {
@@ -139,7 +137,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
 
           if (envelope != null) {
             systemStreamPartitionMessages.add(envelope);
-            metrics.decBufferedMessageCount(systemStreamPartition, 1);
           }
         }
       }
@@ -152,7 +149,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
 
   protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
     bufferedMessages.get(systemStreamPartition).add(envelope);
-    metrics.incBufferedMessageCount(systemStreamPartition, 1);
   }
 
   protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) {
@@ -161,8 +157,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     for (IncomingMessageEnvelope envelope : envelopes) {
       queue.add(envelope);
     }
-
-    metrics.incBufferedMessageCount(systemStreamPartition, envelopes.size());
   }
 
   public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
@@ -186,41 +180,32 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     return getNumMessagesInQueue(systemStreamPartition) == 0 && isAtHead != null && isAtHead.equals(true);
   }
 
-  public static final class BlockingEnvelopeMapMetrics {
-    private static final String GROUP = "samza.consumers";
-
+  public class BlockingEnvelopeMapMetrics {
+    private final String group;
     private final MetricsRegistry metricsRegistry;
-    private final ConcurrentHashMap<SystemStreamPartition, Counter> bufferedMessageCountMap;
     private final ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>> noMoreMessageGaugeMap;
     private final ConcurrentHashMap<SystemStreamPartition, Counter> pollCountMap;
     private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap;
     private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap;
     private final Counter pollCount;
 
-    public BlockingEnvelopeMapMetrics(MetricsRegistry metricsRegistry) {
+    public BlockingEnvelopeMapMetrics(String group, MetricsRegistry metricsRegistry) {
+      this.group = group;
       this.metricsRegistry = metricsRegistry;
-      this.bufferedMessageCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.noMoreMessageGaugeMap = new ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>();
       this.pollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.blockingPollTimeoutCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
-      this.pollCount = metricsRegistry.newCounter(GROUP, "PollCount");
+      this.pollCount = metricsRegistry.newCounter(group, "poll-count");
     }
 
     public void initMetrics(SystemStreamPartition systemStreamPartition) {
-      this.bufferedMessageCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "BufferedMessageCount-" + systemStreamPartition));
-      this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, metricsRegistry.<Boolean> newGauge(GROUP, "NoMoreMessages-" + systemStreamPartition, false));
-      this.pollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "PollCount-" + systemStreamPartition));
-      this.blockingPollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "BlockingPollCount-" + systemStreamPartition));
-      this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(GROUP, "BlockingPollTimeoutCount-" + systemStreamPartition));
-    }
+      this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, metricsRegistry.<Boolean> newGauge(group, "no-more-messages-" + systemStreamPartition, false));
+      this.pollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "poll-count-" + systemStreamPartition));
+      this.blockingPollCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-count-" + systemStreamPartition));
+      this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));
 
-    public void incBufferedMessageCount(SystemStreamPartition systemStreamPartition, int count) {
-      this.bufferedMessageCountMap.get(systemStreamPartition).inc(count);
-    }
-
-    public void decBufferedMessageCount(SystemStreamPartition systemStreamPartition, int count) {
-      this.bufferedMessageCountMap.get(systemStreamPartition).dec(count);
+      metricsRegistry.<Integer> newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
     }
 
     public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
@@ -243,4 +228,25 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
       this.pollCount.inc();
     }
   }
+
+  public class BufferGauge extends Gauge<Integer> {
+    private final SystemStreamPartition systemStreamPartition;
+
+    public BufferGauge(SystemStreamPartition systemStreamPartition, String name) {
+      super(name, 0);
+
+      this.systemStreamPartition = systemStreamPartition;
+    }
+
+    @Override
+    public Integer getValue() {
+      Queue<IncomingMessageEnvelope> envelopes = bufferedMessages.get(systemStreamPartition);
+
+      if (envelopes == null) {
+        return 0;
+      }
+
+      return envelopes.size();
+    }
+  }
 }