You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sr...@apache.org on 2016/06/06 02:33:05 UTC

kafka git commit: MINOR: Fix producer leak in `PlaintextProducerSendTest`

Repository: kafka
Updated Branches:
  refs/heads/trunk f4a263b5a -> b60af34d4


MINOR: Fix producer leak in `PlaintextProducerSendTest`

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Sriharsha Chintalapani <ha...@hortonworks.com>, Guozhang Wang <wa...@gmail.com>

Closes #1471 from ijuma/fix-leaking-producers-in-plaintext-producer-send-test


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b60af34d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b60af34d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b60af34d

Branch: refs/heads/trunk
Commit: b60af34d4a200dbc5062ba40bfb7ffc7162e72d0
Parents: f4a263b
Author: Ismael Juma <is...@juma.me.uk>
Authored: Sun Jun 5 19:32:51 2016 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Jun 5 19:32:51 2016 -0700

----------------------------------------------------------------------
 .../integration/kafka/api/BaseProducerSendTest.scala      |  4 ++++
 .../integration/kafka/api/PlaintextProducerSendTest.scala | 10 +++++-----
 2 files changed, 9 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b60af34d/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 9489e70..0a2b49a 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -73,6 +73,10 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
   private def createProducer(brokerList: String, retries: Int = 0, lingerMs: Long = 0, props: Option[Properties] = None): KafkaProducer[Array[Byte],Array[Byte]] = {
     val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
       saslProperties = saslProperties, retries = retries, lingerMs = lingerMs, props = props)
+    registerProducer(producer)
+  }
+
+  protected def registerProducer(producer: KafkaProducer[Array[Byte], Array[Byte]]): KafkaProducer[Array[Byte], Array[Byte]] = {
     producers += producer
     producer
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b60af34d/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
index 111bc15..55fdbe3 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala
@@ -40,16 +40,16 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     createNewProducerWithExplicitSerializer(brokerList)
   }
 
-  private def createNewProducerWithNoSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+  private def createNewProducerWithNoSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    registerProducer(new KafkaProducer(producerProps))
   }
 
-  private def createNewProducerWithExplicitSerializer(brokerList: String) : KafkaProducer[Array[Byte],Array[Byte]] = {
+  private def createNewProducerWithExplicitSerializer(brokerList: String): KafkaProducer[Array[Byte], Array[Byte]] = {
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps, new ByteArraySerializer, new ByteArraySerializer)
+    registerProducer(new KafkaProducer(producerProps, new ByteArraySerializer, new ByteArraySerializer))
   }
 
   @Test
@@ -70,7 +70,7 @@ class PlaintextProducerSendTest extends BaseProducerSendTest {
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
     producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
-    return new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
+    registerProducer(new KafkaProducer(producerProps))
   }
 
 }