You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/11 17:29:11 UTC

samza git commit: SAMZA-590; kafka broker proxy should relinquish ownership for all partitions when its consumer fails

Repository: samza
Updated Branches:
  refs/heads/master b82d45871 -> c725b3ceb


SAMZA-590; kafka broker proxy should relinquish ownership for all partitions when its consumer fails


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c725b3ce
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c725b3ce
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c725b3ce

Branch: refs/heads/master
Commit: c725b3ceb6d2e116f5da257ba45fe70fa877747a
Parents: b82d458
Author: Chris Riccomini <cr...@apache.org>
Authored: Wed Mar 11 09:28:58 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Mar 11 09:28:58 2015 -0700

----------------------------------------------------------------------
 .../apache/samza/system/kafka/BrokerProxy.scala | 28 +++++++---
 .../samza/system/kafka/TestBrokerProxy.scala    | 59 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/c725b3ce/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index f768263..c6e231a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -153,8 +153,9 @@ class BrokerProxy(
           },
 
           (exception, loop) => {
-            warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
+            warn("Restarting consumer due to %s. Releasing ownership of all partitions, and restarting consumer. Turn on debugging to get a full stack trace." format exception)
             debug("Exception detail:", exception)
+            abdicateAll
             reconnect = true
           })
       } catch {
@@ -182,7 +183,6 @@ class BrokerProxy(
 
       nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
     } else {
-
       refreshLatencyMetrics
 
       debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
@@ -193,13 +193,27 @@ class BrokerProxy(
     }
   }
 
-  def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
+  /**
+   * Releases ownership for a single TopicAndPartition. The
+   * KafkaSystemConsumer will try to find a new broker for the
+   * TopicAndPartition.
+   */
+  def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match {
     // Need to be mindful of a tp that was removed by another thread
-    def abdicate(tp:TopicAndPartition) = removeTopicPartition(tp) match {
-        case Some(offset) => messageSink.abdicate(tp, offset)
-        case None         => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
-      }
+    case Some(offset) => messageSink.abdicate(tp, offset)
+    case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
+  }
 
+  /**
+   * Releases ownership of every TopicAndPartition held by this BrokerProxy
+   * thread. The KafkaSystemConsumer will try to find a new broker for each
+   * released TopicAndPartition.
+   */
+  def abdicateAll {
+    nextOffsets.keySet.foreach(abdicate(_))
+  }
+
+  def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
     // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
     case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
 

http://git-wip-us.apache.org/repos/asf/samza/blob/c725b3ce/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index d559d8b..e285dec 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -21,6 +21,7 @@
 package org.apache.samza.system.kafka
 
 import java.nio.ByteBuffer
+import java.nio.channels.ClosedChannelException
 import java.util.concurrent.CountDownLatch
 
 import kafka.api._
@@ -299,4 +300,62 @@ class TestBrokerProxy extends Logging {
       fail(failString)
     }
   }
+
+  /**
+   * Test that makes sure that BrokerProxy abdicates all TopicAndPartitions 
+   * that it owns when a consumer failure occurs.
+   */
+  @Test def brokerProxyAbdicatesOnConnectionFailure():Unit = {
+    val countdownLatch = new CountDownLatch(1)
+    var abdicated: Option[TopicAndPartition] = None
+    @volatile var refreshDroppedCount = 0
+    val mockMessageSink = new MessageSink {
+      override def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
+      }
+      override def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
+      }
+      override def abdicate(tp: TopicAndPartition, nextOffset: Long) {
+        abdicated = Some(tp)
+        countdownLatch.countDown
+      }
+      override def refreshDropped() {
+        refreshDroppedCount += 1
+      }
+      override def needsMoreMessages(tp: TopicAndPartition): Boolean = {
+        true
+      }
+    }
+
+    val doNothingMetrics = new KafkaSystemConsumerMetrics()
+    val tp = new TopicAndPartition("topic", 42)
+    val mockOffsetGetter = mock(classOf[GetOffset])
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+
+    when(mockOffsetGetter.isValidOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp), Matchers.eq("0"))).thenReturn(true)
+    when(mockOffsetGetter.getResetOffset(any(classOf[DefaultFetchSimpleConsumer]), Matchers.eq(tp))).thenReturn(1492l)
+    when(mockSimpleConsumer.defaultFetch(any())).thenThrow(new SamzaException("Pretend this is a ClosedChannelException. Can't use ClosedChannelException because it's checked, and Mockito doesn't like that."))
+
+    val bp = new BrokerProxy("host", 567, "system", "clientID", doNothingMetrics, mockMessageSink, Int.MaxValue, 1024000, new StreamFetchSizes(256 * 1024), 524288, 1000, mockOffsetGetter) {
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+
+    val waitForRefresh = () => {
+      val currentRefreshDroppedCount = refreshDroppedCount
+      while (refreshDroppedCount == currentRefreshDroppedCount) {
+        Thread.sleep(100)
+      }
+    }
+
+    bp.addTopicPartition(tp, Option("0"))
+    bp.start
+    // BP should refresh on startup.
+    waitForRefresh()
+    countdownLatch.await()
+    // BP should continue refreshing after it's abdicated all TopicAndPartitions.
+    waitForRefresh()
+    bp.stop
+    assertEquals(tp, abdicated.getOrElse(null))
+  }
 }