You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:51 UTC

[35/50] [abbrv] samza git commit: SAMZA-775: add consumer size-based fetch threshold

SAMZA-775: add consumer size-based fetch threshold


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/72a558ce
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/72a558ce
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/72a558ce

Branch: refs/heads/samza-sql
Commit: 72a558cebe50d98b5ef6a566b6cd8607b7032d96
Parents: acd340e
Author: Monal Daxini <md...@gmail.com>
Authored: Tue Nov 24 15:50:35 2015 -0800
Committer: Yi Pan (Data Infrastructure) <ni...@gmail.com>
Committed: Tue Nov 24 15:50:35 2015 -0800

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 25 +++++++
 .../samza/system/IncomingMessageEnvelope.java   | 19 +++++
 .../apache/samza/util/BlockingEnvelopeMap.java  | 66 +++++++++++++++--
 .../samza/util/TestBlockingEnvelopeMap.java     | 46 ++++++++++--
 .../org/apache/samza/config/KafkaConfig.scala   | 12 ++++
 .../system/kafka/KafkaSystemConsumer.scala      | 76 +++++++++++++++++---
 .../samza/system/kafka/KafkaSystemFactory.scala |  3 +
 .../apache/samza/config/TestKafkaConfig.scala   | 10 +++
 .../system/kafka/TestKafkaSystemConsumer.scala  | 70 ++++++++++++++++++
 9 files changed, 308 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index 09f2b6f..96fdcc0 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -882,6 +882,31 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="systems-samza-fetch-threshold-bytes">systems.<span class="system">system-name</span>.<br>samza.fetch.threshold.bytes</td>
+                    <td class="default">-1</td>
+                    <td class="description">
+                        When consuming streams from Kafka, a Samza container maintains an in-memory buffer
+                        for incoming messages in order to increase throughput (the stream task can continue
+                        processing buffered messages while new messages are fetched from Kafka). This
+                        parameter determines the total size of messages we aim to buffer across all stream
+                        partitions consumed by a container based on bytes. Defines how many bytes to use for the buffered
+                        prefetch messages for the job as a whole. The bytes for a single system/stream/partition are computed based on this.
+                        Whole messages are always fetched, hence this byte limit is a soft one, and the actual usage can be the byte
+                        limit + size of the largest message in the partition for a given stream. If the value of this property is > 0
+                        then this takes precedence over systems.<span class="system">system-name</span>.samza.fetch.threshold.<br>
+                        For example, if fetchThresholdBytes is set to 100000 bytes, and there are 50 SystemStreamPartitions registered,
+                        then the per-partition threshold is (100000 / 2) / 50 = 1000 bytes (the total is halved because messages are buffered twice). As this is a soft limit, the actual usage
+                        can be 1000 bytes + size of max message. As soon as a SystemStreamPartition's buffered messages bytes drops
+                        below 1000, a fetch request will be executed to get more data for it.
+
+                        Increasing this parameter will decrease the latency between when a queue is drained of messages and when new
+                        messages are enqueued, but also leads to an increase in memory usage since more messages will be held in memory.
+
+                        The default value is -1, which means this is not used.
+                    </td>
+                </tr>
+
+                <tr>
                     <td class="property" id="task-checkpoint-system">task.checkpoint.system</td>
                     <td class="default"></td>
                     <td class="description">

