You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/28 21:49:54 UTC

[3/4] git commit: SAMZA-3 BrokerProxy deadlocks if messages aren't polled from all streams

SAMZA-3 BrokerProxy deadlocks if messages aren't polled from all streams


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/922c3df8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/922c3df8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/922c3df8

Branch: refs/heads/master
Commit: 922c3df89239005851711b054c1bd2e0e0ffa9f5
Parents: 4ec463b
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Aug 28 10:21:48 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Aug 28 12:47:06 2013 -0700

----------------------------------------------------------------------
 .../apache/samza/util/BlockingEnvelopeMap.java  | 35 +++++++++-----------
 .../samza/util/TestBlockingEnvelopeMap.java     |  7 ++--
 .../org/apache/samza/config/KafkaConfig.scala   |  9 ++++-
 .../apache/samza/system/kafka/BrokerProxy.scala |  2 +-
 .../system/kafka/KafkaSystemConsumer.scala      |  9 +++--
 .../samza/system/kafka/KafkaSystemFactory.scala |  4 +--
 .../apache/samza/system/kafka/MessageSink.scala |  2 ++
 .../samza/system/kafka/TestBrokerProxy.scala    | 17 ++++++++--
 .../apache/samza/storage/kv/CachedStore.scala   |  3 +-
 9 files changed, 54 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 96f3148..c1dd279 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -22,9 +22,9 @@ package org.apache.samza.util;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.samza.metrics.Counter;
@@ -56,30 +56,28 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   private final BlockingEnvelopeMapMetrics metrics;
   private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
   private final Map<SystemStreamPartition, Boolean> noMoreMessage;
-  private final int queueSize;
   private final Clock clock;
 
   public BlockingEnvelopeMap() {
-    this(1000, new NoOpMetricsRegistry());
+    this(new NoOpMetricsRegistry());
   }
 
   public BlockingEnvelopeMap(Clock clock) {
-    this(1000, new NoOpMetricsRegistry(), clock);
+    this(new NoOpMetricsRegistry(), clock);
   }
 
-  public BlockingEnvelopeMap(int queueSize, MetricsRegistry metricsRegistry) {
-    this(queueSize, metricsRegistry, new Clock() {
+  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
+    this(metricsRegistry, new Clock() {
       public long currentTimeMillis() {
         return System.currentTimeMillis();
       }
     });
   }
 
-  public BlockingEnvelopeMap(int queueSize, MetricsRegistry metricsRegistry, Clock clock) {
-    this.metrics = new BlockingEnvelopeMapMetrics(queueSize, metricsRegistry);
+  public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
+    this.metrics = new BlockingEnvelopeMapMetrics(metricsRegistry);
     this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
     this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
-    this.queueSize = queueSize;
     this.clock = clock;
   }
 
@@ -89,7 +87,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
   }
 
   protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
-    return new ArrayBlockingQueue<IncomingMessageEnvelope>(queueSize);
+    return new LinkedBlockingQueue<IncomingMessageEnvelope>();
   }
 
   public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitionAndMaxPerStream, long timeout) throws InterruptedException {
@@ -152,26 +150,26 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     return messagesToReturn;
   }
 
-  protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
-    bufferedMessages.get(systemStreamPartition).put(envelope);
+  protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
+    bufferedMessages.get(systemStreamPartition).add(envelope);
     metrics.incBufferedMessageCount(systemStreamPartition, 1);
   }
 
-  protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
+  protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) {
     BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
 
     for (IncomingMessageEnvelope envelope : envelopes) {
-      queue.put(envelope);
+      queue.add(envelope);
     }
 
     metrics.incBufferedMessageCount(systemStreamPartition, envelopes.size());
   }
 
-  protected int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
+  public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
     BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
 
     if (queue == null) {
-      return 0;
+      throw new NullPointerException("Attempting to get queue for " + systemStreamPartition + ", but the system/stream/partition was never registered.");
     } else {
       return queue.size();
     }
