You are viewing a plain text version of this content. The canonical (HTML) link for it is available in the original mailing-list archive.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/04/16 01:01:11 UTC
git commit: SAMZA-233; start new broker proxies when abdicating
Repository: incubator-samza
Updated Branches:
refs/heads/master 80bf78204 -> 74232e2f4
SAMZA-233; start new broker proxies when abdicating
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/74232e2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/74232e2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/74232e2f
Branch: refs/heads/master
Commit: 74232e2f4fd0551451bf4c3acfe9109c61f4e611
Parents: 80bf782
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Apr 15 15:55:24 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Apr 15 15:55:24 2014 -0700
----------------------------------------------------------------------
.../apache/samza/system/kafka/BrokerProxy.scala | 10 ++--
.../system/kafka/KafkaSystemConsumer.scala | 53 ++++++++++++--------
.../samza/system/kafka/KafkaSystemFactory.scala | 3 ++
.../system/kafka/TestKafkaSystemConsumer.scala | 53 +++++++++++++++++++-
4 files changed, 93 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 88817ef..e08791f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -268,9 +268,13 @@ class BrokerProxy(
def start {
info("Starting " + toString)
- thread.setDaemon(true)
- thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
- thread.start
+ if (!thread.isAlive) {
+ thread.setDaemon(true)
+ thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
+ thread.start
+ } else {
+ debug("Tried to start an already started broker proxy (%s). Ignoring." format toString)
+ }
}
def stop {
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index 511306f..1825fbb 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -19,7 +19,6 @@
package org.apache.samza.system.kafka
-import org.apache.samza.util.ClientUtilTopicMetadataStore
import kafka.common.TopicAndPartition
import grizzled.slf4j.Logging
import kafka.message.MessageAndOffset
@@ -35,6 +34,10 @@ import org.apache.samza.system.IncomingMessageEnvelope
import kafka.consumer.ConsumerConfig
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.SamzaException
+import org.apache.samza.util.TopicMetadataStore
+import org.apache.samza.util.ExponentialSleepStrategy
+import kafka.api.TopicMetadata
+import org.apache.samza.util.ExponentialSleepStrategy
object KafkaSystemConsumer {
def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
@@ -52,6 +55,7 @@ private[kafka] class KafkaSystemConsumer(
systemName: String,
brokerListString: String,
metrics: KafkaSystemConsumerMetrics,
+ metadataStore: TopicMetadataStore,
clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
bufferSize: Int = ConsumerConfig.SocketBufferSize,
@@ -98,8 +102,6 @@ private[kafka] class KafkaSystemConsumer(
}
refreshBrokers(topicPartitionsAndOffsets)
-
- brokerProxies.values.foreach(_.start)
}
override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
@@ -114,36 +116,42 @@ private[kafka] class KafkaSystemConsumer(
brokerProxies.values.foreach(_.stop)
}
+ protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+ new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
+ }
+
+ protected def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
+ // Whatever we do, we can't say Broker, even though we're
+ // manipulating it here. Broker is a private type and Scala doesn't seem
+ // to care about that as long as you don't explicitly declare its type.
+ val brokerOption = topicMetadata
+ .partitionsMetadata
+ .find(_.partitionId == partition)
+ .flatMap(_.leader)
+
+ brokerOption match {
+ case Some(broker) => Some(broker.host, broker.port)
+ case _ => None
+ }
+ }
+
def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
retryBackoff.run(
loop => {
- val getTopicMetadata = (topics: Set[String]) => {
- new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout).getTopicInfo(topics)
- }
val topics = tpToRefresh.map(_.topic).toSet
- val partitionMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, getTopicMetadata)
+ val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
// addTopicPartition one at a time, leaving the to-be-done list intact in case of exceptions.
// This avoids trying to re-add the same topic partition repeatedly
def refresh(tp: List[TopicAndPartition]) = {
val head :: rest = tpToRefresh
val nextOffset = topicPartitionsAndOffsets.get(head).get
- // Whatever we do, we can't say Broker, even though we're
- // manipulating it here. Broker is a private type and Scala doesn't seem
- // to care about that as long as you don't explicitly declare its type.
- val brokerOption = partitionMetadata(head.topic)
- .partitionsMetadata
- .find(_.partitionId == head.partition)
- .flatMap(_.leader)
-
- brokerOption match {
- case Some(broker) =>
- def createBrokerProxy = new BrokerProxy(broker.host, broker.port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
-
- brokerProxies
- .getOrElseUpdate((broker.host, broker.port), createBrokerProxy)
- .addTopicPartition(head, Option(nextOffset))
+ getHostPort(topicMetadata(head.topic), head.partition) match {
+ case Some((host, port)) =>
+ val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
+ brokerProxy.addTopicPartition(head, Option(nextOffset))
+ brokerProxy.start
case None => warn("No such topic-partition: %s, dropping." format head)
}
rest
@@ -152,6 +160,7 @@ private[kafka] class KafkaSystemConsumer(
while (!tpToRefresh.isEmpty) {
tpToRefresh = refresh(tpToRefresh)
}
+
loop.done
},
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index d6e3a52..a5e8614 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -27,6 +27,7 @@ import org.apache.samza.SamzaException
import kafka.producer.Producer
import org.apache.samza.system.SystemFactory
import org.apache.samza.util.ExponentialSleepStrategy
+import org.apache.samza.util.ClientUtilTopicMetadataStore
class KafkaSystemFactory extends SystemFactory {
def getConsumer(systemName: String, config: Config, registry: MetricsRegistry) = {
@@ -49,11 +50,13 @@ class KafkaSystemFactory extends SystemFactory {
val autoOffsetResetTopics = config.getAutoOffsetResetTopics(systemName)
val fetchThreshold = config.getConsumerFetchThreshold(systemName).getOrElse("50000").toInt
val offsetGetter = new GetOffset(autoOffsetResetDefault, autoOffsetResetTopics)
+ val metadataStore = new ClientUtilTopicMetadataStore(brokerListString, clientId, timeout)
new KafkaSystemConsumer(
systemName = systemName,
brokerListString = brokerListString,
metrics = metrics,
+ metadataStore = metadataStore,
clientId = clientId,
timeout = timeout,
bufferSize = bufferSize,
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/74232e2f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
index 93766cd..e05fd0b 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemConsumer.scala
@@ -24,11 +24,16 @@ import org.junit.Assert._
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.Partition
import kafka.common.TopicAndPartition
+import org.apache.samza.util.TopicMetadataStore
+import kafka.api.TopicMetadata
+import kafka.api.PartitionMetadata
+import kafka.cluster.Broker
class TestKafkaSystemConsumer {
@Test
def testFetchThresholdShouldDivideEvenlyAmongPartitions {
- val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, fetchThreshold = 50000) {
+ val metadataStore = new MockMetadataStore
+ val consumer = new KafkaSystemConsumer("", "", new KafkaSystemConsumerMetrics, metadataStore, fetchThreshold = 50000) {
override def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
}
}
@@ -41,4 +46,50 @@ class TestKafkaSystemConsumer {
assertEquals(1000, consumer.perPartitionFetchThreshold)
}
+
+ @Test
+ def testBrokerCreationShouldTriggerStart {
+ val systemName = "test-system"
+ val streamName = "test-stream"
+ val metrics = new KafkaSystemConsumerMetrics
+ // Lie and tell the store that the partition metadata is empty. We can't
+ // use partition metadata because it has Broker in its constructor, which
+ // is package private to Kafka.
+ val metadataStore = new MockMetadataStore(Map(streamName -> TopicMetadata(streamName, Seq.empty, 0)))
+ var hosts = List[String]()
+ var getHostPortCount = 0
+ val consumer = new KafkaSystemConsumer(systemName, streamName, metrics, metadataStore) {
+ override def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
+ // Generate a unique host every time getHostPort is called.
+ getHostPortCount += 1
+ Some("localhost-%s" format getHostPortCount, 0)
+ }
+
+ override def createBrokerProxy(host: String, port: Int): BrokerProxy = {
+ new BrokerProxy(host, port, systemName, "", metrics, sink) {
+ override def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
+ // Skip this since we normally do verification of offsets, which
+ // tries to connect to Kafka. Rather than mock that, just forget it.
+ nextOffsets.size
+ }
+
+ override def start {
+ hosts :+= host
+ }
+ }
+ }
+ }
+
+ consumer.register(new SystemStreamPartition(systemName, streamName, new Partition(0)), "1")
+ assertEquals(0, hosts.size)
+ consumer.start
+ assertEquals(List("localhost-1"), hosts)
+ // Should trigger a refresh with a new host.
+ consumer.sink.abdicate(new TopicAndPartition(streamName, 0), 2)
+ assertEquals(List("localhost-1", "localhost-2"), hosts)
+ }
+}
+
+class MockMetadataStore(var metadata: Map[String, TopicMetadata] = Map()) extends TopicMetadataStore {
+ def getTopicInfo(topics: Set[String]): Map[String, TopicMetadata] = metadata
}
\ No newline at end of file