You are viewing a plain-text version of this content; the link to the canonical version was a hyperlink that is not preserved in plain text.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/16 01:01:11 UTC

git commit: SAMZA-233; start new broker proxies when abdicating

Repository: incubator-samza
Updated Branches:
  refs/heads/master 80bf78204 -> 74232e2f4


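SAMZA-233; start new broker proxies when abdicating

Previously, KafkaSystemConsumer started its broker proxies only once, at the
end of start(). A proxy created later by refreshBrokers (for example, after a
topic-partition is abdicated and a refresh picks a new host) was therefore
never started.

With this change:
- refreshBrokers starts each proxy as it assigns a topic-partition to it.
- BrokerProxy.start ignores calls on a proxy whose thread is already alive.
- The topic metadata store is now passed into KafkaSystemConsumer by
  KafkaSystemFactory, so tests can substitute a mock store.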


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/74232e2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/74232e2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/74232e2f

Branch: refs/heads/master
Commit: 74232e2f4fd0551451bf4c3acfe9109c61f4e611
Parents: 80bf782
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Apr 15 15:55:24 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Apr 15 15:55:24 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/system/kafka/BrokerProxy.scala | 10 ++--
 .../system/kafka/KafkaSystemConsumer.scala      | 53 ++++++++++++--------
 .../samza/system/kafka/KafkaSystemFactory.scala |  3 ++
 .../system/kafka/TestKafkaSystemConsumer.scala  | 53 +++++++++++++++++++-
 4 files changed, 93 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 88817ef..e08791f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -268,9 +268,13 @@ class BrokerProxy(
   def start {
     info("Starting " + toString)
 
-    thread.setDaemon(true)
-    thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
-    thread.start
+    if (!thread.isAlive) {
+      thread.setDaemon(true)
+      thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
+      thread.start
+    } else {
+      debug("Tried to start an already started broker proxy (%s). Ignoring." format toString)
+    }
   }
 
   def stop {

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 511306f..1825fbb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -19,7 +19,6 @@
 
 package org.apache.samza.system.kafka
 
-import org.apache.samza.util.ClientUtilTopicMetadataStore
 import kafka.common.TopicAndPartition
 import grizzled.slf4j.Logging
 import kafka.message.MessageAndOffset
@@ -35,6 +34,10 @@ import org.apache.samza.system.IncomingMessageEnvelope
 import kafka.consumer.ConsumerConfig
 import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.SamzaException
+import org.apache.samza.util.TopicMetadataStore
+import org.apache.samza.util.ExponentialSleepStrategy
+import kafka.api.TopicMetadata
+import org.apache.samza.util.ExponentialSleepStrategy
 
 object KafkaSystemConsumer {
   def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -52,6 +55,7 @@ private[kafka] class KafkaSystemConsumer(
   systemName: String,
   brokerListString: String,
   metrics: KafkaSystemConsumerMetrics,
+  metadataStore: TopicMetadataStore,
   clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
   timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
   bufferSize: Int = ConsumerConfig.SocketBufferSize,
@@ -98,8 +102,6 @@ private[kafka] class KafkaSystemConsumer(
     }
 
     refreshBrokers(topicPartitionsAndOffsets)
-
-    brokerProxies.values.foreach(_.start)
   }
 
   override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
@@ -114,36 +116,42 @@ private[kafka] class KafkaSystemConsumer(
     brokerProxies.values.foreach(_.stop)
   }
 
+  protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+    new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
+  }
+
+  protected def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
+    // Whatever we do, we can't say Broker, even though we're
+    // manipulating it here. Broker is a private type and Scala doesn't seem
+    // to care about that as long as you don't explicitly declare its type.
+    val brokerOption = topicMetadata
+      .partitionsMetadata
+      .find(_.partitionId == partition)
+      .flatMap(_.leader)
+
+    brokerOption match {
+      case Some(broker) => Some(broker.host, broker.port)
+      case _ => None
+    }
+  }
+
   def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
     var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
     retryBackoff.run(
       loop => {
-        val getTopicMetadata = (topics: Set[String]) => {
-          new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout).getTopicInfo(topics)
-        }
         val topics = tpToRefresh.map(_.topic).toSet
-        val partitionMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, getTopicMetadata)
+        val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
 
         // addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
         // This avoids trying to re-add the same topic partition repeatedly
         def refresh(tp: List[TopicAndPartition]) = {
           val head :: rest = tpToRefresh
           val nextOffset = topicPartitionsAndOffsets.get(head).get
-          // Whatever we do, we can't say Broker, even though we're
-          // manipulating it here. Broker is a private type and Scala doesn't seem
-          // to care about that as long as you don't explicitly declare its type.
-          val brokerOption = partitionMetadata(head.topic)
-            .partitionsMetadata
-            .find(_.partitionId == head.partition)
-            .flatMap(_.leader)
-
-          brokerOption match {
-            case Some(broker) =>
-              def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
-
-              brokerProxies
-                .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)
-                .addTopicPartition(head, Option(nextOffset))
+          getHostPort(topicMetadata(head.topic), head.partition) match {
+            case Some((host, port)) =>
+              val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
+              brokerProxy.addTopicPartition(head, Option(nextOffset))
+              brokerProxy.start
             case None => warn("No such topic-partition: %s, dropping." format head)
           }
           rest
@@ -152,6 +160,7 @@ private[kafka] class KafkaSystemConsumer(
         while (!tpToRefresh.isEmpty) {
           tpToRefresh = refresh(tpToRefresh)
         }
+
         loop.done
       },
 

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index d6e3a52..a5e8614 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -27,6 +27,7 @@ import org.apache.samza.SamzaException
 import kafka.producer.Producer
 import org.apache.samza.system.SystemFactory
 import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.ClientUtilTopicMetadataStore
 
 class KafkaSystemFactory extends SystemFactory {
   def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
@@ -49,11 +50,13 @@ class KafkaSystemFactory extends SystemFactory {
     val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
     val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
     val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
+    val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
 
     new KafkaSystemConsumer(
       systemName = systemName,
       brokerListString = brokerListString,
       metrics = metrics,
+      metadataStore = metadataStore,
       clientId = clientId,
       timeout = timeout,
       bufferSize = bufferSize,

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 93766cd..e05fd0b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -24,11 +24,16 @@ import org.junit.Assert._
 import org.apache.samza.system.SystemStreamPartition
 import org.apache.samza.Partition
 import kafka.common.TopicAndPartition
+import org.apache.samza.util.TopicMetadataStore
+import kafka.api.TopicMetadata
+import kafka.api.PartitionMetadata
+import kafka.cluster.Broker
 
 class TestKafkaSystemConsumer {
   @Test
   def testFetchThresholdShouldDivideEvenlyAmongPartitions {
-    val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, fetchThreshold = 50000) {
+    val metadataStore = new MockMetadataStore
+    val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) {
       override def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
       }
     }
@@ -41,4 +46,50 @@ class TestKafkaSystemConsumer {
 
     assertEquals(1000, consumer.perPartitionFetchThreshold)
   }
+
+  @Test
+  def testBrokerCreationShouldTriggerStart {
+    val systemName = "test-system"
+    val streamName = "test-stream"
+    val metrics = new KafkaSystemConsumerMetrics
+    // Lie and tell the store that the partition metadata is empty. We can't 
+    // use partition metadata because it has Broker in its constructor, which 
+    // is package private to Kafka. 
+    val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0)))
+    var hosts = List[String]()
+    var getHostPortCount = 0
+    val consumer = new KafkaSystemConsumer(systemName, streamName, metrics, metadataStore) {
+      override def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
+        // Generate a unique host every time getHostPort is called.
+        getHostPortCount += 1
+        Some("localhost-%s" format getHostPortCount, 0)
+      }
+
+      override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+        new BrokerProxy(host, port, systemName, "", metrics, sink) {
+          override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
+            // Skip this since we normally do verification of offsets, which 
+            // tries to connect to Kafka. Rather than mock that, just forget it.
+            nextOffsets.size
+          }
+
+          override def start {
+            hosts :+= host
+          }
+        }
+      }
+    }
+
+    consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1")
+    assertEquals(0, hosts.size)
+    consumer.start
+    assertEquals(List("localhost-1"), hosts)
+    // Should trigger a refresh with a new host.
+    consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2)
+    assertEquals(List("localhost-1", "localhost-2"), hosts)
+  }
+}
+
+class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
+  def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
 }
\ No newline at end of file