http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
index 4b14312..cc860cf 100644
--- a/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
+++ b/samza-api/src/main/java/org/apache/samza/system/IncomingMessageEnvelope.java
@@ -28,6 +28,7 @@ public class IncomingMessageEnvelope {
   private final String offset;
   private final Object key;
   private final Object message;
+  private final int size;
 
   /**
    * Constructs a new IncomingMessageEnvelope from specified components.
@@ -38,10 +39,24 @@ public class IncomingMessageEnvelope {
    * @param message A deserialized message received from the partition offset.
    */
   public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message) {
+    this(systemStreamPartition, offset, key, message, 0);
+  }
+
+  /**
+   * Constructs a new IncomingMessageEnvelope from specified components.
+   * @param systemStreamPartition The aggregate object representing the incoming stream name, the name of the cluster
+   * from which the stream came, and the partition of the stream from which the message was received.
+   * @param offset The offset in the partition that the message was received from.
+   * @param key A deserialized key received from the partition offset.
+   * @param message A deserialized message received from the partition offset.
+   * @param size size of the message and key in bytes.
+   */
+  public IncomingMessageEnvelope(SystemStreamPartition systemStreamPartition, String offset, Object key, Object message, int size) {
     this.systemStreamPartition = systemStreamPartition;
     this.offset = offset;
     this.key = key;
     this.message = message;
+    this.size = size;
   }
 
   public SystemStreamPartition getSystemStreamPartition() {
@@ -60,6 +75,10 @@ public class IncomingMessageEnvelope {
     return message;
   }
 
+  public int getSize() {
+    return size;
+  }
+
   @Override
   public int hashCode() {
     final int prime = 31;

http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 7dd99fb..8238d2e 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -37,6 +37,8 @@ import org.apache.samza.system.IncomingMessageEnvelope;
 import org.apache.samza.system.SystemConsumer;
 import org.apache.samza.system.SystemStreamPartition;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 /**
  * <p>
  * BlockingEnvelopeMap is a helper class for SystemConsumer implementations.
@@ -63,17 +65,15 @@ import org.apache.samza.system.SystemStreamPartition;
 public abstract class BlockingEnvelopeMap implements SystemConsumer {
   private final BlockingEnvelopeMapMetrics metrics;
   private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
+  private final ConcurrentHashMap<SystemStreamPartition, AtomicLong> bufferedMessagesSize;  // size in bytes per SystemStreamPartition
   private final Map<SystemStreamPartition, Boolean> noMoreMessage;
   private final Clock clock;
+  protected final boolean fetchLimitByBytesEnabled;
 
   public BlockingEnvelopeMap() {
     this(new NoOpMetricsRegistry());
   }
 
-  public BlockingEnvelopeMap(Clock clock) {
-    this(new NoOpMetricsRegistry(), clock);
-  }
-
   public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
     this(metricsRegistry, new Clock() {
       public long currentTimeMillis() {
@@ -83,15 +83,18 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   }
 
   public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
-    this(metricsRegistry, clock, null);
+    this(metricsRegistry, clock, null, false);
   }
 
-  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
+  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName, boolean fetchLimitByBytesEnabled) {
     metricsGroupName = (metricsGroupName == null) ? this.getClass().getName() : metricsGroupName;
     this.metrics = new BlockingEnvelopeMapMetrics(metricsGroupName, metricsRegistry);
     this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
     this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
     this.clock = clock;
+    this.fetchLimitByBytesEnabled = fetchLimitByBytesEnabled;
+    // Created even when the size-based limit is disabled, to keep the code simple; the overhead is negligible.
+    this.bufferedMessagesSize = new ConcurrentHashMap<SystemStreamPartition, AtomicLong>();
   }
 
   /**
@@ -100,6 +103,8 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   public void register(SystemStreamPartition systemStreamPartition, String offset) {
     metrics.initMetrics(systemStreamPartition);
     bufferedMessages.putIfAbsent(systemStreamPartition, newBlockingQueue());
+    // Created even when the size-based limit is disabled, to keep the code simple; the overhead is negligible.
+    bufferedMessagesSize.putIfAbsent(systemStreamPartition, new AtomicLong(0));
   }
 
   protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
@@ -150,12 +155,24 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
 
       if (outgoingList.size() > 0) {
         messagesToReturn.put(systemStreamPartition, outgoingList);
+        if (fetchLimitByBytesEnabled) {
+          subtractSizeOnQDrain(systemStreamPartition, outgoingList);
+        }
       }
     }
 
     return messagesToReturn;
   }
 
+  private void subtractSizeOnQDrain(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> outgoingList) {
+    long outgoingListBytes = 0;
+    for (IncomingMessageEnvelope envelope : outgoingList) {
+      outgoingListBytes += envelope.getSize();
+    }
+    // subtract the size of the messages dequeued.
+    bufferedMessagesSize.get(systemStreamPartition).addAndGet(-1 * outgoingListBytes);
+  }
+
   /**
    * Place a new {@link org.apache.samza.system.IncomingMessageEnvelope} on the
    * queue for the specified {@link org.apache.samza.system.SystemStreamPartition}.
@@ -166,6 +183,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
    */
   protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
     bufferedMessages.get(systemStreamPartition).put(envelope);
+    if (fetchLimitByBytesEnabled) {
+      bufferedMessagesSize.get(systemStreamPartition).addAndGet(envelope.getSize());
+    }
   }
 
   /**
@@ -198,6 +218,16 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     }
   }
 
+  public long getMessagesSizeInQueue(SystemStreamPartition systemStreamPartition) {
+    AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition);
+
+    if (sizeInBytes == null) {
+      throw new NullPointerException("Attempting to get size for " + systemStreamPartition + ", but the system/stream/partition was never registered. or fetch");
+    } else {
+      return sizeInBytes.get();
+    }
+  }
+
   protected Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) {
     metrics.setNoMoreMessages(systemStreamPartition, isAtHead);
     return noMoreMessage.put(systemStreamPartition, isAtHead);
@@ -232,6 +262,9 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
       this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, metricsRegistry.newCounter(group, "blocking-poll-timeout-count-" + systemStreamPartition));
 
       metricsRegistry.<Integer>newGauge(group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
+      if (fetchLimitByBytesEnabled) {
+        metricsRegistry.<Long>newGauge(group, new BufferSizeGauge(systemStreamPartition, "buffered-message-size-" + systemStreamPartition));
+      }
     }
 
     public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
@@ -271,4 +304,25 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
       return envelopes.size();
     }
   }
+
+  public class BufferSizeGauge extends Gauge<Long> {
+    private final SystemStreamPartition systemStreamPartition;
+
+    public BufferSizeGauge(SystemStreamPartition systemStreamPartition, String name) {
+      super(name, 0L);
+
+      this.systemStreamPartition = systemStreamPartition;
+    }
+
+    @Override
+    public Long getValue() {
+      AtomicLong sizeInBytes = bufferedMessagesSize.get(systemStreamPartition);
+
+      if (sizeInBytes == null) {
+        return 0L;
+      }
+
+      return sizeInBytes.get();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index d1a0a82..afdae16 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -40,7 +40,13 @@ import org.junit.Test;
 public class TestBlockingEnvelopeMap {
   private static final SystemStreamPartition SSP = new SystemStreamPartition("test", "test", new Partition(0));
   private static final IncomingMessageEnvelope ENVELOPE = new IncomingMessageEnvelope(SSP, null, null, null);
+  private static final IncomingMessageEnvelope ENVELOPE_WITH_SIZE = new IncomingMessageEnvelope(SSP, null, null, null, 100);
   private static final Set<SystemStreamPartition> FETCH = new HashSet<SystemStreamPartition>();
+  private static final Clock CLOCK = new Clock() {
+    public long currentTimeMillis() {
+      return System.currentTimeMillis();
+    }
+  };
 
   static {
     FETCH.add(SSP);
@@ -78,6 +84,35 @@ public class TestBlockingEnvelopeMap {
     envelopes = map.poll(FETCH, 0);
     assertEquals(1, envelopes.size());
     assertEquals(2, envelopes.get(SSP).size());
+
+    // Size info.
+    assertEquals(0, map.getMessagesSizeInQueue(SSP));
+  }
+
+  @Test
+  public void testNoSizeComputation() throws InterruptedException {
+    BlockingEnvelopeMap map = new MockBlockingEnvelopeMap();
+    map.register(SSP, "0");
+    map.put(SSP, ENVELOPE);
+    map.put(SSP, ENVELOPE);
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0);
+
+    // Size info.
+    assertEquals(0, map.getMessagesSizeInQueue(SSP));
+  }
+
+  @Test
+  public void testSizeComputation() throws InterruptedException {
+    BlockingEnvelopeMap map = new MockBlockingEnvelopeMap(true);
+    map.register(SSP, "0");
+    map.put(SSP, ENVELOPE_WITH_SIZE);
+    map.put(SSP, ENVELOPE_WITH_SIZE);
+
+    // Size info.
+    assertEquals(200, map.getMessagesSizeInQueue(SSP));
+
+    Map<SystemStreamPartition, List<IncomingMessageEnvelope>> envelopes = map.poll(FETCH, 0);
+    assertEquals(0, map.getMessagesSizeInQueue(SSP));
   }
 
   @Test
@@ -177,12 +212,13 @@ public class TestBlockingEnvelopeMap {
       this(null);
     }
 
+    public MockBlockingEnvelopeMap(boolean fetchLimitByBytesEnabled) {
+      super(new NoOpMetricsRegistry(), CLOCK, null, fetchLimitByBytesEnabled);
+      injectedQueue = new MockQueue();
+    }
+
     public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue) {
-      this(injectedQueue, new Clock() {
-        public long currentTimeMillis() {
-          return System.currentTimeMillis();
-        }
-      });
+      this(injectedQueue, CLOCK);
     }
 
     public MockBlockingEnvelopeMap(BlockingQueue<IncomingMessageEnvelope> injectedQueue, Clock clock) {

http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index a65e8e8..1822511 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -60,6 +60,15 @@ object KafkaConfig {
 
   val DEFAULT_CHECKPOINT_SEGMENT_BYTES = 26214400
 
+  /**
+   * Defines how many bytes to use for the buffered prefetch messages for job as a whole.
+   * The bytes for a single system/stream/partition are computed based on this.
+   * This fetches whole messages, hence this byte limit is a soft one, and the actual usage can be
+   * the bytes limit + size of max message in the partition for a given stream.
+   * If the value of this property is > 0 then this takes precedence over CONSUMER_FETCH_THRESHOLD config.
+   */
+  val CONSUMER_FETCH_THRESHOLD_BYTES = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold.bytes"
+
   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
 }
 
