Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:51 UTC
[35/50] [abbrv] samza git commit: SAMZA-775: add consumer size-based fetch threshold
SAMZA-775: add consumer size-based fetch threshold
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72a558ce
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72a558ce
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72a558ce
Branch: refs/heads/samza-sql
Commit: 72a558cebe50d98b5ef6a566b6cd8607b7032d96
Parents: acd340e
Author: Monal Daxini <md...@gmail.com>
Authored: Tue Nov 24 15:50:35 2015 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Nov 24 15:50:35 2015 -0800
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 25 +++++++
.../samza/system/IncomingMessageEnvelope.java | 19 +++++
.../apache/samza/util/BlockingEnvelopeMap.java | 66 +++++++++++++++--
.../samza/util/TestBlockingEnvelopeMap.java | 46 ++++++++++--
.../org/apache/samza/config/KafkaConfig.scala | 12 ++++
.../system/kafka/KafkaSystemConsumer.scala | 76 +++++++++++++++++---
.../samza/system/kafka/KafkaSystemFactory.scala | 3 +
.../apache/samza/config/TestKafkaConfig.scala | 10 +++
.../system/kafka/TestKafkaSystemConsumer.scala | 70 ++++++++++++++++++
9 files changed, 308 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 09f2b6f..96fdcc0 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -882,6 +882,31 @@
</tr>
<tr>
+ <td class="property" id="systems-samza-fetch-threshold-bytes">systems.<span class="system">system-name</span>.<br>samza.fetch.threshold.bytes</td>
+ <td class="default">-1</td>
+ <td class="description">
+ When consuming streams from Kafka, a Samza container maintains an in-memory buffer
+ for incoming messages in order to increase throughput (the stream task can continue
+ processing buffered messages while new messages are fetched from Kafka). This
+        parameter sets, in bytes, the total size of messages we aim to buffer across all stream
+        partitions consumed by a container, i.e. how many bytes to use for buffered prefetched
+        messages for the job as a whole. The byte budget for a single system/stream/partition is computed from this value.
+        Because whole messages are fetched, this byte limit is a soft one: the actual usage can be the per-partition
+        limit plus the size of the largest message in the partition for a given stream. If the value of this property is > 0,
+        it takes precedence over systems.<span class="system">system-name</span>.samza.fetch.threshold.<br>
+        For example, if fetchThresholdBytes is set to 100000 bytes and 50 SystemStreamPartitions are registered,
+        the per-partition threshold is (100000 / 2) / 50 = 1000 bytes (the value is halved because messages are
+        buffered twice, once in the KafkaConsumer and once in SystemConsumers). As this is a soft limit, the actual
+        usage can be 1000 bytes plus the size of the largest message. As soon as a SystemStreamPartition's buffered
+        bytes drop below 1000, a fetch request will be executed to get more data for it.
+
+        Increasing this parameter will decrease the latency between when a queue is drained of messages and when new
+        messages are enqueued, but also leads to an increase in memory usage since more messages will be held in memory.
+
+        The default value is -1, which means this threshold is not used.
+ </td>
+ </tr>
+
+ <tr>
<td class="property" id="task-checkpoint-system">task.checkpoint.system</td>
<td class="default"></td>
<td class="description">
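
As an aside, here is a minimal sketch of the arithmetic the description above walks through
(the figures are the illustrative ones from the table entry, not values read from a real config):

    // Per-partition byte threshold, as documented for samza.fetch.threshold.bytes.
    val fetchThresholdBytes = 100000L   // systems.<system-name>.samza.fetch.threshold.bytes
    val registeredPartitions = 50       // SystemStreamPartitions registered by the container
    // Halved because messages are buffered twice (KafkaConsumer and SystemConsumers),
    // then divided evenly among partitions.
    val perPartitionThresholdBytes = (fetchThresholdBytes / 2) / registeredPartitions
    // perPartitionThresholdBytes == 1000; once a partition's buffered bytes drop below
    // this soft limit, a new fetch request is issued for it.
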
http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 4b14312..cc860cf 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -28,6 +28,7 @@ public class IncomingMessageEnvelope {
private final String offset;
private final Object key;
private final Object message;
+ private final int size;
/**
* Constructs a new IncomingMessageEnvelope from specified components.
@@ -38,10 +39,24 @@ public class IncomingMessageEnvelope {
* @param message A deserialized message received from the partition offset.
*/
public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message) {
+ this(systemStreamPartition, offset, key, message, 0);
+ }
+
+ /**
+ * Constructs a new IncomingMessageEnvelope from specified components.
+ * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster
+ * from which the stream came, and the partition of the stream from which the message was received.
+ * @param offset The offset in the partition that the message was received from.
+ * @param key A deserialized key received from the partition offset.
+ * @param message A deserialized message received from the partition offset.
+ * @param size size of the message and key in bytes.
+ */
+ public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size) {
this.systemStreamPartition = systemStreamPartition;
this.offset = offset;
this.key = key;
this.message = message;
+ this.size = size;
}
public SystemStreamPartition getSystemStreamPartition() {
@@ -60,6 +75,10 @@ public class IncomingMessageEnvelope {
return message;
}
+ public int getSize() {
+ return size;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
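
A small usage sketch of the new size-aware constructor (the system, stream, and offset values
are made up for illustration; only the five-argument constructor and getSize() come from this patch):

    import org.apache.samza.Partition
    import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}

    val ssp = new SystemStreamPartition("kafka", "my-stream", new Partition(0))
    val payload = "hello".getBytes("UTF-8")
    // size is the combined key + message size in bytes, as computed by the consumer that built the envelope
    val envelope = new IncomingMessageEnvelope(ssp, "42", null, payload, payload.length)
    assert(envelope.getSize == payload.length)
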
http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 7dd99fb..8238d2e 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -37,6 +37,8 @@ import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* <p>
* BlockingEnvelopeMap is a helper class for SystemConsumer implementations.
@@ -63,17 +65,15 @@ import org.apache.samza.system.SystemStreamPartition;
public abstract class BlockingEnvelopeMap implements SystemConsumer {
private final BlockingEnvelopeMapMetrics metrics;
private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
+ private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize; // size in bytes per SystemStreamPartition
private final Map<SystemStreamPartition, Boolean> noMoreMessage;
private final Clock clock;
+ protected final boolean fetchLimitByBytesEnabled;
public BlockingEnvelopeMap() {
this(new NoOpMetricsRegistry());
}
- public BlockingEnvelopeMap(Clock clock) {
- this(new NoOpMetricsRegistry(), clock);
- }
-
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
this(metricsRegistry, new Clock() {
public long currentTimeMillis() {
@@ -83,15 +83,18 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
}
public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
- this(metricsRegistry, clock, null);
+ this(metricsRegistry, clock, null, false);
}
- public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
+ public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName, boolean fetchLimitByBytesEnabled) {
metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName;
this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
this.clock = clock;
+ this.fetchLimitByBytesEnabled = fetchLimitByBytesEnabled;
+    // Created even when size-based fetching is disabled, to keep the code simple; the overhead is negligible.
+ this.bufferedMessagesSize = new ConcurrentHashMap<SystemStreamPartition, AtomicLong>();
}
/**
@@ -100,6 +103,8 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
public void register(SystemStreamPartition systemStreamPartition, String offset) {
metrics.initMetrics(systemStreamPartition);
bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
+    // Created even when size-based fetching is disabled, to keep the code simple; the overhead is negligible.
+ bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
}
protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
@@ -150,12 +155,24 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
if (outgoingList.size() > 0) {
messagesToReturn.put(systemStreamPartition, outgoingList);
+ if (fetchLimitByBytesEnabled) {
+ subtractSizeOnQDrain(systemStreamPartition, outgoingList);
+ }
}
}
return messagesToReturn;
}
+ private void subtractSizeOnQDrain(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> outgoingList) {
+ long outgoingListBytes = 0;
+ for (IncomingMessageEnvelope envelope : outgoingList) {
+ outgoingListBytes += envelope.getSize();
+ }
+ // subtract the size of the messages dequeued.
+ bufferedMessagesSize.get(systemStreamPartition).addAndGet(-1 * outgoingListBytes);
+ }
+
/**
* Place a new {@link org.apache.samza.system.IncomingMessageEnvelope} on the
* queue for the specified {@link org.apache.samza.system.SystemStreamPartition}.
@@ -166,6 +183,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
*/
protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
bufferedMessages.get(systemStreamPartition).put(envelope);
+ if (fetchLimitByBytesEnabled) {
+ bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
+ }
}
/**
@@ -198,6 +218,16 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
}
}
+ public long getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition) {
+ AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition);
+
+ if (sizeInBytes == null) {
+      throw new NullPointerException("Attempting to get size for " + systemStreamPartition + ", but the system/stream/partition was never registered.");
+ } else {
+ return sizeInBytes.get();
+ }
+ }
+
protected Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) {
metrics.setNoMoreMessages(systemStreamPartition, isAtHead);
return noMoreMessage.put(systemStreamPartition, isAtHead);
@@ -232,6 +262,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));
metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
+ if (fetchLimitByBytesEnabled) {
+ metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
+ }
}
public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
@@ -271,4 +304,25 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
return envelopes.size();
}
}
+
+ public class BufferSizeGauge extends Gauge<Long> {
+ private final SystemStreamPartition systemStreamPartition;
+
+ public BufferSizeGauge(SystemStreamPartition systemStreamPartition, String name) {
+ super(name, 0L);
+
+ this.systemStreamPartition = systemStreamPartition;
+ }
+
+ @Override
+ public Long getValue() {
+ AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition);
+
+ if (sizeInBytes == null) {
+ return 0L;
+ }
+
+ return sizeInBytes.get();
+ }
+ }
}
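
A rough sketch of the byte accounting this class now performs when fetchLimitByBytesEnabled is
true. The subclass below is invented for illustration (it only exposes the protected put()),
and it assumes Clock and NoOpMetricsRegistry from org.apache.samza.util, as used by the class
and its test:

    import java.util.Collections
    import org.apache.samza.Partition
    import org.apache.samza.system.{IncomingMessageEnvelope, SystemStreamPartition}
    import org.apache.samza.util.{BlockingEnvelopeMap, Clock, NoOpMetricsRegistry}

    class ByteCountingConsumer extends BlockingEnvelopeMap(
        new NoOpMetricsRegistry,
        new Clock { def currentTimeMillis() = System.currentTimeMillis() },
        null,
        true /* fetchLimitByBytesEnabled */) {
      def start() {}
      def stop() {}
      // expose the protected put() for this sketch
      def add(ssp: SystemStreamPartition, env: IncomingMessageEnvelope) = put(ssp, env)
    }

    val ssp = new SystemStreamPartition("kafka", "my-stream", new Partition(0))
    val consumer = new ByteCountingConsumer
    consumer.register(ssp, "0")
    consumer.add(ssp, new IncomingMessageEnvelope(ssp, "0", null, null, 100))
    // put() added envelope.getSize to the per-partition byte counter
    assert(consumer.getMessagesSizeInQueue(ssp) == 100)
    // poll() subtracts the bytes of the envelopes it hands back
    consumer.poll(Collections.singleton(ssp), 0)
    assert(consumer.getMessagesSizeInQueue(ssp) == 0)
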
http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index d1a0a82..afdae16 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -40,7 +40,13 @@ import org.junit.Test;
public class TestBlockingEnvelopeMap {
private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0));
private static final IncomingMessageEnvelope ENVELOPE = new IncomingMessageEnvelope(SSP, null, null, null);
+ private static final IncomingMessageEnvelope ENVELOPE_WITH_SIZE = new IncomingMessageEnvelope(SSP, null, null, null, 100);
private static final Set<SystemStreamPartition> FETCH = new HashSet<SystemStreamPartition>();
+ private static final Clock CLOCK = new Clock() {
+ public long currentTimeMillis() {
+ return System.currentTimeMillis();
+ }
+ };
static {
FETCH.add(SSP);
@@ -78,6 +84,35 @@ public class TestBlockingEnvelopeMap {
envelopes = map.poll(FETCH, 0);
assertEquals(1, envelopes.size());
assertEquals(2, envelopes.get(SSP).size());
+
+ // Size info.
+ assertEquals(0, map.getMessagesSizeInQueue(SSP));
+ }
+
+ @Test
+ public void testNoSizeComputation() throws InterruptedException {
+ BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
+ map.register(SSP, "0");
+ map.put(SSP, ENVELOPE);
+ map.put(SSP, ENVELOPE);
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0);
+
+ // Size info.
+ assertEquals(0, map.getMessagesSizeInQueue(SSP));
+ }
+
+ @Test
+ public void testSizeComputation() throws InterruptedException {
+ BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(true);
+ map.register(SSP, "0");
+ map.put(SSP, ENVELOPE_WITH_SIZE);
+ map.put(SSP, ENVELOPE_WITH_SIZE);
+
+ // Size info.
+ assertEquals(200, map.getMessagesSizeInQueue(SSP));
+
+ Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0);
+ assertEquals(0, map.getMessagesSizeInQueue(SSP));
}
@Test
@@ -177,12 +212,13 @@ public class TestBlockingEnvelopeMap {
this(null);
}
+ public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) {
+ super(new NoOpMetricsRegistry(), CLOCK, null, fetchLimitByBytesEnabled);
+ injectedQueue = new MockQueue();
+ }
+
public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue) {
- this(injectedQueue, new Clock() {
- public long currentTimeMillis() {
- return System.currentTimeMillis();
- }
- });
+ this(injectedQueue, CLOCK);
}
public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue, Clock clock) {
http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index a65e8e8..1822511 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -60,6 +60,15 @@ object KafkaConfig {
val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
+ /**
+   * Defines how many bytes to use for the buffered prefetch messages for the job as a whole.
+   * The bytes for a single system/stream/partition are computed based on this.
+   * Whole messages are fetched, hence this byte limit is a soft one, and the actual usage can be
+   * the byte limit + size of the largest message in the partition for a given stream.
+   * If the value of this property is > 0 then it takes precedence over the CONSUMER_FETCH_THRESHOLD config.
+ */
+ val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
+
implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
}
@@ -70,6 +79,9 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
// custom consumer config
def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
+ def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
+ def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0
+
/**
* Returns a map of topic -> fetch.message.max.bytes value for all streams that
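
A quick sketch of how the new accessors resolve this property (the MapConfig construction
mirrors TestKafkaConfig further down; the "kafka" system name and value are illustrative):

    import org.apache.samza.config.{KafkaConfig, MapConfig}
    import scala.collection.JavaConversions._

    val props = Map("systems.kafka.samza.fetch.threshold.bytes" -> "100000")
    val kafkaConfig = new KafkaConfig(new MapConfig(props))
    kafkaConfig.getConsumerFetchThresholdBytes("kafka")        // Some("100000")
    kafkaConfig.isConsumerFetchThresholdBytesEnabled("kafka")  // true, so the byte threshold takes precedence
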
http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index c948d64..b373753 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -21,6 +21,7 @@ package org.apache.samza.system.kafka
import kafka.common.TopicAndPartition
import org.apache.samza.util.Logging
+import kafka.message.Message
import kafka.message.MessageAndOffset
import org.apache.samza.Partition
import kafka.utils.Utils
@@ -32,10 +33,7 @@ import org.apache.samza.util.BlockingEnvelopeMap
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.IncomingMessageEnvelope
import kafka.consumer.ConsumerConfig
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.SamzaException
import org.apache.samza.util.TopicMetadataStore
-import org.apache.samza.util.ExponentialSleepStrategy
import kafka.api.TopicMetadata
import org.apache.samza.util.ExponentialSleepStrategy
import java.util.concurrent.ConcurrentHashMap
@@ -43,6 +41,13 @@ import scala.collection.JavaConversions._
import org.apache.samza.system.SystemAdmin
object KafkaSystemConsumer {
+
+ // Approximate additional shallow heap overhead per message in addition to the raw bytes
+  // received from Kafka: 4 + 64 + 4 + 4 + 4 = 80 bytes overhead.
+  // As this overhead is a moving target, and not very large
+  // compared to the message size, it is being ignored in the computation for now.
+ val MESSAGE_SIZE_OVERHEAD = 4 + 64 + 4 + 4 + 4;
+
def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
val topic = systemStreamPartition.getStream
val partitionId = systemStreamPartition.getPartition.getPartitionId
@@ -80,22 +85,65 @@ private[kafka] class KafkaSystemConsumer(
* to an increase in memory usage since more messages will be held in memory.
*/
fetchThreshold: Int = 50000,
+ /**
+ * Defines a low water mark for how many bytes we buffer before we start
+ * executing fetch requests against brokers to get more messages. This
+ * value is divided by 2 because the messages are buffered twice, once in
+ * KafkaConsumer and then in SystemConsumers. This value
+ * is divided equally among all registered SystemStreamPartitions.
+   * However, this is a soft limit per partition: the
+   * bytes are accounted at message boundaries, so the actual usage can be
+   * the per-partition threshold + size of the largest message in the partition for a given stream.
+   * The counted bytes are the size of the byte buffer in Message, so the
+   * object overhead is not taken into consideration. In this codebase
+   * it seems to be quite small. Hence, even for 500000 messages this is around 4MB x 2 = 8MB,
+   * which is negligible.
+ *
+ * For example,
+ * if fetchThresholdBytes is set to 100000 bytes, and there are 50
+ * SystemStreamPartitions registered, then the per-partition threshold is
+ * (100000 / 2) / 50 = 1000 bytes.
+ * As this is a soft limit, the actual usage can be 1000 bytes + size of max message.
+ * As soon as a SystemStreamPartition's buffered messages bytes drops
+ * below 1000, a fetch request will be executed to get more data for it.
+ *
+ * Increasing this parameter will decrease the latency between when a queue
+ * is drained of messages and when new messages are enqueued, but also leads
+ * to an increase in memory usage since more messages will be held in memory.
+ *
+ * The default value is -1, which means this is not used. When the value
+   * is > 0, the count-based fetchThreshold is ignored.
+ */
+ fetchThresholdBytes: Long = -1,
+ /**
+   * True if fetchThresholdBytes > 0, false otherwise.
+ */
+ fetchLimitByBytesEnabled: Boolean = false,
offsetGetter: GetOffset = new GetOffset("fail"),
deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
- clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metrics.registry, new Clock {
- def currentTimeMillis = clock()
-}, classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
+ clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(
+ metrics.registry,
+ new Clock {
+ def currentTimeMillis = clock()
+ },
+ classOf[KafkaSystemConsumerMetrics].getName,
+ fetchLimitByBytesEnabled) with Toss with Logging {
type HostPort = (String, Int)
val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]()
var perPartitionFetchThreshold = fetchThreshold
+ var perPartitionFetchThresholdBytes = 0L
def start() {
if (topicPartitionsAndOffsets.size > 0) {
perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size
+ // messages get double buffered, hence divide by 2
+ if(fetchLimitByBytesEnabled) {
+ perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size
+ }
}
refreshBrokers
@@ -202,7 +250,15 @@ private[kafka] class KafkaSystemConsumer(
}
def needsMoreMessages(tp: TopicAndPartition) = {
- getNumMessagesInQueue(toSystemStreamPartition(tp)) <= perPartitionFetchThreshold
+ if(fetchLimitByBytesEnabled) {
+ getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes
+ } else {
+ getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold
+ }
+ }
+
+ def getMessageSize(message: Message): Integer = {
+ message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD
}
def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
@@ -222,7 +278,11 @@ private[kafka] class KafkaSystemConsumer(
null
}
- put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
+    if (fetchLimitByBytesEnabled) {
+ put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message)))
+ } else {
+ put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
+ }
setIsAtHead(systemStreamPartition, isAtHead)
}
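
To make the size computation concrete, here is the breakdown behind the 98-byte assertion in
TestKafkaSystemConsumer further down. The 14-byte Message header figure comes from that test's
own comment; this is illustrative arithmetic, not code from the patch:

    // getMessageSize(msg) = Message.size + MESSAGE_SIZE_OVERHEAD
    val payloadBytes = 4                  // Array[Byte](5, 112, 9, 126)
    val messageHeaderBytes = 14           // Kafka 0.8 Message header for a null key
    val messageSize = payloadBytes + messageHeaderBytes              // Message.size == 18
    val envelopeSize = messageSize + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD
    // envelopeSize == 18 + 80 == 98, the value reported by getMessagesSizeInQueue
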
http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index a60cda2..b574176 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -62,6 +62,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
val autoOffsetResetDefault = consumerConfig.autoOffsetReset
val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
+ val fetchThresholdBytes = config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong
val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout)
@@ -77,6 +78,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
consumerMinSize = consumerMinSize,
consumerMaxWait = consumerMaxWait,
fetchThreshold = fetchThreshold,
+ fetchThresholdBytes = fetchThresholdBytes,
+ fetchLimitByBytesEnabled = config.isConsumerFetchThresholdBytesEnabled(systemName),
offsetGetter = offsetGetter)
}
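
For reference, a hypothetical job config fragment that would exercise this wiring (the system
name "kafka" and the threshold value are made up; only the fetch.threshold.bytes key itself
comes from this patch):

    systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
    systems.kafka.samza.fetch.threshold.bytes=100000
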
http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 85badf9..c4a83f6 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -108,6 +108,16 @@ class TestKafkaConfig {
val consumerConfig2 = kafkaConfig2.getFetchMessageMaxBytesTopics(SYSTEM_NAME)
// topic fetch size
assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024))
+
+ // default samza.fetch.threshold.bytes
+ val mapConfig3 = new MapConfig(props.toMap[String, String])
+ val kafkaConfig3 = new KafkaConfig(mapConfig3)
+ assertTrue(kafkaConfig3.getConsumerFetchThresholdBytes("kafka").isEmpty)
+
+ props.setProperty("systems.kafka.samza.fetch.threshold.bytes", "65536")
+ val mapConfig4 = new MapConfig(props.toMap[String, String])
+ val kafkaConfig4 = new KafkaConfig(mapConfig4)
+ assertEquals("65536", kafkaConfig4.getConsumerFetchThresholdBytes("kafka").get)
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 23fa939..ece0359 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -23,6 +23,10 @@ import kafka.api.TopicMetadata
import kafka.api.PartitionMetadata
import kafka.cluster.Broker
import kafka.common.TopicAndPartition
+import kafka.message.Message
+import kafka.message.MessageAndOffset
+
+import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
import org.apache.samza.util.TopicMetadataStore
@@ -34,6 +38,9 @@ import org.mockito.Matchers._
class TestKafkaSystemConsumer {
val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin])
+ private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0))
+ private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null)
+ private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100)
@Test
def testFetchThresholdShouldDivideEvenlyAmongPartitions {
@@ -114,6 +121,69 @@ class TestKafkaSystemConsumer {
assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1)))
assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2)))
}
+
+ @Test
+ def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
+ val metadataStore = new MockMetadataStore
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+ fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+ override def refreshBrokers {
+ }
+ }
+
+ for (i <- 0 until 10) {
+ consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+ }
+
+ consumer.start
+
+ assertEquals(5000, consumer.perPartitionFetchThreshold)
+ assertEquals(3000, consumer.perPartitionFetchThresholdBytes)
+ }
+
+ @Test
+ def testFetchThresholdBytes {
+ val metadataStore = new MockMetadataStore
+ val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+ fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+ override def refreshBrokers {
+ }
+ }
+
+ for (i <- 0 until 10) {
+ consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+ }
+
+ consumer.start
+
+ val msg = Array[Byte](5, 112, 9, 126)
+ val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
+    // 4 bytes payload + 14 bytes Message header + 80 bytes MESSAGE_SIZE_OVERHEAD = 98
+ consumer.sink.addMessage(new TopicAndPartition("test-stream", 0), msgAndOffset, 887354)
+
+ assertEquals(98, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+ }
+
+
+ @Test
+ def testFetchThresholdBytesDisabled {
+ val metadataStore = new MockMetadataStore
+ val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+ fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
+ override def refreshBrokers {
+ }
+ }
+
+ for (i <- 0 until 10) {
+ consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+ }
+
+ consumer.start
+
+ assertEquals(5000, consumer.perPartitionFetchThreshold)
+ assertEquals(0, consumer.perPartitionFetchThresholdBytes)
+ assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+ }
}
class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {