You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/11 17:29:11 UTC
samza git commit: SAMZA-590; kafka broker proxy should relinquish ownership for all partitions when its consumer fails
Repository: samza
Updated Branches:
refs/heads/master b82d45871 -> c725b3ceb
SAMZA-590; kafka broker proxy should relinquish ownership for all partitions when its consumer fails
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c725b3ce
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c725b3ce
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c725b3ce
Branch: refs/heads/master
Commit: c725b3ceb6d2e116f5da257ba45fe70fa877747a
Parents: b82d458
Author: Chris Riccomini <cr...@apache.org>
Authored: Wed Mar 11 09:28:58 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Mar 11 09:28:58 2015 -0700
----------------------------------------------------------------------
.../apache/samza/system/kafka/BrokerProxy.scala | 28 +++++++---
.../samza/system/kafka/TestBrokerProxy.scala | 59 ++++++++++++++++++++
2 files changed, 80 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/c725b3ce/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index f768263..c6e231a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -153,8 +153,9 @@ class BrokerProxy(
},
(exception, loop) => {
- warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
+ warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." format exception)
debug("Exception detail:", exception)
+ abdicateAll
reconnect = true
})
} catch {
@@ -182,7 +183,6 @@ class BrokerProxy(
nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
} else {
-
refreshLatencyMetrics
debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
@@ -193,13 +193,27 @@ class BrokerProxy(
}
}
- def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
+ /**
+ * Releases ownership for a single TopicAndPartition. The
+ * KafkaSystemConsumer will then try to find a new broker for the
+ * TopicAndPartition.
+ */
+ def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match {
// Need to be mindful of a tp that was removed by another thread
- def abdicate(tp:TopicAndPartition) = removeTopicPartition(tp) match {
- case Some(offset) => messageSink.abdicate(tp, offset)
- case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
- }
+ case Some(offset) => messageSink.abdicate(tp, offset)
+ case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
+ }
+ /**
+ * Releases ownership of every TopicAndPartition held by this BrokerProxy
+ * thread. The KafkaSystemConsumer will then try to find a new broker for
+ * each released TopicAndPartition.
+ */
+ def abdicateAll {
+ nextOffsets.keySet.foreach(abdicate(_))
+ }
+
+ def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
// FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
http://git-wip-us.apache.org/repos/asf/samza/blob/c725b3ce/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index d559d8b..e285dec 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -21,6 +21,7 @@
package org.apache.samza.system.kafka
import java.nio.ByteBuffer
+import java.nio.channels.ClosedChannelException
import java.util.concurrent.CountDownLatch
import kafka.api._
@@ -299,4 +300,62 @@ class TestBrokerProxy extends Logging {
fail(failString)
}
}
+
+ /**
+ * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions
+ * that it owns when a consumer failure occurs.
+ */
+ @Test def brokerProxyAbdicatesOnConnectionFailure():Unit = {
+ val countdownLatch = new CountDownLatch(1)
+ var abdicated: Option[TopicAndPartition] = None
+ @volatile var refreshDroppedCount = 0
+ val mockMessageSink = new MessageSink {
+ override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
+ }
+ override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
+ }
+ override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
+ abdicated = Some(tp)
+ countdownLatch.countDown
+ }
+ override def refreshDropped() {
+ refreshDroppedCount += 1
+ }
+ override def needsMoreMessages(tp: TopicAndPartition): Boolean = {
+ true
+ }
+ }
+
+ val doNothingMetrics = new KafkaSystemConsumerMetrics()
+ val tp = new TopicAndPartition("topic", 42)
+ val mockOffsetGetter = mock(classOf[GetOffset])
+ val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+ when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
+ when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l)
+ when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that."))
+
+ val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+ override def createSimpleConsumer() = {
+ mockSimpleConsumer
+ }
+ }
+
+ val waitForRefresh = () => {
+ val currentRefreshDroppedCount = refreshDroppedCount
+ while (refreshDroppedCount == currentRefreshDroppedCount) {
+ Thread.sleep(100)
+ }
+ }
+
+ bp.addTopicPartition(tp, Option("0"))
+ bp.start
+ // BP should refresh on startup.
+ waitForRefresh()
+ countdownLatch.await()
+ // BP should continue refreshing after it's abdicated all TopicAndPartitions.
+ waitForRefresh()
+ bp.stop
+ assertEquals(tp, abdicated.getOrElse(null))
+ }
}