@@ -70,6 +79,9 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getCheckpointSegmentBytes() = getInt(KafkaConfig.CHECKPOINT_SEGMENT_BYTES, KafkaConfig.DEFAULT_CHECKPOINT_SEGMENT_BYTES)
   // custom consumer config
   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
+  def getConsumerFetchThresholdBytes(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD_BYTES format name)
+  def isConsumerFetchThresholdBytesEnabled(name: String): Boolean = getConsumerFetchThresholdBytes(name).getOrElse("-1").toLong > 0
+
 
   /**
    * Returns a map of topic -> fetch.message.max.bytes value for all streams that

http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index c948d64..b373753 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -21,6 +21,7 @@ package org.apache.samza.system.kafka
 
 import kafka.common.TopicAndPartition
 import org.apache.samza.util.Logging
+import kafka.message.Message
 import kafka.message.MessageAndOffset
 import org.apache.samza.Partition
 import kafka.utils.Utils
@@ -32,10 +33,7 @@ import org.apache.samza.util.BlockingEnvelopeMap
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import kafka.consumer.ConsumerConfig
-import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.SamzaException
 import org.apache.samza.util.TopicMetadataStore
-import org.apache.samza.util.ExponentialSleepStrategy
 import kafka.api.TopicMetadata
 import org.apache.samza.util.ExponentialSleepStrategy
 import java.util.concurrent.ConcurrentHashMap
@@ -43,6 +41,13 @@ import scala.collection.JavaConversions._
 import org.apache.samza.system.SystemAdmin
 
 object KafkaSystemConsumer {
+
+  // Approximate shallow heap overhead per message, on top of the raw bytes
+  // received from Kafka: 4 + 64 + 4 + 4 + 4 = 80 bytes.
+  // As the real overhead is a moving target and not very large compared to the
+  // message size, this fixed estimate is what getMessageSize adds to each message.
+  val MESSAGE_SIZE_OVERHEAD =  4 + 64 + 4 + 4 + 4;
+
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
     val topic = systemStreamPartition.getStream
     val partitionId = systemStreamPartition.getPartition.getPartitionId
@@ -80,22 +85,65 @@ private[kafka] class KafkaSystemConsumer(
    * to an increase in memory usage since more messages will be held in memory.
    */
   fetchThreshold: Int = 50000,
