You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/01/09 22:52:26 UTC
git commit: SAMZA-126; don't send empty fetch requests to kafka brokers.
Updated Branches:
refs/heads/master bf6a0eb37 -> 388b992fd
SAMZA-126; don't send empty fetch requests to kafka brokers.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/388b992f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/388b992f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/388b992f
Branch: refs/heads/master
Commit: 388b992fd358f52258afc04ac67e894db24add41
Parents: bf6a0eb
Author: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Authored: Thu Jan 9 13:34:58 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Thu Jan 9 13:34:58 2014 -0800
----------------------------------------------------------------------
.../apache/samza/system/kafka/BrokerProxy.scala | 26 ++++++++++++++------
.../kafka/KafkaSystemConsumerMetrics.scala | 2 ++
.../samza/system/kafka/TestBrokerProxy.scala | 12 +++++++++
3 files changed, 32 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/388b992f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 124700e..5e3b7cb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -131,17 +131,27 @@ abstract class BrokerProxy(
}, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
private def fetchMessages(): Unit = {
- metrics.brokerReads(host, port).inc
- val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*)
- firstCall = false
- firstCallBarrier.countDown()
+ val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList
- // Split response into errors and non errors, processing the errors first
- val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError)
+ if (topicAndPartitionsToFetch.size > 0) {
+ metrics.brokerReads(host, port).inc
+ val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*)
+ firstCall = false
+ firstCallBarrier.countDown()
- handleErrors(errorResponses, response)
+ // Split response into errors and non errors, processing the errors first
+ val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError)
- nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
+ handleErrors(errorResponses, response)
+
+ nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
+ } else {
+ debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
+
+ metrics.brokerSkippedReads(host, port).inc
+
+ Thread.sleep(sleepMSWhileNoTopicPartitions)
+ }
}
def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/388b992f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 1012d58..143be68 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -40,6 +40,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
val reconnects = new ConcurrentHashMap[(String, Int), Counter]
val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
+ val brokerSkippedReads = new ConcurrentHashMap[(String, Int), Counter]
val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
def registerTopicAndPartition(tp: TopicAndPartition) = {
@@ -55,6 +56,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port)))
brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port)))
brokerReads.put((host, port), newCounter("%s-%s-reads" format (host, port)))
+ brokerSkippedReads.put((host, port), newCounter("%s-%s-skipped-reads" format (host, port)))
topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0))
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/388b992f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index e25cc4f..36445df 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -169,6 +169,18 @@ class TestBrokerProxy extends Logging {
assertEquals(84, sink.receivedMessages.get(1)._2.offset)
}
+ @Test def brokerProxySkipsFetchForEmptyRequests() = {
+ val (bp, tp, sink) = getMockBrokerProxy()
+
+ bp.start
+ // Only add tp2, which should never receive messages since sink disables it.
+ bp.addTopicPartition(tp2, Option("0"))
+ Thread.sleep(1000)
+ assertEquals(0, sink.receivedMessages.size)
+ assertTrue(bp.metrics.brokerSkippedReads(bp.host, bp.port).getCount > 0)
+ assertTrue(bp.metrics.brokerReads(bp.host, bp.port).getCount == 0)
+ }
+
@Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
val (bp, tp, _) = getMockBrokerProxy()
bp.start