You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/05/10 23:50:59 UTC
samza git commit: SAMZA-1283: Expose the buffered-message-size metric
Repository: samza
Updated Branches:
refs/heads/master 36b2f23bb -> 71930e22f
SAMZA-1283: Expose the buffered-message-size metric
Regardless of whether the size limit is enabled for the consumer buffer, this metric shows the current buffer size and makes configuring the size limit easier.
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Reviewers: Jagadish Venkatraman <vj...@gmail.com>
Closes #184 from xinyuiscool/SAMZA-1283
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/71930e22
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/71930e22
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/71930e22
Branch: refs/heads/master
Commit: 71930e22fbb48415d8d9a11ef7cc3478dc004bf8
Parents: 36b2f23
Author: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Authored: Wed May 10 16:50:49 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed May 10 16:50:49 2017 -0700
----------------------------------------------------------------------
.../apache/samza/util/BlockingEnvelopeMap.java | 20 +++++---------------
.../samza/util/TestBlockingEnvelopeMap.java | 2 +-
.../system/kafka/KafkaSystemConsumer.scala | 3 +--
3 files changed, 7 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/71930e22/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 8238d2e..0205a44 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -68,7 +68,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize; // size in bytes per SystemStreamPartition
private final Map<SystemStreamPartition, Boolean> noMoreMessage;
private final Clock clock;
- protected final boolean fetchLimitByBytesEnabled;
public BlockingEnvelopeMap() {
this(new NoOpMetricsRegistry());
@@ -83,17 +82,15 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
}
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
- this(metricsRegistry, clock, null, false);
+ this(metricsRegistry, clock, null);
}
- public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName, boolean fetchLimitByBytesEnabled) {
+ public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName;
this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
this.clock = clock;
- this.fetchLimitByBytesEnabled = fetchLimitByBytesEnabled;
- // Created when size is disabled for code simplification, and as the overhead is negligible.
this.bufferedMessagesSize = new ConcurrentHashMap<SystemStreamPartition, AtomicLong>();
}
@@ -103,7 +100,6 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
public void register(SystemStreamPartition systemStreamPartition, String offset) {
metrics.initMetrics(systemStreamPartition);
bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
- // Created when size is disabled for code simplification, and the overhead is negligible.
bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
}
@@ -155,9 +151,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
if (outgoingList.size() > 0) {
messagesToReturn.put(systemStreamPartition, outgoingList);
- if (fetchLimitByBytesEnabled) {
- subtractSizeOnQDrain(systemStreamPartition, outgoingList);
- }
+ subtractSizeOnQDrain(systemStreamPartition, outgoingList);
}
}
@@ -183,9 +177,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
*/
protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
bufferedMessages.get(systemStreamPartition).put(envelope);
- if (fetchLimitByBytesEnabled) {
- bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
- }
+ bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
}
/**
@@ -262,9 +254,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));
metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
- if (fetchLimitByBytesEnabled) {
- metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
- }
+ metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
}
public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
http://git-wip-us.apache.org/repos/asf/samza/blob/71930e22/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index afdae16..f5394c0 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -213,7 +213,7 @@ public class TestBlockingEnvelopeMap {
}
public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) {
- super(new NoOpMetricsRegistry(), CLOCK, null, fetchLimitByBytesEnabled);
+ super(new NoOpMetricsRegistry(), CLOCK, null);
injectedQueue = new MockQueue();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/71930e22/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index f25bb68..aa13fd8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -127,8 +127,7 @@ private[kafka] class KafkaSystemConsumer(
new Clock {
def currentTimeMillis = clock()
},
- classOf[KafkaSystemConsumerMetrics].getName,
- fetchLimitByBytesEnabled) with Toss with Logging {
+ classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
type HostPort = (String, Int)
val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()