+  /**
+   * Defines a low water mark for how many bytes we buffer before we start
+   * executing fetch requests against brokers to get more messages. This
+   * value is divided by 2 because the messages are buffered twice, once in
+   * KafkaConsumer and then in SystemConsumers. This value
+   * is divided equally among all registered SystemStreamPartitions.
+   * However this is a soft limit per partition, as the
+   * bytes are cached at the message boundaries, and the actual usage can be
+   * the per-partition limit + size of max message in the partition for a given stream.
+   * The bytes are the size of the bytebuffer in Message. Hence, the
+   * Object overhead is not taken into consideration. In this codebase
+   * it seems to be quite small. Hence, even for 500000 messages this is around 4MB x 2 = 8MB,
+   * which is not considerable.
+   *
+   * For example,
+   * if fetchThresholdBytes is set to 100000 bytes, and there are 50
+   * SystemStreamPartitions registered, then the per-partition threshold is
+   * (100000 / 2) / 50 = 1000 bytes.
+   * As this is a soft limit, the actual usage can be 1000 bytes + size of max message.
+   * As soon as a SystemStreamPartition's buffered messages bytes drops
+   * below 1000, a fetch request will be executed to get more data for it.
+   *
+   * Increasing this parameter will decrease the latency between when a queue
+   * is drained of messages and when new messages are enqueued, but also leads
+   * to an increase in memory usage since more messages will be held in memory.
+   *
+   * The default value is -1, which means this is not used. When the value
+   * is > 0, then the fetchThreshold which is count based is ignored.
+   */
+  fetchThresholdBytes: Long = -1,
+  /**
+   * if(fetchThresholdBytes > 0) true else false
+   */
+  fetchLimitByBytesEnabled: Boolean = false,
   offsetGetter: GetOffset = new GetOffset("fail"),
   deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
   keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
   retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
