You are viewing a plain-text version of this content. (The "here" hyperlink to the canonical version did not survive the plain-text conversion.)
Posted to commits@samza.apache.org by cr...@apache.org on 2014/01/09 22:52:26 UTC

git commit: SAMZA-126; don't send empty fetch requests to kafka brokers.

Updated Branches:
  refs/heads/master bf6a0eb37 -> 388b992fd


SAMZA-126; don't send empty fetch requests to kafka brokers.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/388b992f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/388b992f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/388b992f

Branch: refs/heads/master
Commit: 388b992fd358f52258afc04ac67e894db24add41
Parents: bf6a0eb
Author: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Authored: Thu Jan 9 13:34:58 2014 -0800
Committer: Chris Riccomini <cr...@criccomi-ld.linkedin.biz>
Committed: Thu Jan 9 13:34:58 2014 -0800

----------------------------------------------------------------------
 .../apache/samza/system/kafka/BrokerProxy.scala | 26 ++++++++++++++------
 .../kafka/KafkaSystemConsumerMetrics.scala      |  2 ++
 .../samza/system/kafka/TestBrokerProxy.scala    | 12 +++++++++
 3 files changed, 32 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/388b992f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 124700e..5e3b7cb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -131,17 +131,27 @@ abstract class BrokerProxy(
   }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
 
   private def fetchMessages(): Unit = {
-    metrics.brokerReads(host, port).inc
-    val response: FetchResponse = simpleConsumer.defaultFetch(nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList: _*)
-    firstCall = false
-    firstCallBarrier.countDown()
+    val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList
 
-    // Split response into errors and non errors, processing the errors first
-    val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError)
+    if (topicAndPartitionsToFetch.nonEmpty) {
+      metrics.brokerReads(host, port).inc
+      val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*)
+      firstCall = false
+      firstCallBarrier.countDown()
 
-    handleErrors(errorResponses, response)
+      // Split the response into errored and successful partitions; errors are handled first.
+      val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError)
 
-    nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
+      handleErrors(errorResponses, response)
+
+      nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
+    } else {
+      // No tracked partition currently needs more messages (or none are tracked): skip the empty fetch and back off.
+      debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
+      metrics.brokerSkippedReads(host, port).inc
+      // NOTE(review): firstCall/firstCallBarrier are only updated after a real fetch now; confirm nothing awaits the barrier while every partition is skipped.
+      Thread.sleep(sleepMSWhileNoTopicPartitions)
+    }
   }
 
   def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/388b992f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
index 1012d58..143be68 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumerMetrics.scala
@@ -40,6 +40,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
   val reconnects = new ConcurrentHashMap[(String, Int), Counter]
   val brokerBytesRead = new ConcurrentHashMap[(String, Int), Counter]
   val brokerReads = new ConcurrentHashMap[(String, Int), Counter]
+  val brokerSkippedReads = new ConcurrentHashMap[(String, Int), Counter]
   val topicPartitions = new ConcurrentHashMap[(String, Int), Gauge[Int]]
 
   def registerTopicAndPartition(tp: TopicAndPartition) = {
@@ -55,6 +56,7 @@ class KafkaSystemConsumerMetrics(val systemName: String = "unknown", val registr
     reconnects.put((host, port), newCounter("%s-%s-reconnects" format (host, port)))
     brokerBytesRead.put((host, port), newCounter("%s-%s-bytes-read" format (host, port)))
     brokerReads.put((host, port), newCounter("%s-%s-reads" format (host, port)))
+    brokerSkippedReads.put((host, port), newCounter("%s-%s-skipped-reads" format (host, port)))
     topicPartitions.put((host, port), newGauge("%s-%s-topic-partitions" format (host, port), 0))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/388b992f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index e25cc4f..36445df 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -169,6 +169,18 @@ class TestBrokerProxy extends Logging {
     assertEquals(84, sink.receivedMessages.get(1)._2.offset)
   }
 
+  @Test def brokerProxySkipsFetchForEmptyRequests() = {
+    val (bp, _, sink) = getMockBrokerProxy()
+    bp.start
+    // Only add tp2, which should never receive messages since sink disables it.
+    bp.addTopicPartition(tp2, Option("0"))
+    // NOTE(review): fixed sleep gives the proxy thread time to hit the skip path; polling the skipped-reads counter would be less timing-sensitive.
+    Thread.sleep(1000)
+    assertEquals(0, sink.receivedMessages.size)
+    assertTrue(bp.metrics.brokerSkippedReads(bp.host, bp.port).getCount > 0)
+    assertEquals(0L, bp.metrics.brokerReads(bp.host, bp.port).getCount)
+  }
+
   @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
     val (bp, tp, _) = getMockBrokerProxy()
     bp.start