You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/06/06 02:33:05 UTC
kafka git commit: MINOR: Fix producer leak in `PlaintextProducerSendTest`
Repository: kafka
Updated Branches:
refs/heads/trunk f4a263b5a -> b60af34d4
MINOR: Fix producer leak in `PlaintextProducerSendTest`
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>, Guozhang Wang <wa...@gmail.com>
Closes #1471 from ijuma/fix-leaking-producers-in-plaintext-producer-send-test
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b60af34d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b60af34d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b60af34d
Branch: refs/heads/trunk
Commit: b60af34d4a200dbc5062ba40bfb7ffc7162e72d0
Parents: f4a263b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sun Jun 5 19:32:51 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Jun 5 19:32:51 2016 -0700
----------------------------------------------------------------------
.../integration/kafka/api/BaseProducerSendTest.scala | 4 ++++
.../integration/kafka/api/PlaintextProducerSendTest.scala | 10 +++++-----
2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b60af34d/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 9489e70..0a2b49a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -73,6 +73,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, props = props)
+ registerProducer(producer)
+ }
+
+ protected def registerProducer(producer: KafkaProducer[Array[Byte], Array[Byte]]): KafkaProducer[Array[Byte], Array[Byte]] = {
producers += producer
producer
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b60af34d/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 111bc15..55fdbe3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -40,16 +40,16 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
createNewProducerWithExplicitSerializer(brokerList)
}
- private def createNewProducerWithNoSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+ private def createNewProducerWithNoSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
- return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+ registerProducer(new KafkaProducer(producerProps))
}
- private def createNewProducerWithExplicitSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+ private def createNewProducerWithExplicitSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
- return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer)
+ registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
}
@Test
@@ -70,7 +70,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
- return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+ registerProducer(new KafkaProducer(producerProps))
}
}