-  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metrics.registry, new Clock {
-  def currentTimeMillis = clock()
-}, classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {
+  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(
+    metrics.registry,
+    new Clock {
+      def currentTimeMillis = clock()
+    },
+    classOf[KafkaSystemConsumerMetrics].getName,
+    fetchLimitByBytesEnabled) with Toss with Logging {
 
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
   val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]()
   var perPartitionFetchThreshold = fetchThreshold
+  var perPartitionFetchThresholdBytes = 0L
 
   def start() {
     if (topicPartitionsAndOffsets.size > 0) {
       perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size
+      // messages get double buffered, hence divide by 2
+      if(fetchLimitByBytesEnabled) {
+        perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size
+      }
     }
 
     refreshBrokers
@@ -202,7 +250,15 @@ private[kafka] class KafkaSystemConsumer(
     }
 
     def needsMoreMessages(tp: TopicAndPartition) = {
-      getNumMessagesInQueue(toSystemStreamPartition(tp)) <= perPartitionFetchThreshold
+      if(fetchLimitByBytesEnabled) {
+        getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes
+      } else {
+        getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold
+      }
+    }
+
+    def getMessageSize(message: Message): Integer = {
+      message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD
     }
 
     def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
@@ -222,7 +278,11 @@ private[kafka] class KafkaSystemConsumer(
         null
       }
 
-      put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
+      if(fetchLimitByBytesEnabled ) {
+        put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message)))
+      } else {
+        put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
+      }
 
       setIsAtHead(systemStreamPartition, isAtHead)
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index a60cda2..b574176 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -62,6 +62,7 @@ class KafkaSystemFactory extends SystemFactory with Logging {
     val autoOffsetResetDefault = consumerConfig.autoOffsetReset
     val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
     val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
+    val fetchThresholdBytes = config.getConsumerFetchThresholdBytes(systemName).getOrElse("-1").toLong
     val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
     val metadataStore = new ClientUtilTopicMetadataStore(bootstrapServers, clientId, timeout)
 
@@ -77,6 +78,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
       consumerMinSize = consumerMinSize,
       consumerMaxWait = consumerMaxWait,
       fetchThreshold = fetchThreshold,
+      fetchThresholdBytes = fetchThresholdBytes,
+      fetchLimitByBytesEnabled = config.isConsumerFetchThresholdBytesEnabled(systemName),
       offsetGetter = offsetGetter)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 85badf9..c4a83f6 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -108,6 +108,16 @@ class TestKafkaConfig {
     val consumerConfig2 = kafkaConfig2.getFetchMessageMaxBytesTopics(SYSTEM_NAME)
     // topic fetch size
     assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024))
