You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/28 21:49:52 UTC

[1/4] git commit: tweak kv test to trigger NPE when test is run with Scala 2.8.1. Validate that patch fixes the bug. Add brackets in if else in cached store for style.

Updated Branches:
  refs/heads/master 71f030ee4 -> c942c28d9


tweak kv test to trigger NPE when test is run with Scala 2.8.1. Validate that patch fixes the bug. Add brackets in if else in cached store for style.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/4ec463b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/4ec463b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/4ec463b3

Branch: refs/heads/master
Commit: 4ec463b35e71f470e5b296f1ec65bb5866f495c4
Parents: 39cf91e
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Aug 28 10:14:03 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Aug 28 12:47:05 2013 -0700

----------------------------------------------------------------------
 .../src/main/scala/org/apache/samza/storage/kv/CachedStore.scala | 4 ++--
 .../scala/org/apache/samza/storage/kv/TestKeyValueStores.scala   | 3 ++-
 2 files changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4ec463b3/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 8938b89..81e33b8 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -90,9 +90,9 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
   def put(key: K, value: V) {
     // add the key to the front of the dirty list (and remove any prior occurrences to dedupe)
     val found = cache.get(key)
-    if (found == null || found.dirty == null)
+    if (found == null || found.dirty == null) {
       this.dirtyCount += 1
-    else {
+    } else {
       // If we are removing the head of the list, move the head to the next element.
       if(found.dirty.prev == null) {
         this.dirty = found.dirty.next

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/4ec463b3/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index 5f9d66c..0be0722 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -79,7 +79,8 @@ class TestKeyValueStores(cache: Boolean) {
     val k = b("k2")
     store.put(k, b("v1"))
     store.put(k, b("v2"))
-    assertTrue(Arrays.equals(b("v2"), store.get(k)))
+    store.put(k, b("v3"))
+    assertTrue(Arrays.equals(b("v3"), store.get(k)))
   }
 
   @Test


[4/4] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-samza

Posted by cr...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-samza


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/c942c28d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/c942c28d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/c942c28d

Branch: refs/heads/master
Commit: c942c28d958ab1e14db2c93367dd9ce5173c3b0f
Parents: 922c3df 71f030e
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Aug 28 12:48:51 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Aug 28 12:48:51 2013 -0700

----------------------------------------------------------------------

----------------------------------------------------------------------



[2/4] git commit: don't use remove() on head in cachedstore's double linked list because of https://issues.scala-lang.org/browse/SI-3970

Posted by cr...@apache.org.
don't use remove() on head in cachedstore's double linked list because of https://issues.scala-lang.org/browse/SI-3970


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/39cf91ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/39cf91ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/39cf91ef

Branch: refs/heads/master
Commit: 39cf91efc44d756df5db294f49ae9cf4a8c93b52
Parents: 251b42d
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Aug 27 15:47:17 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Aug 28 12:47:05 2013 -0700

----------------------------------------------------------------------
 .../scala/org/apache/samza/storage/kv/CachedStore.scala  | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/39cf91ef/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 0eed8fa..8938b89 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -92,8 +92,15 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
     val found = cache.get(key)
     if (found == null || found.dirty == null)
       this.dirtyCount += 1
-    else
-      found.dirty.remove()
+    else {
+      // If we are removing the head of the list, move the head to the next element.
+      if(found.dirty.prev == null) {
+        this.dirty = found.dirty.next
+        this.dirty.prev = null
+      } else {
+        found.dirty.remove
+      }
+    }
     this.dirty = new mutable.DoubleLinkedList(key, this.dirty)
 
     // add the key to the cache (but don't allocate a new cache entry if we already have one)


[3/4] git commit: SAMZA-3 BrokerProxy deadlocks if messages aren't polled from all streams

Posted by cr...@apache.org.
SAMZA-3 BrokerProxy deadlocks if messages aren't polled from all streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/922c3df8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/922c3df8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/922c3df8

Branch: refs/heads/master
Commit: 922c3df89239005851711b054c1bd2e0e0ffa9f5
Parents: 4ec463b
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Aug 28 10:21:48 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Aug 28 12:47:06 2013 -0700

----------------------------------------------------------------------
 .../apache/samza/util/BlockingEnvelopeMap.java  | 35 +++++++++-----------
 .../samza/util/TestBlockingEnvelopeMap.java     |  7 ++--
 .../org/apache/samza/config/KafkaConfig.scala   |  9 ++++-
 .../apache/samza/system/kafka/BrokerProxy.scala |  2 +-
 .../system/kafka/KafkaSystemConsumer.scala      |  9 +++--
 .../samza/system/kafka/KafkaSystemFactory.scala |  4 +--
 .../apache/samza/system/kafka/MessageSink.scala |  2 ++
 .../samza/system/kafka/TestBrokerProxy.scala    | 17 ++++++++--
 .../apache/samza/storage/kv/CachedStore.scala   |  3 +-
 9 files changed, 54 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 96f3148..c1dd279 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -22,9 +22,9 @@ package org.apache.samza.util;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.samza.metrics.Counter;
@@ -56,30 +56,28 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   private final BlockingEnvelopeMapMetrics metrics;
   private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
   private final Map<SystemStreamPartition, Boolean> noMoreMessage;
-  private final int queueSize;
   private final Clock clock;
 
   public BlockingEnvelopeMap() {
-    this(1000, new NoOpMetricsRegistry());
+    this(new NoOpMetricsRegistry());
   }
 
   public BlockingEnvelopeMap(Clock clock) {
-    this(1000, new NoOpMetricsRegistry(), clock);
+    this(new NoOpMetricsRegistry(), clock);
   }
 
-  public BlockingEnvelopeMap(int queueSize, MetricsRegistry metricsRegistry) {
-    this(queueSize, metricsRegistry, new Clock() {
+  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
+    this(metricsRegistry, new Clock() {
       public long currentTimeMillis() {
         return System.currentTimeMillis();
       }
     });
   }
 
-  public BlockingEnvelopeMap(int queueSize, MetricsRegistry metricsRegistry, Clock clock) {
-    this.metrics = new BlockingEnvelopeMapMetrics(queueSize, metricsRegistry);
+  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
+    this.metrics = new BlockingEnvelopeMapMetrics(metricsRegistry);
     this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
     this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
-    this.queueSize = queueSize;
     this.clock = clock;
   }
 
@@ -89,7 +87,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   }
 
   protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
-    return new ArrayBlockingQueue<IncomingMessageEnvelope>(queueSize);
+    return new LinkedBlockingQueue<IncomingMessageEnvelope>();
   }
 
   public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitionAndMaxPerStream, long timeout) throws InterruptedException {
@@ -152,26 +150,26 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     return messagesToReturn;
   }
 
-  protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
-    bufferedMessages.get(systemStreamPartition).put(envelope);
+  protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
+    bufferedMessages.get(systemStreamPartition).add(envelope);
     metrics.incBufferedMessageCount(systemStreamPartition, 1);
   }
 
-  protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
+  protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) {
     BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
 
     for (IncomingMessageEnvelope envelope : envelopes) {
-      queue.put(envelope);
+      queue.add(envelope);
     }
 
     metrics.incBufferedMessageCount(systemStreamPartition, envelopes.size());
   }
 
-  protected int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
+  public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
     BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
 
     if (queue == null) {
-      return 0;
+      throw new NullPointerException("Attempting to get queue for " + systemStreamPartition + ", but the system/stream/partition was never registered.");
     } else {
       return queue.size();
     }
@@ -197,18 +195,15 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     private final ConcurrentHashMap<SystemStreamPartition, Counter> pollCountMap;
     private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap;
     private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap;
-    // TODO use the queueSize gauge
-    private final Gauge<Integer> queueSize;
     private final Counter pollCount;
 
-    public BlockingEnvelopeMapMetrics(int queueSize, MetricsRegistry metricsRegistry) {
+    public BlockingEnvelopeMapMetrics(MetricsRegistry metricsRegistry) {
       this.metricsRegistry = metricsRegistry;
       this.bufferedMessageCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.noMoreMessageGaugeMap = new ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>();
       this.pollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.blockingPollTimeoutCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
-      this.queueSize = metricsRegistry.<Integer> newGauge(GROUP, "QueueSize", queueSize);
       this.pollCount = metricsRegistry.newCounter(GROUP, "PollCount");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index 2aed464..62152d0 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.*;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import org.apache.samza.Partition;
@@ -105,7 +105,7 @@ public class TestBlockingEnvelopeMap {
       @Override
       public void run() {
         try {
-       // Should trigger a take() call.
+          // Should trigger a take() call.
           map.poll(FETCH, -1);
         } catch (InterruptedException e) {
           throw new RuntimeException(e);
@@ -151,13 +151,12 @@ public class TestBlockingEnvelopeMap {
     assertFalse(t.isAlive());
   }
 
-  public class MockQueue extends ArrayBlockingQueue<IncomingMessageEnvelope> {
+  public class MockQueue extends LinkedBlockingQueue<IncomingMessageEnvelope> {
     private static final long serialVersionUID = 1L;
     private final CountDownLatch pollTimeoutBarrier;
     private long timeout;
 
     public MockQueue() {
-      super(1000);
       this.pollTimeoutBarrier = new CountDownLatch(1);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 59d915d..4947b87 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -36,6 +36,12 @@ object KafkaConfig {
   val CONSUMER_KEY_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.key.deserializer.class"
   val CONSUMER_MSG_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.deserializer.class"
 
+  /**
+   * Queue-size threshold for a single system/stream/partition combination:
+   * more messages are fetched only when its queue holds this many or fewer.
+   */
+  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + ".samza.fetch.threshold"
+
   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
 }
 
@@ -47,6 +53,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   // custom consumer config
   def getConsumerKeyDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_KEY_DESERIALIZER format name)
   def getConsumerMsgDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_MSG_DESERIALIZER format name)
+  def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
 
   /**
    * Returns a map of topic -> auto.offset.reset value for all streams that
@@ -83,7 +90,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     consumerProps.putAll(injectedProps)
     new ConsumerConfig(consumerProps)
   }
- 
+
   def getKafkaSystemProducerConfig(
     systemName: String,
     clientId: String = "undefined-samza-producer-" format UUID.randomUUID.toString,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index cb5015d..214de92 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -135,7 +135,7 @@ abstract class BrokerProxy(
   }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
 
   private def fetchMessages(): Unit = {
-    val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.toList: _*)
+    val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*)
     firstCall = false
     firstCallBarrier.countDown()
     if (response.hasError) {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index bd7794a..7970ffc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -56,14 +56,14 @@ private[kafka] class KafkaSystemConsumer(
   brokerListString: String,
   metricsRegistry: MetricsRegistry,
   clientId: String = "undefined-client-id-" + UUID.randomUUID.toString,
-  queueSize: Int = 1000,
   timeout: Int = Int.MaxValue,
   bufferSize: Int = 1024000,
   brokerMetadataFailureRefreshMs: Long = 10000,
+  fetchThreshold: Int = 0,
   offsetGetter: GetOffset = new GetOffset("fail"),
   deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
   keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
-  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(queueSize, metricsRegistry, new Clock {
+  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metricsRegistry, new Clock {
   def currentTimeMillis = clock()
 }) with Toss with Logging {
 
@@ -154,6 +154,10 @@ private[kafka] class KafkaSystemConsumer(
       setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
     }
 
+    def needsMoreMessages(tp: TopicAndPartition) = {
+      getNumMessagesInQueue(toSystemStreamPartition(tp)) <= fetchThreshold
+    }
+
     def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
       trace("Incoming message %s: %s." format (tp, msg))
 
@@ -171,7 +175,6 @@ private[kafka] class KafkaSystemConsumer(
         null
       }
 
-      // TODO use kafka encoder/decoder here, if they were defined in config
       add(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
 
       setIsAtHead(systemStreamPartition, isAtHead)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 13c5baa..fe96dd8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -53,6 +53,7 @@ class KafkaSystemFactory extends SystemFactory {
     val bufferSize = consumerConfig.socketReceiveBufferBytes
     val autoOffsetResetDefault = consumerConfig.autoOffsetReset
     val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
+    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt
     val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
     val deserializer = config.getConsumerMsgDeserializerClass(systemName) match {
       case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass)
@@ -68,10 +69,9 @@ class KafkaSystemFactory extends SystemFactory {
       brokerListString = brokerListString,
       metricsRegistry = registry,
       clientId = clientId,
-      // TODO make this configurable?
-      queueSize = 1000,
       timeout = timeout,
       bufferSize = bufferSize,
+      fetchThreshold = fetchThreshold,
       offsetGetter = offsetGetter)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
index 71fae59..1ab0346 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
@@ -27,4 +27,6 @@ private[kafka] trait MessageSink {
   def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit
 
   def abdicate(tp: TopicAndPartition, lastOffset: Long): Unit
+  
+  def needsMoreMessages(tp: TopicAndPartition): Boolean
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 15510ae..947f5a7 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -38,6 +38,8 @@ import org.apache.samza.SamzaException
 import kafka.message.ByteBufferMessageSet
 
 class TestBrokerProxy {
+  val tp2 = new TopicAndPartition("Redbird", 2013)
+
   def getMockBrokerProxy() = {
     val sink = new MessageSink {
       val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
@@ -47,6 +49,9 @@ class TestBrokerProxy {
 
       def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
       }
+
+      // Never need messages for tp2.
+      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2)
     }
 
     val config = new MapConfig(Map[String, String]("job.name" -> "Jobby McJob",
@@ -100,7 +105,7 @@ class TestBrokerProxy {
               por
             }
 
-            val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse)
+            val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse)
             when(offsetResponse.partitionErrorAndOffsets).thenReturn(map)
             offsetResponse
           }
@@ -134,7 +139,13 @@ class TestBrokerProxy {
 
           override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
 
-          override def fetch(request: FetchRequest): FetchResponse = sc.fetch(request)
+          override def fetch(request: FetchRequest): FetchResponse = {
+            // Verify that we only get fetch requests for one tp, even though 
+            // two were registered. This is to verify that 
+            // sink.needsMoreMessages works.
+            assertEquals(1, request.requestInfo.size)
+            sc.fetch(request)
+          }
 
           override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request)
 
@@ -156,6 +167,8 @@ class TestBrokerProxy {
 
     bp.start
     bp.addTopicPartition(tp, "0")
+    // Add tp2, which should never receive messages since sink disables it.
+    bp.addTopicPartition(tp2, "0")
     Thread.sleep(1000)
     assertEquals(2, sink.receivedMessages.size)
     assertEquals(42, sink.receivedMessages.get(0)._2.offset)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 81e33b8..bd479b6 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -93,7 +93,8 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
     if (found == null || found.dirty == null) {
       this.dirtyCount += 1
     } else {
-      // If we are removing the head of the list, move the head to the next element.
+      // If we are removing the head of the list, move the head to the next 
+      // element. See SAMZA-45 for details.
       if(found.dirty.prev == null) {
         this.dirty = found.dirty.next
         this.dirty.prev = null