You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/25 17:24:53 UTC

git commit: Test fails with this patch. Reverting to fix.

Repository: incubator-samza
Updated Branches:
  refs/heads/master 87f19fcd0 -> a62107786


Test fails with this patch. Reverting to fix.

Revert "SAMZA-203; fix changelog restore performance by increasing flushThreshold in kafka system consumer"

This reverts commit 87f19fcd0ba38124fd231ac314d056180643931b.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/a6210778
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/a6210778
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/a6210778

Branch: refs/heads/master
Commit: a6210778617c1cf5e23fbc2338cc01a5f82e465f
Parents: 87f19fc
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Mar 25 09:24:27 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Mar 25 09:24:27 2014 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/KafkaConfig.scala   |  2 +-
 .../apache/samza/system/kafka/BrokerProxy.scala | 70 +++++++++-----------
 .../system/kafka/KafkaSystemConsumer.scala      | 43 +++---------
 .../samza/system/kafka/KafkaSystemFactory.scala |  4 +-
 .../system/kafka/TestKafkaSystemConsumer.scala  | 19 ------
 5 files changed, 46 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 4deabd3..978620a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -40,7 +40,7 @@ object KafkaConfig {
    * Defines how low a queue can get for a single system/stream/partition
    * combination before trying to fetch more messages for it.
    */
-  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
+  val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + ".samza.fetch.threshold"
 
   implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 88817ef..bca2f86 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -38,7 +38,7 @@ import org.apache.samza.util.ExponentialSleepStrategy
  *  Companion object for class JvmMetrics encapsulating various constants
  */
 object BrokerProxy {
-  val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-"
+  val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY"
 }
 
 /**
@@ -62,7 +62,7 @@ class BrokerProxy(
   /**
    * How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview
    */
-  val sleepMSWhileNoTopicPartitions = 100
+  val sleepMSWhileNoTopicPartitions = 1000
 
   /** What's the next offset for a particular partition? **/
   val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
@@ -125,42 +125,36 @@ class BrokerProxy(
   val thread = new Thread(new Runnable {
     def run {
       var reconnect = false
-
-      try {
-        (new ExponentialSleepStrategy).run(
-          loop => {
-            if (reconnect) {
-              metrics.reconnects(host, port).inc
-              simpleConsumer.close()
-              simpleConsumer = createSimpleConsumer()
-            }
-
-            while (!Thread.currentThread.isInterrupted) {
-              if (nextOffsets.size == 0) {
-                debug("No TopicPartitions to fetch. Sleeping.")
-                Thread.sleep(sleepMSWhileNoTopicPartitions)
-              } else {
-                fetchMessages
-
-                // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
-                // In that case, reset the loop delay, so that the next time an error occurs,
-                // we start with a short retry delay.
-                loop.reset
-              }
+      (new ExponentialSleepStrategy).run(
+        loop => {
+          if (reconnect) {
+            metrics.reconnects(host, port).inc
+            simpleConsumer.close()
+            simpleConsumer = createSimpleConsumer()
+          }
+
+          while (!Thread.currentThread.isInterrupted) {
+            if (nextOffsets.size == 0) {
+              debug("No TopicPartitions to fetch. Sleeping.")
+              Thread.sleep(sleepMSWhileNoTopicPartitions)
+            } else {
+              fetchMessages
+
+              // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
+              // In that case, reset the loop delay, so that the next time an error occurs,
+              // we start with a short retry delay.
+              loop.reset
             }
-          },
-
-          (exception, loop) => {
-            warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
-            debug(exception)
-            reconnect = true
-          })
-      } catch {
-        case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
-        case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
-      }
-
-      if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")
+          }
+        },
+
+        (exception, loop) => {
+          warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
+          debug(exception)
+          reconnect = true
+        }
+      )
+      if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt")
     }
   }, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
 
@@ -269,7 +263,7 @@ class BrokerProxy(
     info("Starting " + toString)
 
     thread.setDaemon(true)
-    thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
+    thread.setName(SAMZA_THREAD_NAME_PREFIX+BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX)
     thread.start
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index e1ea2ff..8ad97df 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -34,7 +34,6 @@ import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.system.IncomingMessageEnvelope
 import kafka.consumer.ConsumerConfig
 import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.SamzaException
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -55,24 +54,10 @@ private[kafka] class KafkaSystemConsumer(
   clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
   timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
   bufferSize: Int = ConsumerConfig.SocketBufferSize,
-  fetchSize: Int = ConsumerConfig.MaxFetchSize,
-  consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
-  consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
-
-  /**
-   * Defines a low water mark for how many messages we buffer before we start
-   * executing fetch requests against brokers to get more messages. This value
-   * is divided equally among all registered SystemStreamPartitions. For
-   * example, if fetchThreshold is set to 50000, and there are 50
-   * SystemStreamPartitions registered, then the per-partition threshold is
-   * 1000. As soon as a SystemStreamPartition's buffered message count drops
-   * below 1000, a fetch request will be executed to get more data for it.
-   *
-   * Increasing this parameter will decrease the latency between when a queue
-   * is drained of messages and when new messages are enqueued, but also leads
-   * to an increase in memory usage since more messages will be held in memory.
-   */
-  fetchThreshold: Int = 50000,
+  fetchSize:Int = ConsumerConfig.MaxFetchSize,
+  consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
+  consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
+  fetchThreshold: Int = 0,
   offsetGetter: GetOffset = new GetOffset("fail"),
   deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
   keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
@@ -84,15 +69,8 @@ private[kafka] class KafkaSystemConsumer(
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
   var nextOffsets = Map[SystemStreamPartition, String]()
-  var perPartitionFetchThreshold = fetchThreshold
 
   def start() {
-    if (nextOffsets.size <= 0) {
-      throw new SamzaException("No SystemStreamPartitions registered. Must register at least one SystemStreamPartition before starting the consumer.")
-    }
-
-    perPartitionFetchThreshold = fetchThreshold / nextOffsets.size
-
     val topicPartitionsAndOffsets = nextOffsets.map {
       case (systemStreamPartition, offset) =>
         val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
@@ -128,16 +106,16 @@ private[kafka] class KafkaSystemConsumer(
 
         // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
         // This avoids trying to re-add the same topic partition repeatedly
-        def refresh(tp: List[TopicAndPartition]) = {
+        def refresh(tp:List[TopicAndPartition]) = {
           val head :: rest = tpToRefresh
           val nextOffset = topicPartitionsAndOffsets.get(head).get
           // Whatever we do, we can't say Broker, even though we're
           // manipulating it here. Broker is a private type and Scala doesn't seem
           // to care about that as long as you don't explicitly declare its type.
           val brokerOption = partitionMetadata(head.topic)
-            .partitionsMetadata
-            .find(_.partitionId == head.partition)
-            .flatMap(_.leader)
+                             .partitionsMetadata
+                             .find(_.partitionId == head.partition)
+                             .flatMap(_.leader)
 
           brokerOption match {
             case Some(broker) =>
@@ -160,7 +138,8 @@ private[kafka] class KafkaSystemConsumer(
       (loop, exception) => {
         warn("While refreshing brokers for %s: %s. Retrying." format (tpToRefresh.head, exception))
         debug(exception)
-      })
+      }
+    )
   }
 
   val sink = new MessageSink {
@@ -169,7 +148,7 @@ private[kafka] class KafkaSystemConsumer(
     }
 
     def needsMoreMessages(tp: TopicAndPartition) = {
-      getNumMessagesInQueue(toSystemStreamPartition(tp)) <= perPartitionFetchThreshold
+      getNumMessagesInQueue(toSystemStreamPartition(tp)) <= fetchThreshold
     }
 
     def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index d6e3a52..feecc58 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -47,9 +47,9 @@ class KafkaSystemFactory extends SystemFactory {
     val consumerMaxWait = consumerConfig.fetchWaitMaxMs
     val autoOffsetResetDefault = consumerConfig.autoOffsetReset
     val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
-    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
+    val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt
     val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
-
+   
     new KafkaSystemConsumer(
       systemName = systemName,
       brokerListString = brokerListString,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
deleted file mode 100644
index 8bd51a1..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.samza.system.kafka
-
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-
-class TestKafkaSystemConsumer {
-  @Test
-  def testFetchThresholdShouldDivideEvenlyAmongPartitions {
-    val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, fetchThreshold = 50000)
-
-    for (i <- 0 until 50) {
-      consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
-    }
-
-    assertEquals(1000, consumer.perPartitionFetchThreshold)
-  }
-}
\ No newline at end of file