You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:38 UTC

[22/50] [abbrv] samza git commit: SAMZA-753 - BrokerProxy stop should stop the Kafka consumer first

SAMZA-753 - BrokerProxy stop should stop the Kafka consumer first


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74aa516f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74aa516f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74aa516f

Branch: refs/heads/samza-sql
Commit: 74aa516fd84f510cd9e92e9bff90e480845763d2
Parents: bc7a07a
Author: Yan Fang <ya...@gmail.com>
Authored: Tue Nov 17 16:58:07 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Tue Nov 17 17:06:01 2015 -0800

----------------------------------------------------------------------
 .../org/apache/samza/system/kafka/BrokerProxy.scala    |  5 +++++
 .../apache/samza/system/kafka/TestBrokerProxy.scala    | 13 +++++++++++++
 2 files changed, 18 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/74aa516f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index c8cbc38..9aa9818 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -303,6 +303,11 @@ class BrokerProxy(
   def stop {
     info("Shutting down " + toString)
 
+    if (simpleConsumer != null) {
+      info("closing simple consumer...")
+      simpleConsumer.close
+    }
+
     thread.interrupt
     thread.join
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/74aa516f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 170318e..cc7077c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -417,4 +417,17 @@ class TestBrokerProxy extends Logging {
     }
     assertEquals(true, caughtError)
   }
+
+  @Test
+	def brokerProxyStopCloseConsumer: Unit = {
+    val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+    val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null){
+      override def createSimpleConsumer() = {
+        mockSimpleConsumer
+      }
+    }
+    bp.start
+    bp.stop
+    verify(mockSimpleConsumer).close
+  }
 }