You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/05/10 23:50:59 UTC

samza git commit: SAMZA-1283: Expose the buffered-message-size metric

Repository: samza
Updated Branches:
  refs/heads/master 36b2f23bb -> 71930e22f


SAMZA-1283: Expose the buffered-message-size metric

Regardless of whether the size limit is enabled for the consumer buffer, this metric shows the current buffer size in bytes and makes it easier to configure the size limit.

Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>

Reviewers: Jagadish Venkatraman <vj...@gmail.com>

Closes #184 from xinyuiscool/SAMZA-1283


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/71930e22
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/71930e22
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/71930e22

Branch: refs/heads/master
Commit: 71930e22fbb48415d8d9a11ef7cc3478dc004bf8
Parents: 36b2f23
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Wed May 10 16:50:49 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed May 10 16:50:49 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/util/BlockingEnvelopeMap.java  | 20 +++++---------------
 .../samza/util/TestBlockingEnvelopeMap.java     |  2 +-
 .../system/kafka/KafkaSystemConsumer.scala      |  3 +--
 3 files changed, 7 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/71930e22/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 8238d2e..0205a44 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -68,7 +68,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize;  // size in bytes per SystemStreamPartition
   private final Map<SystemStreamPartition, Boolean> noMoreMessage;
   private final Clock clock;
-  protected final boolean fetchLimitByBytesEnabled;
 
   public BlockingEnvelopeMap() {
     this(new NoOpMetricsRegistry());
@@ -83,17 +82,15 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   }
 
   public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
-    this(metricsRegistry, clock, null, false);
+    this(metricsRegistry, clock, null);
   }
 
-  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName, boolean fetchLimitByBytesEnabled) {
+  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
     metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName;
     this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
     this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
     this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
     this.clock = clock;
-    this.fetchLimitByBytesEnabled = fetchLimitByBytesEnabled;
-    // Created when size is disabled for code simplification, and as the overhead is negligible.
     this.bufferedMessagesSize = new ConcurrentHashMap<SystemStreamPartition, AtomicLong>();
   }
 
@@ -103,7 +100,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
     metrics.initMetrics(systemStreamPartition);
     bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
-    // Created when size is disabled for code simplification, and the overhead is negligible.
     bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
   }
 
@@ -155,9 +151,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
 
       if (outgoingList.size() > 0) {
         messagesToReturn.put(systemStreamPartition, outgoingList);
-        if (fetchLimitByBytesEnabled) {
-          subtractSizeOnQDrain(systemStreamPartition, outgoingList);
-        }
+        subtractSizeOnQDrain(systemStreamPartition, outgoingList);
       }
     }
 
@@ -183,9 +177,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
    */
   protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
     bufferedMessages.get(systemStreamPartition).put(envelope);
-    if (fetchLimitByBytesEnabled) {
-      bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
-    }
+    bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
   }
 
   /**
@@ -262,9 +254,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
       this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));
 
       metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
-      if (fetchLimitByBytesEnabled) {
-        metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
-      }
+      metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
     }
 
     public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {

http://git-wip-us.apache.org/repos/asf/samza/blob/71930e22/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index afdae16..f5394c0 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -213,7 +213,7 @@ public class TestBlockingEnvelopeMap {
     }
 
     public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) {
-      super(new NoOpMetricsRegistry(), CLOCK, null, fetchLimitByBytesEnabled);
+      super(new NoOpMetricsRegistry(), CLOCK, null);
       injectedQueue = new MockQueue();
     }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/71930e22/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index f25bb68..aa13fd8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -127,8 +127,7 @@ private[kafka] class KafkaSystemConsumer(
     new Clock {
       def currentTimeMillis = clock()
     },
-    classOf[KafkaSystemConsumerMetrics].getName,
-    fetchLimitByBytesEnabled) with Toss with Logging {
+    classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
 
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()