You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/06/10 00:59:19 UTC
git commit: SAMZA-144; add stream level overrides for fetch size.
Repository: incubator-samza
Updated Branches:
refs/heads/master c72223f99 -> 868fc7ade
SAMZA-144; add stream level overrides for fetch size.
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/868fc7ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/868fc7ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/868fc7ad
Branch: refs/heads/master
Commit: 868fc7ade3a84a2bbef9970f9cebb9dd956cadd3
Parents: c72223f
Author: Yan Fang <ya...@gmail.com>
Authored: Mon Jun 9 15:59:10 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Jun 9 15:59:10 2014 -0700
----------------------------------------------------------------------
.../org/apache/samza/config/KafkaConfig.scala | 14 ++++++++
.../apache/samza/system/kafka/BrokerProxy.scala | 2 +-
.../kafka/DefaultFetchSimpleConsumer.scala | 16 ++++++---
.../system/kafka/KafkaSystemConsumer.scala | 2 +-
.../samza/system/kafka/KafkaSystemFactory.scala | 3 +-
.../apache/samza/config/TestKafkaConfig.scala | 38 +++++++++++++++++---
.../samza/system/kafka/TestBrokerProxy.scala | 4 +--
7 files changed, 64 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 4deabd3..b95e493 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -56,6 +56,20 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
/**
+ * Returns a map of topic -> fetch.message.max.bytes value for all streams that
+ * are defined with this property in the config.
+ */
+ def getFetchMessageMaxBytesTopics(systemName: String) = {
+ val subConf = config.subset("systems.%s.streams." format systemName, true)
+ subConf
+ .filterKeys(k => k.endsWith(".consumer.fetch.message.max.bytes"))
+ .map {
+ case (fetchMessageMaxBytes, fetchSizeValue) =>
+ (fetchMessageMaxBytes.replace(".consumer.fetch.message.max.bytes", ""), fetchSizeValue.toInt)
+ }.toMap
+ }
+
+ /**
* Returns a map of topic -> auto.offset.reset value for all streams that
* are defined with this property in the config.
*/
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 561e990..f094fa0 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -54,7 +54,7 @@ class BrokerProxy(
val messageSink: MessageSink,
val timeout: Int = ConsumerConfig.SocketTimeout,
val bufferSize: Int = ConsumerConfig.SocketBufferSize,
- val fetchSize:Int = ConsumerConfig.FetchSize,
+ val fetchSize: StreamFetchSizes = new StreamFetchSizes,
val consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
index d90ca78..5b4886a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
@@ -27,16 +27,16 @@ import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int,
- clientId: scala.Predef.String, fetchSize: Int = ConsumerConfig.FetchSize,
- minBytes:Int = ConsumerConfig.MinFetchBytes, maxWait:Int = ConsumerConfig.MaxFetchWaitMs)
+ clientId: scala.Predef.String, fetchSize: StreamFetchSizes = new StreamFetchSizes,
+ minBytes: Int = ConsumerConfig.MinFetchBytes, maxWait: Int = ConsumerConfig.MaxFetchWaitMs)
extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) {
- def defaultFetch(fetches:(TopicAndPartition, Long)*) = {
+ def defaultFetch(fetches: (TopicAndPartition, Long)*) = {
val fbr = new FetchRequestBuilder().maxWait(maxWait)
.minBytes(minBytes)
.clientId(clientId)
- fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize))
+ fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize.streamValue.getOrElse(f._1.topic, fetchSize.defaultValue)))
this.fetch(fbr.build())
}
@@ -56,3 +56,11 @@ class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soT
override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = super.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
}
+/**
+ * A simple class for holding values for the stream's fetch size (fetch.message.max.bytes).
+ * The stream-level fetch size values are put in the streamValue map streamName -> fetchSize.
+ * If stream-level fetch size is not defined, use the default value. The default value is the
+ * Kafka's default fetch size value or the system-level fetch size value (if defined).
+ */
+case class StreamFetchSizes(defaultValue: Int = ConsumerConfig.MaxFetchSize, streamValue: Map[String, Int] = Map[String, Int]())
+
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 105f6c6..2163d57 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -58,7 +58,7 @@ private[kafka] class KafkaSystemConsumer(
clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
bufferSize: Int = ConsumerConfig.SocketBufferSize,
- fetchSize: Int = ConsumerConfig.MaxFetchSize,
+ fetchSize: StreamFetchSizes = new StreamFetchSizes,
consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index f4dc1c1..4ed5e88 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -40,10 +40,9 @@ class KafkaSystemFactory extends SystemFactory {
.getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
- // TODO could add stream-level overrides for timeout and buffer size
val timeout = consumerConfig.socketTimeoutMs
val bufferSize = consumerConfig.socketReceiveBufferBytes
- val fetchSize = consumerConfig.fetchMessageMaxBytes
+ val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName))
val consumerMinSize = consumerConfig.fetchMinBytes
val consumerMaxWait = consumerConfig.fetchWaitMaxMs
val autoOffsetResetDefault = consumerConfig.autoOffsetReset
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 93cf5a5..468aa3d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -26,6 +26,7 @@ import java.io.File
import java.util.Properties
import scala.collection.JavaConversions._
import org.apache.samza.config.factories.PropertiesConfigFactory
+import kafka.consumer.ConsumerConfig
class TestKafkaConfig {
@@ -34,19 +35,19 @@ class TestKafkaConfig {
val factory = new PropertiesConfigFactory()
val props = new Properties
props.setProperty(" systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
- props.setProperty( "systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
- props.setProperty( "systems.kafka.producer.metadata.broker.list", "localhost:9092")
+ props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
+ props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092")
val mapConfig = new MapConfig(props.toMap[String, String])
val kafkaConfig = new KafkaConfig(mapConfig)
-
+
val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig("kafka")
val consumerClientId1 = consumerConfig1.clientId
val groupId1 = consumerConfig1.groupId
val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig("kafka")
val consumerClientId2 = consumerConfig2.clientId
val groupId2 = consumerConfig2.groupId
- assert( consumerClientId1.startsWith("undefined-samza-consumer-"))
+ assert(consumerClientId1.startsWith("undefined-samza-consumer-"))
assert(consumerClientId2.startsWith("undefined-samza-consumer-"))
assert(groupId1.startsWith("undefined-samza-consumer-group-"))
assert(groupId2.startsWith("undefined-samza-consumer-group-"))
@@ -64,7 +65,7 @@ class TestKafkaConfig {
val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig("kafka")
val producerClientId2 = producerConfig2.clientId
- assert( producerClientId1.startsWith("undefined-samza-producer-"))
+ assert(producerClientId1.startsWith("undefined-samza-producer-"))
assert(producerClientId2.startsWith("undefined-samza-producer-"))
assert(producerClientId1 != producerClientId2)
@@ -73,4 +74,31 @@ class TestKafkaConfig {
assert(producerClientId3 == "TestClientId")
}
+
+ @Test
+ def testStreamLevelFetchSizeOverride() {
+ val props = new Properties
+ props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
+ props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092")
+
+ val mapConfig = new MapConfig(props.toMap[String, String])
+ val kafkaConfig = new KafkaConfig(mapConfig)
+ val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig("kafka")
+ // default fetch size
+ assertEquals(1024*1024, consumerConfig.fetchMessageMaxBytes)
+
+ props.setProperty("systems.kafka.consumer.fetch.message.max.bytes", "262144")
+ val mapConfig1 = new MapConfig(props.toMap[String, String])
+ val kafkaConfig1 = new KafkaConfig(mapConfig1)
+ val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig("kafka")
+ // shared fetch size
+ assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes)
+
+ props.setProperty("systems.kafka.streams.topic1.consumer.fetch.message.max.bytes", "65536")
+ val mapConfig2 = new MapConfig(props.toMap[String, String])
+ val kafkaConfig2 = new KafkaConfig(mapConfig2)
+ val consumerConfig2 = kafkaConfig2.getFetchMessageMaxBytesTopics("kafka")
+ // topic fetch size
+ assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024))
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 9c0ca60..b4e7178 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -86,7 +86,7 @@ class TestBrokerProxy extends Logging {
}
alreadyCreatedConsumer = true
- new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", 42) {
+ new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) {
val sc = Mockito.mock(classOf[SimpleConsumer])
val mockOffsetResponse = {
val offsetResponse = Mockito.mock(classOf[OffsetResponse])
@@ -253,7 +253,7 @@ class TestBrokerProxy extends Logging {
// So now we have a fetch response that will fail. Prime the mockGetOffset to send us to a new offset
- val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, mockOffsetGetter) {
+ val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
override def createSimpleConsumer() = {
if(callsToCreateSimpleConsumer > 1) {