You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/03/25 17:24:53 UTC
git commit: Test fails with this patch. Reverting to fix.
Repository: incubator-samza
Updated Branches:
refs/heads/master 87f19fcd0 -> a62107786
Test fails with this patch. Reverting to fix.
Revert "SAMZA-203; fix changelog restore performance by increasing flushThreshold in kafka system consumer"
This reverts commit 87f19fcd0ba38124fd231ac314d056180643931b.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/a6210778
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/a6210778
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/a6210778
Branch: refs/heads/master
Commit: a6210778617c1cf5e23fbc2338cc01a5f82e465f
Parents: 87f19fc
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Mar 25 09:24:27 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Mar 25 09:24:27 2014 -0700
----------------------------------------------------------------------
.../org/apache/samza/config/KafkaConfig.scala | 2 +-
.../apache/samza/system/kafka/BrokerProxy.scala | 70 +++++++++-----------
.../system/kafka/KafkaSystemConsumer.scala | 43 +++---------
.../samza/system/kafka/KafkaSystemFactory.scala | 4 +-
.../system/kafka/TestKafkaSystemConsumer.scala | 19 ------
5 files changed, 46 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 4deabd3..978620a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -40,7 +40,7 @@ object KafkaConfig {
* Defines how low a queue can get for a single system/stream/partition
* combination before trying to fetch more messages for it.
*/
- val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + "samza.fetch.threshold"
+ val CONSUMER_FETCH_THRESHOLD = SystemConfig.SYSTEM_PREFIX + ".samza.fetch.threshold"
implicit def Config2Kafka(config: Config) = new KafkaConfig(config)
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 88817ef..bca2f86 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -38,7 +38,7 @@ import org.apache.samza.util.ExponentialSleepStrategy
* Companion object for class JvmMetrics encapsulating various constants
*/
object BrokerProxy {
- val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-"
+ val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY"
}
/**
@@ -62,7 +62,7 @@ class BrokerProxy(
/**
* How long should the fetcher thread sleep before checking if any TopicPartitions has been added to its purview
*/
- val sleepMSWhileNoTopicPartitions = 100
+ val sleepMSWhileNoTopicPartitions = 1000
/** What's the next offset for a particular partition? **/
val nextOffsets:mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
@@ -125,42 +125,36 @@ class BrokerProxy(
val thread = new Thread(new Runnable {
def run {
var reconnect = false
-
- try {
- (new ExponentialSleepStrategy).run(
- loop => {
- if (reconnect) {
- metrics.reconnects(host, port).inc
- simpleConsumer.close()
- simpleConsumer = createSimpleConsumer()
- }
-
- while (!Thread.currentThread.isInterrupted) {
- if (nextOffsets.size == 0) {
- debug("No TopicPartitions to fetch. Sleeping.")
- Thread.sleep(sleepMSWhileNoTopicPartitions)
- } else {
- fetchMessages
-
- // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
- // In that case, reset the loop delay, so that the next time an error occurs,
- // we start with a short retry delay.
- loop.reset
- }
+ (new ExponentialSleepStrategy).run(
+ loop => {
+ if (reconnect) {
+ metrics.reconnects(host, port).inc
+ simpleConsumer.close()
+ simpleConsumer = createSimpleConsumer()
+ }
+
+ while (!Thread.currentThread.isInterrupted) {
+ if (nextOffsets.size == 0) {
+ debug("No TopicPartitions to fetch. Sleeping.")
+ Thread.sleep(sleepMSWhileNoTopicPartitions)
+ } else {
+ fetchMessages
+
+ // If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
+ // In that case, reset the loop delay, so that the next time an error occurs,
+ // we start with a short retry delay.
+ loop.reset
}
- },
-
- (exception, loop) => {
- warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
- debug(exception)
- reconnect = true
- })
- } catch {
- case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
- case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
- }
-
- if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")
+ }
+ },
+
+ (exception, loop) => {
+ warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
+ debug(exception)
+ reconnect = true
+ }
+ )
+ if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt")
}
}, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
@@ -269,7 +263,7 @@ class BrokerProxy(
info("Starting " + toString)
thread.setDaemon(true)
- thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
+ thread.setName(SAMZA_THREAD_NAME_PREFIX+BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX)
thread.start
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index e1ea2ff..8ad97df 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -34,7 +34,6 @@ import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.IncomingMessageEnvelope
import kafka.consumer.ConsumerConfig
import org.apache.samza.util.ExponentialSleepStrategy
-import org.apache.samza.SamzaException
object KafkaSystemConsumer {
def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -55,24 +54,10 @@ private[kafka] class KafkaSystemConsumer(
clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
bufferSize: Int = ConsumerConfig.SocketBufferSize,
- fetchSize: Int = ConsumerConfig.MaxFetchSize,
- consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
- consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
-
- /**
- * Defines a low water mark for how many messages we buffer before we start
- * executing fetch requests against brokers to get more messages. This value
- * is divided equally among all registered SystemStreamPartitions. For
- * example, if fetchThreshold is set to 50000, and there are 50
- * SystemStreamPartitions registered, then the per-partition threshold is
- * 1000. As soon as a SystemStreamPartition's buffered message count drops
- * below 1000, a fetch request will be executed to get more data for it.
- *
- * Increasing this parameter will decrease the latency between when a queue
- * is drained of messages and when new messages are enqueued, but also leads
- * to an increase in memory usage since more messages will be held in memory.
- */
- fetchThreshold: Int = 50000,
+ fetchSize:Int = ConsumerConfig.MaxFetchSize,
+ consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
+ consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
+ fetchThreshold: Int = 0,
offsetGetter: GetOffset = new GetOffset("fail"),
deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
@@ -84,15 +69,8 @@ private[kafka] class KafkaSystemConsumer(
type HostPort = (String, Int)
val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
var nextOffsets = Map[SystemStreamPartition, String]()
- var perPartitionFetchThreshold = fetchThreshold
def start() {
- if (nextOffsets.size <= 0) {
- throw new SamzaException("No SystemStreamPartitions registered. Must register at least one SystemStreamPartition before starting the consumer.")
- }
-
- perPartitionFetchThreshold = fetchThreshold / nextOffsets.size
-
val topicPartitionsAndOffsets = nextOffsets.map {
case (systemStreamPartition, offset) =>
val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
@@ -128,16 +106,16 @@ private[kafka] class KafkaSystemConsumer(
// addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
// This avoids trying to re-add the same topic partition repeatedly
- def refresh(tp: List[TopicAndPartition]) = {
+ def refresh(tp:List[TopicAndPartition]) = {
val head :: rest = tpToRefresh
val nextOffset = topicPartitionsAndOffsets.get(head).get
// Whatever we do, we can't say Broker, even though we're
// manipulating it here. Broker is a private type and Scala doesn't seem
// to care about that as long as you don't explicitly declare its type.
val brokerOption = partitionMetadata(head.topic)
- .partitionsMetadata
- .find(_.partitionId == head.partition)
- .flatMap(_.leader)
+ .partitionsMetadata
+ .find(_.partitionId == head.partition)
+ .flatMap(_.leader)
brokerOption match {
case Some(broker) =>
@@ -160,7 +138,8 @@ private[kafka] class KafkaSystemConsumer(
(loop, exception) => {
warn("While refreshing brokers for %s: %s. Retrying." format (tpToRefresh.head, exception))
debug(exception)
- })
+ }
+ )
}
val sink = new MessageSink {
@@ -169,7 +148,7 @@ private[kafka] class KafkaSystemConsumer(
}
def needsMoreMessages(tp: TopicAndPartition) = {
- getNumMessagesInQueue(toSystemStreamPartition(tp)) <= perPartitionFetchThreshold
+ getNumMessagesInQueue(toSystemStreamPartition(tp)) <= fetchThreshold
}
def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index d6e3a52..feecc58 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -47,9 +47,9 @@ class KafkaSystemFactory extends SystemFactory {
val consumerMaxWait = consumerConfig.fetchWaitMaxMs
val autoOffsetResetDefault = consumerConfig.autoOffsetReset
val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
- val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
+ val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("0").toInt
val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
-
+
new KafkaSystemConsumer(
systemName = systemName,
brokerListString = brokerListString,
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/a6210778/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
deleted file mode 100644
index 8bd51a1..0000000
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.apache.samza.system.kafka
-
-import org.junit.Test
-import org.junit.Assert._
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.Partition
-
-class TestKafkaSystemConsumer {
- @Test
- def testFetchThresholdShouldDivideEvenlyAmongPartitions {
- val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, fetchThreshold = 50000)
-
- for (i <- 0 until 50) {
- consumer.register(new SystemStreamPartition("test-system", "test-stream", new Partition(i)), "0")
- }
-
- assertEquals(1000, consumer.perPartitionFetchThreshold)
- }
-}
\ No newline at end of file