You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/09 18:49:43 UTC

kafka git commit: KAFKA-3217: Close producers in unit tests

Repository: kafka
Updated Branches:
  refs/heads/trunk 9cac38c02 -> 9f5a1f876


KAFKA-3217: Close producers in unit tests

Producers that are not closed can auto-create topics in subsequent tests when the Kafka server port is reused. Added the missing close() calls.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #882 from rajinisivaram/KAFKA-3217


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f5a1f87
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f5a1f87
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f5a1f87

Branch: refs/heads/trunk
Commit: 9f5a1f87667c23db557a712d51c45541372f3c5d
Parents: 9cac38c
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Tue Feb 9 09:49:32 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 9 09:49:32 2016 -0800

----------------------------------------------------------------------
 .../kafka/api/AuthorizerIntegrationTest.scala            |  2 ++
 .../integration/kafka/api/BaseProducerSendTest.scala     | 11 +++++++++--
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9f5a1f87/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a54cbef..db2040f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -154,6 +154,8 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
 
   @After
   override def tearDown() = {
+    producers.foreach(_.close())
+    consumers.foreach(_.close())
     removeAllAcls
     super.tearDown()
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f5a1f87/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 29291d4..42928a3 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.errors.SerializationException
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+import scala.collection.mutable.Buffer
 
 abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
@@ -43,6 +44,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
 
   private var consumer1: SimpleConsumer = null
   private var consumer2: SimpleConsumer = null
+  private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
 
   private val topic = "topic"
   private val numRecords = 100
@@ -60,13 +62,18 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   override def tearDown() {
     consumer1.close()
     consumer2.close()
+    // Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused
+    producers.foreach(_.close())
 
     super.tearDown()
   }
 
-  private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] =
-    TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
+  private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
+    val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
       retries = retries, lingerMs = lingerMs, props = props)
+    producers += producer
+    producer
+  }
 
   /**
    * testSendOffset checks the basic send API behavior