You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2013/08/28 21:49:54 UTC
[3/4] git commit: SAMZA-3 BrokerProxy deadlocks if messages aren't polled from all streams
SAMZA-3 BrokerProxy deadlocks if messages aren't polled from all streams
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/922c3df8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/922c3df8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/922c3df8
Branch: refs/heads/master
Commit: 922c3df89239005851711b054c1bd2e0e0ffa9f5
Parents: 4ec463b
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Wed Aug 28 10:21:48 2013 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Wed Aug 28 12:47:06 2013 -0700
----------------------------------------------------------------------
.../apache/samza/util/BlockingEnvelopeMap.java | 35 +++++++++-----------
.../samza/util/TestBlockingEnvelopeMap.java | 7 ++--
.../org/apache/samza/config/KafkaConfig.scala | 9 ++++-
.../apache/samza/system/kafka/BrokerProxy.scala | 2 +-
.../system/kafka/KafkaSystemConsumer.scala | 9 +++--
.../samza/system/kafka/KafkaSystemFactory.scala | 4 +--
.../apache/samza/system/kafka/MessageSink.scala | 2 ++
.../samza/system/kafka/TestBrokerProxy.scala | 17 ++++++++--
.../apache/samza/storage/kv/CachedStore.scala | 3 +-
9 files changed, 54 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
index 96f3148..c1dd279 100644
--- a/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
+++ b/samza-api/src/main/java/org/apache/samza/util/BlockingEnvelopeMap.java
@@ -22,9 +22,9 @@ package org.apache.samza.util;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.samza.metrics.Counter;
@@ -56,30 +56,28 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
private final BlockingEnvelopeMapMetrics metrics;
private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
private final Map<SystemStreamPartition, Boolean> noMoreMessage;
- private final int queueSize;
private final Clock clock;
public BlockingEnvelopeMap() {
- this(1000, new NoOpMetricsRegistry());
+ this(new NoOpMetricsRegistry());
}
public BlockingEnvelopeMap(Clock clock) {
- this(1000, new NoOpMetricsRegistry(), clock);
+ this(new NoOpMetricsRegistry(), clock);
}
- public BlockingEnvelopeMap(int queueSize, MetricsRegistry metricsRegistry) {
- this(queueSize, metricsRegistry, new Clock() {
+ public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
+ this(metricsRegistry, new Clock() {
public long currentTimeMillis() {
return System.currentTimeMillis();
}
});
}
- public BlockingEnvelopeMap(int queueSize, MetricsRegistry metricsRegistry, Clock clock) {
- this.metrics = new BlockingEnvelopeMapMetrics(queueSize, metricsRegistry);
+ public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
+ this.metrics = new BlockingEnvelopeMapMetrics(metricsRegistry);
this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
- this.queueSize = queueSize;
this.clock = clock;
}
@@ -89,7 +87,7 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
}
protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
- return new ArrayBlockingQueue<IncomingMessageEnvelope>(queueSize);
+ return new LinkedBlockingQueue<IncomingMessageEnvelope>();
}
public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitionAndMaxPerStream, long timeout) throws InterruptedException {
@@ -152,26 +150,26 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
return messagesToReturn;
}
- protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
- bufferedMessages.get(systemStreamPartition).put(envelope);
+ protected void add(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) {
+ bufferedMessages.get(systemStreamPartition).add(envelope);
metrics.incBufferedMessageCount(systemStreamPartition, 1);
}
- protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
+ protected void addAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) {
BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
for (IncomingMessageEnvelope envelope : envelopes) {
- queue.put(envelope);
+ queue.add(envelope);
}
metrics.incBufferedMessageCount(systemStreamPartition, envelopes.size());
}
- protected int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
+ public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
BlockingQueue<IncomingMessageEnvelope> queue = bufferedMessages.get(systemStreamPartition);
if (queue == null) {
- return 0;
+ throw new NullPointerException("Attempting to get queue for " + systemStreamPartition + ", but the system/stream/partition was never registered.");
} else {
return queue.size();
}
@@ -197,18 +195,15 @@ public abstract class BlockingEnvelopeMap implements SystemConsumer {
private final ConcurrentHashMap<SystemStreamPartition, Counter> pollCountMap;
private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap;
private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap;
- // TODO use the queueSize gauge
- private final Gauge<Integer> queueSize;
private final Counter pollCount;
- public BlockingEnvelopeMapMetrics(int queueSize, MetricsRegistry metricsRegistry) {
+ public BlockingEnvelopeMapMetrics(MetricsRegistry metricsRegistry) {
this.metricsRegistry = metricsRegistry;
this.bufferedMessageCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
this.noMoreMessageGaugeMap = new ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>();
this.pollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
this.blockingPollTimeoutCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
- this.queueSize = metricsRegistry.<Integer> newGauge(GROUP, "QueueSize", queueSize);
this.pollCount = metricsRegistry.newCounter(GROUP, "PollCount");
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
----------------------------------------------------------------------
diff --git a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
index 2aed464..62152d0 100644
--- a/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
+++ b/samza-api/src/test/java/org/apache/samza/util/TestBlockingEnvelopeMap.java
@@ -23,9 +23,9 @@ import static org.junit.Assert.*;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import org.apache.samza.Partition;
@@ -105,7 +105,7 @@ public class TestBlockingEnvelopeMap {
@Override
public void run() {
try {
- // Should trigger a take() call.
+ // Should trigger a take() call.
map.poll(FETCH, -1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
@@ -151,13 +151,12 @@ public class TestBlockingEnvelopeMap {
assertFalse(t.isAlive());
}
- public class MockQueue extends ArrayBlockingQueue<IncomingMessageEnvelope> {
+ public class MockQueue extends LinkedBlockingQueue<IncomingMessageEnvelope> {
private static final long serialVersionUID = 1L;
private final CountDownLatch pollTimeoutBarrier;
private long timeout;
public MockQueue() {
- super(1000);
this.pollTimeoutBarrier = new CountDownLatch(1);
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 59d915d..4947b87 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -36,6 +36,12 @@ object KafkaConfig {
val CONSUMER_KEY_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.key.deserializer.class"
val CONSUMER_MSG_DESERIALIZER = SystemConfig.SYSTEM_PREFIX + "consumer.deserializer.class"
+ /**
+ * Defines how low a queue can get for a single system/stream/partition
+ * combination before trying to fetch more messages for it.
+ */
+ val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + ".samza.fetch.threshold"
+
implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
}
@@ -47,6 +53,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
// custom consumer config
def getConsumerKeyDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_KEY_DESERIALIZER format name)
def getConsumerMsgDeserializerClass(name: String) = getOption(KafkaConfig.CONSUMER_MSG_DESERIALIZER format name)
+ def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
/**
* Returns a map of topic -> auto.offset.reset value for all streams that
@@ -83,7 +90,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
consumerProps.putAll(injectedProps)
new ConsumerConfig(consumerProps)
}
-
+
def getKafkaSystemProducerConfig(
systemName: String,
clientId: String = "undefined-samza-producer-" format UUID.randomUUID.toString,
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index cb5015d..214de92 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -135,7 +135,7 @@ abstract class BrokerProxy(
}, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
private def fetchMessages(): Unit = {
- val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.toList: _*)
+ val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*)
firstCall = false
firstCallBarrier.countDown()
if (response.hasError) {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index bd7794a..7970ffc 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -56,14 +56,14 @@ private[kafka] class KafkaSystemConsumer(
brokerListString: String,
metricsRegistry: MetricsRegistry,
clientId: String = "undefined-client-id-" + UUID.randomUUID.toString,
- queueSize: Int = 1000,
timeout: Int = Int.MaxValue,
bufferSize: Int = 1024000,
brokerMetadataFailureRefreshMs: Long = 10000,
+ fetchThreshold: Int = 0,
offsetGetter: GetOffset = new GetOffset("fail"),
deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
- clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(queueSize, metricsRegistry, new Clock {
+ clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(metricsRegistry, new Clock {
def currentTimeMillis = clock()
}) with Toss with Logging {
@@ -154,6 +154,10 @@ private[kafka] class KafkaSystemConsumer(
setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
}
+ def needsMoreMessages(tp: TopicAndPartition) = {
+ getNumMessagesInQueue(toSystemStreamPartition(tp)) <= fetchThreshold
+ }
+
def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
trace("Incoming message %s: %s." format (tp, msg))
@@ -171,7 +175,6 @@ private[kafka] class KafkaSystemConsumer(
null
}
- // TODO use kafka encoder/decoder here, if they were defined in config
add(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
setIsAtHead(systemStreamPartition, isAtHead)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 13c5baa..fe96dd8 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -53,6 +53,7 @@ class KafkaSystemFactory extends SystemFactory {
val bufferSize = consumerConfig.socketReceiveBufferBytes
val autoOffsetResetDefault = consumerConfig.autoOffsetReset
val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
+ val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt
val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
val deserializer = config.getConsumerMsgDeserializerClass(systemName) match {
case Some(deserializerClass) => Util.getObj[Decoder[Object]](deserializerClass)
@@ -68,10 +69,9 @@ class KafkaSystemFactory extends SystemFactory {
brokerListString = brokerListString,
metricsRegistry = registry,
clientId = clientId,
- // TODO make this configurable?
- queueSize = 1000,
timeout = timeout,
bufferSize = bufferSize,
+ fetchThreshold = fetchThreshold,
offsetGetter = offsetGetter)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
index 71fae59..1ab0346 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/MessageSink.scala
@@ -27,4 +27,6 @@ private[kafka] trait MessageSink {
def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long): Unit
def abdicate(tp: TopicAndPartition, lastOffset: Long): Unit
+
+ def needsMoreMessages(tp: TopicAndPartition): Boolean
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 15510ae..947f5a7 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -38,6 +38,8 @@ import org.apache.samza.SamzaException
import kafka.message.ByteBufferMessageSet
class TestBrokerProxy {
+ val tp2 = new TopicAndPartition("Redbird", 2013)
+
def getMockBrokerProxy() = {
val sink = new MessageSink {
val receivedMessages = new scala.collection.mutable.ListBuffer[(TopicAndPartition, MessageAndOffset, Boolean)]()
@@ -47,6 +49,9 @@ class TestBrokerProxy {
def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
}
+
+ // Never need messages for tp2.
+ def needsMoreMessages(tp: TopicAndPartition): Boolean = !tp.equals(tp2)
}
val config = new MapConfig(Map[String, String]("job.name" -> "Jobby McJob",
@@ -100,7 +105,7 @@ class TestBrokerProxy {
por
}
- val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse)
+ val map = scala.Predef.Map[TopicAndPartition, PartitionOffsetsResponse](tp -> partitionOffsetResponse, tp2 -> partitionOffsetResponse)
when(offsetResponse.partitionErrorAndOffsets).thenReturn(map)
offsetResponse
}
@@ -134,7 +139,13 @@ class TestBrokerProxy {
override def send(request: TopicMetadataRequest): TopicMetadataResponse = sc.send(request)
- override def fetch(request: FetchRequest): FetchResponse = sc.fetch(request)
+ override def fetch(request: FetchRequest): FetchResponse = {
+ // Verify that we only get fetch requests for one tp, even though
+ // two were registered. This is to verify that
+ // sink.needsMoreMessages works.
+ assertEquals(1, request.requestInfo.size)
+ sc.fetch(request)
+ }
override def getOffsetsBefore(request: OffsetRequest): OffsetResponse = sc.getOffsetsBefore(request)
@@ -156,6 +167,8 @@ class TestBrokerProxy {
bp.start
bp.addTopicPartition(tp, "0")
+ // Add tp2, which should never receive messages since sink disables it.
+ bp.addTopicPartition(tp2, "0")
Thread.sleep(1000)
assertEquals(2, sink.receivedMessages.size)
assertEquals(42, sink.receivedMessages.get(0)._2.offset)
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/922c3df8/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
index 81e33b8..bd479b6 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/CachedStore.scala
@@ -93,7 +93,8 @@ class CachedStore[K, V](val store: KeyValueStore[K, V],
if (found == null || found.dirty == null) {
this.dirtyCount += 1
} else {
- // If we are removing the head of the list, move the head to the next element.
+ // If we are removing the head of the list, move the head to the next
+ // element. See SAMZA-45 for details.
if(found.dirty.prev == null) {
this.dirty = found.dirty.next
this.dirty.prev = null