You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2016/02/02 01:35:38 UTC
[22/50] [abbrv] samza git commit: SAMZA-753 - BrokerProxy stop should
stop the Kafka consumer first
SAMZA-753 - BrokerProxy stop should stop the Kafka consumer first
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/74aa516f
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/74aa516f
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/74aa516f
Branch: refs/heads/samza-sql
Commit: 74aa516fd84f510cd9e92e9bff90e480845763d2
Parents: bc7a07a
Author: Yan Fang <ya...@gmail.com>
Authored: Tue Nov 17 16:58:07 2015 -0800
Committer: Navina <na...@gmail.com>
Committed: Tue Nov 17 17:06:01 2015 -0800
----------------------------------------------------------------------
.../org/apache/samza/system/kafka/BrokerProxy.scala | 5 +++++
.../apache/samza/system/kafka/TestBrokerProxy.scala | 13 +++++++++++++
2 files changed, 18 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/74aa516f/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index c8cbc38..9aa9818 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -303,6 +303,11 @@ class BrokerProxy(
def stop {
info("Shutting down " + toString)
+ if (simpleConsumer != null) {
+ info("closing simple consumer...")
+ simpleConsumer.close
+ }
+
thread.interrupt
thread.join
}
http://git-wip-us.apache.org/repos/asf/samza/blob/74aa516f/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index 170318e..cc7077c 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -417,4 +417,17 @@ class TestBrokerProxy extends Logging {
}
assertEquals(true, caughtError)
}
+
+ @Test
+ def brokerProxyStopCloseConsumer: Unit = {
+ val mockSimpleConsumer = mock(classOf[DefaultFetchSimpleConsumer])
+ val bp = new BrokerProxy("host", 0, "system", "clientID", new KafkaSystemConsumerMetrics(), null){
+ override def createSimpleConsumer() = {
+ mockSimpleConsumer
+ }
+ }
+ bp.start
+ bp.stop
+ verify(mockSimpleConsumer).close
+ }
}