+
+    // default samza.fetch.threshold.bytes
+    val mapConfig3 = new MapConfig(props.toMap[String, String])
+    val kafkaConfig3 = new KafkaConfig(mapConfig3)
+    assertTrue(kafkaConfig3.getConsumerFetchThresholdBytes("kafka").isEmpty)
+
+    props.setProperty("systems.kafka.samza.fetch.threshold.bytes", "65536")
+    val mapConfig4 = new MapConfig(props.toMap[String, String])
+    val kafkaConfig4 = new KafkaConfig(mapConfig4)
+    assertEquals("65536", kafkaConfig4.getConsumerFetchThresholdBytes("kafka").get)
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/samza/blob/72a558ce/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 23fa939..ece0359 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -23,6 +23,10 @@ import kafka.api.TopicMetadata
 import kafka.api.PartitionMetadata
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition
+import kafka.message.Message
+import kafka.message.MessageAndOffset
+
+import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import org.apache.samza.util.TopicMetadataStore
@@ -34,6 +38,9 @@ import org.mockito.Matchers._
 
 class TestKafkaSystemConsumer {
   val systemAdmin: SystemAdmin = mock(classOf[KafkaSystemAdmin])
+  private val SSP: SystemStreamPartition = new SystemStreamPartition("test", "test", new Partition(0))
+  private val envelope: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null)
+  private val envelopeWithSize: IncomingMessageEnvelope = new IncomingMessageEnvelope(SSP, null, null, null, 100)
 
   @Test
   def testFetchThresholdShouldDivideEvenlyAmongPartitions {
@@ -114,6 +121,69 @@ class TestKafkaSystemConsumer {
     assertEquals("2", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp1)))
     assertEquals("0", consumer.topicPartitionsAndOffsets(KafkaSystemConsumer.toTopicAndPartition(ssp2)))
   }
+
+  @Test
+  def testFetchThresholdBytesShouldDivideEvenlyAmongPartitions {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 10) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    assertEquals(5000, consumer.perPartitionFetchThreshold)
+    assertEquals(3000, consumer.perPartitionFetchThresholdBytes)
+  }
+
+  @Test
+  def testFetchThresholdBytes {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("test-system", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+      fetchThreshold = 50000, fetchThresholdBytes = 60000L, fetchLimitByBytesEnabled = true) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 10) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    val msg = Array[Byte](5, 112, 9, 126)
+    val msgAndOffset: MessageAndOffset = MessageAndOffset(new Message(msg), 887654)
+    // 4 data + 14 Message overhead + 80 IncomingMessageEnvelope overhead
+    consumer.sink.addMessage(new TopicAndPartition("test-stream", 0),  msgAndOffset, 887354)
+
+    assertEquals(98, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+  }
+
+
+  @Test
+  def testFetchThresholdBytesDisabled {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", systemAdmin, new KafkaSystemConsumerMetrics, metadataStore,
+      fetchThreshold = 50000, fetchThresholdBytes = 60000L) {
+      override def refreshBrokers {
+      }
+    }
+
+    for (i <- 0 until 10) {
+      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
+    }
+
+    consumer.start
+
+    assertEquals(5000, consumer.perPartitionFetchThreshold)
+    assertEquals(0, consumer.perPartitionFetchThresholdBytes)
+    assertEquals(0, consumer.getMessagesSizeInQueue(new SystemStreamPartition("test-system", "test-stream", new Partition(0))))
+  }
 }
 
 class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {