You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/02/09 18:49:43 UTC
kafka git commit: KAFKA-3217: Close producers in unit tests
Repository: kafka
Updated Branches:
refs/heads/trunk 9cac38c02 -> 9f5a1f876
KAFKA-3217: Close producers in unit tests
Producers that are not closed auto-create topics in subsequent tests when Kafka server port is reused. Added missing close().
Author: Rajini Sivaram <ra...@googlemail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #882 from rajinisivaram/KAFKA-3217
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f5a1f87
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f5a1f87
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f5a1f87
Branch: refs/heads/trunk
Commit: 9f5a1f87667c23db557a712d51c45541372f3c5d
Parents: 9cac38c
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Tue Feb 9 09:49:32 2016 -0800
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Tue Feb 9 09:49:32 2016 -0800
----------------------------------------------------------------------
.../kafka/api/AuthorizerIntegrationTest.scala | 2 ++
.../integration/kafka/api/BaseProducerSendTest.scala | 11 +++++++++--
2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9f5a1f87/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a54cbef..db2040f 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -154,6 +154,8 @@ class AuthorizerIntegrationTest extends KafkaServerTestHarness {
@After
override def tearDown() = {
+ producers.foreach(_.close())
+ consumers.foreach(_.close())
removeAllAcls
super.tearDown()
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/9f5a1f87/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 29291d4..42928a3 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.producer._
import org.apache.kafka.common.errors.SerializationException
import org.junit.Assert._
import org.junit.{After, Before, Test}
+import scala.collection.mutable.Buffer
abstract class BaseProducerSendTest extends KafkaServerTestHarness {
@@ -43,6 +44,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
private var consumer1: SimpleConsumer = null
private var consumer2: SimpleConsumer = null
+ private val producers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
private val topic = "topic"
private val numRecords = 100
@@ -60,13 +62,18 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
override def tearDown() {
consumer1.close()
consumer2.close()
+ // Ensure that all producers are closed since unclosed producers impact other tests when Kafka server ports are reused
+ producers.foreach(_.close())
super.tearDown()
}
- private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] =
- TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
+ private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
+ val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
retries = retries, lingerMs = lingerMs, props = props)
+ producers += producer
+ producer
+ }
/**
* testSendOffset checks the basic send API behavior