@@ -197,18 +195,15 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
     private final ConcurrentHashMap<SystemStreamPartition, Counter> pollCountMap;
     private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap;
     private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap;
-    // TODO use the queueSize gauge
-    private final Gauge<Integer> queueSize;
     private final Counter pollCount;
 
-    public BlockingEnvelopeMapMetrics(int queueSize, MetricsRegistry metricsRegistry) {
+    public BlockingEnvelopeMapMetrics(MetricsRegistry metricsRegistry) {
       this.metricsRegistry = metricsRegistry;
       this.bufferedMessageCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.noMoreMessageGaugeMap = new ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>();
       this.pollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
       this.blockingPollTimeoutCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
-      this.queueSize = metricsRegistry.<Integer> newGauge(GROUP, "QueueSize", queueSize);
       this.pollCount = metricsRegistry.newCounter(GROUP, "PollCount");
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index 2aed464..62152d0 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.*;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import org.junit.Test;
 import org.apache.samza.Partition;
@@ -105,7 +105,7 @@ public class TestBlockingEnvelopeMap {
       @Override
       public void run() {
         try {
-       // Should trigger a take() call.
+          // Should trigger a take() call.
           map.poll(FETCH, -1);
         } catch (InterruptedException e) {
           throw new RuntimeException(e);
@@ -151,13 +151,12 @@ public class TestBlockingEnvelopeMap {
     assertFalse(t.isAlive());
   }
 
-  public class MockQueue extends ArrayBlockingQueue<IncomingMessageEnvelope> {
+  public class MockQueue extends LinkedBlockingQueue<IncomingMessageEnvelope> {
     private static final long serialVersionUID = 1L;
     private final CountDownLatch pollTimeoutBarrier;
     private long timeout;
 
     public MockQueue() {
-      super(1000);
       this.pollTimeoutBarrier = new CountDownLatch(1);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 59d915d..4947b87 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -36,6 +36,12 @@ object KafkaConfig {
   val CONSUMER_KEY_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.key.deserializer.class"
   val CONSUMER_MSG_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.deserializer.class"
 
+  /**
+   * Per-system key (formatted with the system name) for how low a system/stream/partition
+   * queue can get before more messages are fetched. No leading dot, like the sibling keys.
+   */
+  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
+
   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
 }
 
@@ -47,6 +53,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   // custom consumer config
   def getConsumerKeyDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_KEY_DESERIALIZER format name)
   def getConsumerMsgDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_MSG_DESERIALIZER format name)
+  def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
 
   /**
    * Returns a map of topic -> auto.offset.reset value for all streams that
@@ -83,7 +90,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     consumerProps.putAll(injectedProps)
     new ConsumerConfig(consumerProps)
   }
- 
+
   def getKafkaSystemProducerConfig(
     systemName: String,
     clientId: String = "undefined-samza-producer-" format UUID.randomUUID.toString,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index cb5015d..214de92 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -135,7 +135,7 @@ abstract class BrokerProxy(
   }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
 
   private def fetchMessages(): Unit = {
-    val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.toList: _*)
+    val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*)
     firstCall = false
     firstCallBarrier.countDown()
     if (response.hasError) {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index bd7794a..7970ffc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -56,14 +56,14 @@ private[kafka] class KafkaSystemConsumer(
   brokerListString: String,
   metricsRegistry: MetricsRegistry,
   clientId: String = "undefined-client-id-" + UUID.randomUUID.toString,
-  queueSize: Int = 1000,
   timeout: Int = Int.MaxValue,
   bufferSize: Int = 1024000,
   brokerMetadataFailureRefreshMs: Long = 10000,
+  fetchThreshold: Int = 0,
   offsetGetter: GetOffset = new GetOffset("fail"),
   deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
   keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
-  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(queueSize, metricsRegistry, new Clock {
+  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metricsRegistry, new Clock {
   def currentTimeMillis = clock()
 }) with Toss with Logging {
 
@@ -154,6 +154,10 @@ private[kafka] class KafkaSystemConsumer(
       setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
     }
 
+    def needsMoreMessages(tp: TopicAndPartition) = {
+      getNumMessagesInQueue(toSystemStreamPartition(tp)) <= fetchThreshold
+    }
+
     def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
       trace("Incoming message %s: %s." format (tp, msg))
 
@@ -171,7 +175,6 @@ private[kafka] class KafkaSystemConsumer(
         null
       }
 
-      // TODO use kafka encoder/decoder here, if they were defined in config
       add(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
 
       setIsAtHead(systemStreamPartition, isAtHead)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 13c5baa..fe96dd8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -53,6 +53,7 @@ class KafkaSystemFactory extends SystemFactory {
     val bufferSize = consumerConfig.socketReceiveBufferBytes
     val autoOffsetResetDefault = consumerConfig.autoOffsetReset
     val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
+    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt
     val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
     val deserializer = config.getConsumerMsgDeserializerClass(systemName) match {
       case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass)
@@ -68,10 +69,9 @@ class KafkaSystemFactory extends SystemFactory {
       brokerListString = brokerListString,
       metricsRegistry = registry,
       clientId = clientId,
-      // TODO make this configurable?
-      queueSize = 1000,
       timeout = timeout,
       bufferSize = bufferSize,
+      fetchThreshold = fetchThreshold,
       offsetGetter = offsetGetter)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
index 71fae59..1ab0346 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
@@ -27,4 +27,6 @@ private[kafka] trait MessageSink {
   def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit
 
   def abdicate(tp: TopicAndPartition, lastOffset: Long): Unit
+  
+  def needsMoreMessages(tp: TopicAndPartition): Boolean
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 15510ae..947f5a7 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -38,6 +38,8 @@ import org.apache.samza.SamzaException
 import kafka.message.ByteBufferMessageSet
 
 class TestBrokerProxy {
+  val tp2 = new TopicAndPartition("Redbird", 2013)
+
   def getMockBrokerProxy() = {
     val sink = new MessageSink {
       val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
@@ -47,6 +49,9 @@ class TestBrokerProxy {
 
       def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
       }
+
+      // Never need messages for tp2.
+      def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2)
     }
 
     val config = new MapConfig(Map[String, String]("job.name" -> "Jobby McJob",
@@ -100,7 +105,7 @@ class TestBrokerProxy {
               por
             }
 
-            val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse)
+            val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse)
             when(offsetResponse.partitionErrorAndOffsets).thenReturn(map)
             offsetResponse
           }
@@ -134,7 +139,13 @@ class TestBrokerProxy {
 
           override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
 
-          override def fetch(request: FetchRequest): FetchResponse = sc.fetch(request)
+          override def fetch(request: FetchRequest): FetchResponse = {
+            // Verify that we only get fetch requests for one tp, even though 
+            // two were registered. This is to verify that 
+            // sink.needsMoreMessages works.
+            assertEquals(1, request.requestInfo.size)
+            sc.fetch(request)
+          }
 
           override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request)
 
@@ -156,6 +167,8 @@ class TestBrokerProxy {
 
     bp.start
     bp.addTopicPartition(tp, "0")
+    // Add tp2, which should never receive messages since sink disables it.
+    bp.addTopicPartition(tp2, "0")
     Thread.sleep(1000)
     assertEquals(2, sink.receivedMessages.size)
     assertEquals(42, sink.receivedMessages.get(0)._2.offset)

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 81e33b8..bd479b6 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -93,7 +93,8 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
     if (found == null || found.dirty == null) {
       this.dirtyCount += 1
     } else {
-      // If we are removing the head of the list, move the head to the next element.
+      // If we are removing the head of the list, move the head to the next 
+      // element. See SAMZA-45 for details.
       if(found.dirty.prev == null) {
         this.dirty = found.dirty.next
         this.dirty.prev = null