You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/06/10 00:59:19 UTC

git commit: SAMZA-144; add stream level overrides for fetch size.

Repository: incubator-samza
Updated Branches:
  refs/heads/master c72223f99 -> 868fc7ade


SAMZA-144; add stream level overrides for fetch size.


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/868fc7ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/868fc7ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/868fc7ad

Branch: refs/heads/master
Commit: 868fc7ade3a84a2bbef9970f9cebb9dd956cadd3
Parents: c72223f
Author: Yan Fang <ya...@gmail.com>
Authored: Mon Jun 9 15:59:10 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Mon Jun 9 15:59:10 2014 -0700

----------------------------------------------------------------------
 .../org/apache/samza/config/KafkaConfig.scala   | 14 ++++++++
 .../apache/samza/system/kafka/BrokerProxy.scala |  2 +-
 .../kafka/DefaultFetchSimpleConsumer.scala      | 16 ++++++---
 .../system/kafka/KafkaSystemConsumer.scala      |  2 +-
 .../samza/system/kafka/KafkaSystemFactory.scala |  3 +-
 .../apache/samza/config/TestKafkaConfig.scala   | 38 +++++++++++++++++---
 .../samza/system/kafka/TestBrokerProxy.scala    |  4 +--
 7 files changed, 64 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 4deabd3..b95e493 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -56,6 +56,20 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getConsumerFetchThreshold(name: String) = getOption(KafkaConfig.CONSUMER_FETCH_THRESHOLD format name)
 
   /**
+   * Returns a map of stream -> fetch.message.max.bytes for every stream of this system that
+   * sets streams.<stream>.consumer.fetch.message.max.bytes (only the trailing suffix is stripped).
+   */
+  def getFetchMessageMaxBytesTopics(systemName: String) = {
+    val subConf = config.subset("systems.%s.streams." format systemName, true)
+    val suffix = ".consumer.fetch.message.max.bytes"
+    subConf
+      .collect {
+        case (key, fetchSizeValue) if key.endsWith(suffix) =>
+          (key.stripSuffix(suffix), fetchSizeValue.toInt)
+      }.toMap
+  }
+
+  /**
    * Returns a map of topic -> auto.offset.reset value for all streams that
    * are defined with this property in the config.
    */

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 561e990..f094fa0 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -54,7 +54,7 @@ class BrokerProxy(
   val messageSink: MessageSink,
   val timeout: Int = ConsumerConfig.SocketTimeout,
   val bufferSize: Int = ConsumerConfig.SocketBufferSize,
-  val fetchSize:Int = ConsumerConfig.FetchSize,
+  val fetchSize: StreamFetchSizes = new StreamFetchSizes,
   val consumerMinSize:Int = ConsumerConfig.MinFetchBytes,
   val consumerMaxWait:Int = ConsumerConfig.MaxFetchWaitMs,
   offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
index d90ca78..5b4886a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/DefaultFetchSimpleConsumer.scala
@@ -27,16 +27,16 @@ import kafka.common.TopicAndPartition
 import kafka.consumer.ConsumerConfig
 
 class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soTimeout: scala.Int, bufferSize: scala.Int,
-                                 clientId: scala.Predef.String, fetchSize: Int = ConsumerConfig.FetchSize,
-                                 minBytes:Int = ConsumerConfig.MinFetchBytes, maxWait:Int = ConsumerConfig.MaxFetchWaitMs)
+  clientId: scala.Predef.String, fetchSize: StreamFetchSizes = new StreamFetchSizes,
+  minBytes: Int = ConsumerConfig.MinFetchBytes, maxWait: Int = ConsumerConfig.MaxFetchWaitMs)
   extends SimpleConsumer(host, port, soTimeout, bufferSize, clientId) {
 
-  def defaultFetch(fetches:(TopicAndPartition, Long)*) = {
+  def defaultFetch(fetches: (TopicAndPartition, Long)*) = {
     val fbr = new FetchRequestBuilder().maxWait(maxWait)
       .minBytes(minBytes)
       .clientId(clientId)
 
-    fetches.foreach(f => fbr.addFetch(f._1.topic, f._1.partition, f._2, fetchSize))
+    fetches.foreach { case (tp, offset) => fbr.addFetch(tp.topic, tp.partition, offset, fetchSize.streamValue.getOrElse(tp.topic, fetchSize.defaultValue)) }
 
     this.fetch(fbr.build())
   }
@@ -56,3 +56,11 @@ class DefaultFetchSimpleConsumer(host: scala.Predef.String, port: scala.Int, soT
   override def earliestOrLatestOffset(topicAndPartition: TopicAndPartition, earliestOrLatest: Long, consumerId: Int): Long = super.earliestOrLatestOffset(topicAndPartition, earliestOrLatest, consumerId)
 }
 
+/**
+ * Holds the fetch size (fetch.message.max.bytes) to use for each stream of a Kafka system.
+ * streamValue maps a stream name to its stream-level fetch size override; a stream without
+ * an entry falls back to defaultValue. defaultValue is ConsumerConfig.MaxFetchSize unless the
+ * caller supplies one (KafkaSystemFactory passes the system-level consumer fetch size).
+ */
+case class StreamFetchSizes(defaultValue: Int = ConsumerConfig.MaxFetchSize, streamValue: Map[String, Int] = Map.empty[String, Int])
+

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 105f6c6..2163d57 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -58,7 +58,7 @@ private[kafka] class KafkaSystemConsumer(
   clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
   timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
   bufferSize: Int = ConsumerConfig.SocketBufferSize,
-  fetchSize: Int = ConsumerConfig.MaxFetchSize,
+  fetchSize: StreamFetchSizes = new StreamFetchSizes,
   consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
   consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index f4dc1c1..4ed5e88 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -40,10 +40,9 @@ class KafkaSystemFactory extends SystemFactory {
       .getOrElse(throw new SamzaException("No broker list defined in config for %s." format systemName))
     val consumerConfig = config.getKafkaSystemConsumerConfig(systemName, clientId)
 
-    // TODO could add stream-level overrides for timeout and buffer size
     val timeout = consumerConfig.socketTimeoutMs
     val bufferSize = consumerConfig.socketReceiveBufferBytes
-    val fetchSize = consumerConfig.fetchMessageMaxBytes
+    val fetchSize = new StreamFetchSizes(consumerConfig.fetchMessageMaxBytes, config.getFetchMessageMaxBytesTopics(systemName))
     val consumerMinSize = consumerConfig.fetchMinBytes
     val consumerMaxWait = consumerConfig.fetchWaitMaxMs
     val autoOffsetResetDefault = consumerConfig.autoOffsetReset

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index 93cf5a5..468aa3d 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -26,6 +26,7 @@ import java.io.File
 import java.util.Properties
 import scala.collection.JavaConversions._
 import org.apache.samza.config.factories.PropertiesConfigFactory
+import kafka.consumer.ConsumerConfig
 
 class TestKafkaConfig {
 
@@ -34,19 +35,19 @@ class TestKafkaConfig {
     val factory = new PropertiesConfigFactory()
     val props = new Properties
     props.setProperty(" systems.kafka.samza.factory", "org.apache.samza.system.kafka.KafkaSystemFactory")
-    props.setProperty( "systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
-    props.setProperty( "systems.kafka.producer.metadata.broker.list", "localhost:9092")
+    props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
+    props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092")
 
     val mapConfig = new MapConfig(props.toMap[String, String])
     val kafkaConfig = new KafkaConfig(mapConfig)
-  
+
     val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig("kafka")
     val consumerClientId1 = consumerConfig1.clientId
     val groupId1 = consumerConfig1.groupId
     val consumerConfig2 = kafkaConfig.getKafkaSystemConsumerConfig("kafka")
     val consumerClientId2 = consumerConfig2.clientId
     val groupId2 = consumerConfig2.groupId
-    assert( consumerClientId1.startsWith("undefined-samza-consumer-"))
+    assert(consumerClientId1.startsWith("undefined-samza-consumer-"))
     assert(consumerClientId2.startsWith("undefined-samza-consumer-"))
     assert(groupId1.startsWith("undefined-samza-consumer-group-"))
     assert(groupId2.startsWith("undefined-samza-consumer-group-"))
@@ -64,7 +65,7 @@ class TestKafkaConfig {
     val producerConfig2 = kafkaConfig.getKafkaSystemProducerConfig("kafka")
     val producerClientId2 = producerConfig2.clientId
 
-    assert( producerClientId1.startsWith("undefined-samza-producer-"))
+    assert(producerClientId1.startsWith("undefined-samza-producer-"))
     assert(producerClientId2.startsWith("undefined-samza-producer-"))
     assert(producerClientId1 != producerClientId2)
 
@@ -73,4 +74,31 @@ class TestKafkaConfig {
     assert(producerClientId3 == "TestClientId")
 
   }
+
+  @Test
+  def testStreamLevelFetchSizeOverride() {
+    val props = new Properties
+    props.setProperty("systems.kafka.consumer.zookeeper.connect", "localhost:2181/")
+    props.setProperty("systems.kafka.producer.metadata.broker.list", "localhost:9092")
+
+    val mapConfig = new MapConfig(props.toMap[String, String])
+    val kafkaConfig = new KafkaConfig(mapConfig)
+    val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig("kafka")
+    // default fetch size: no system- or stream-level override configured
+    assertEquals(1024 * 1024, consumerConfig.fetchMessageMaxBytes)
+
+    props.setProperty("systems.kafka.consumer.fetch.message.max.bytes", "262144")
+    val mapConfig1 = new MapConfig(props.toMap[String, String])
+    val kafkaConfig1 = new KafkaConfig(mapConfig1)
+    val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig("kafka")
+    // system-level fetch size, shared by every stream of the system
+    assertEquals(262144, consumerConfig1.fetchMessageMaxBytes)
+
+    props.setProperty("systems.kafka.streams.topic1.consumer.fetch.message.max.bytes", "65536")
+    val mapConfig2 = new MapConfig(props.toMap[String, String])
+    val kafkaConfig2 = new KafkaConfig(mapConfig2)
+    val topicFetchSizes = kafkaConfig2.getFetchMessageMaxBytesTopics("kafka")
+    // stream-level override: exactly one entry, keyed by the bare stream name
+    assertEquals(Map("topic1" -> 65536), topicFetchSizes)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/868fc7ad/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 9c0ca60..b4e7178 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -86,7 +86,7 @@ class TestBrokerProxy extends Logging {
         }
         alreadyCreatedConsumer = true
 
-        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", 42) {
+        new DefaultFetchSimpleConsumer("a", 1, 2, 3, "b", new StreamFetchSizes(42)) {
           val sc = Mockito.mock(classOf[SimpleConsumer])
           val mockOffsetResponse = {
             val offsetResponse = Mockito.mock(classOf[OffsetResponse])
@@ -253,7 +253,7 @@ class TestBrokerProxy extends Logging {
 
     // So now we have a fetch response that will fail.  Prime the mockGetOffset to send us to a new offset
 
-    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, 256 * 1024, 524288, 1000, mockOffsetGetter) {
+    val bp = new BrokerProxy("host", 423, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
 
       override def createSimpleConsumer() = {
         if(callsToCreateSimpleConsumer > 1) {