You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/15 07:02:36 UTC

kafka git commit: KAFKA-3549: Close consumers instantiated in consumer tests

Repository: kafka
Updated Branches:
  refs/heads/trunk 4fa456bc6 -> 065ddf901


KAFKA-3549: Close consumers instantiated in consumer tests

Author: Grant Henke <gr...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1217 from granthenke/close-consumers


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/065ddf90
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/065ddf90
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/065ddf90

Branch: refs/heads/trunk
Commit: 065ddf90195e09689512b55d0718a5ebdb3d42ad
Parents: 4fa456b
Author: Grant Henke <gr...@gmail.com>
Authored: Thu Apr 14 22:02:19 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Apr 14 22:02:19 2016 -0700

----------------------------------------------------------------------
 .../clients/consumer/KafkaConsumerTest.java     | 13 ++++++---
 .../kafka/api/BaseConsumerTest.scala            | 26 +++++++++---------
 .../kafka/api/PlaintextConsumerTest.scala       | 28 +++++++++++---------
 .../kafka/streams/perf/SimpleBenchmark.java     |  1 +
 .../streams/smoketest/SmokeTestDriver.java      |  2 +-
 5 files changed, 40 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ff07461..2272795 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -77,6 +77,8 @@ public class KafkaConsumerTest {
         consumer.unsubscribe();
         Assert.assertTrue(consumer.subscription().isEmpty());
         Assert.assertTrue(consumer.assignment().isEmpty());
+
+        consumer.close();
     }
 
     @Test(expected = IllegalArgumentException.class)
@@ -85,10 +87,13 @@ public class KafkaConsumerTest {
         props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
         props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
         props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
-        consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
-        consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
+        try {
+            consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
+            consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
+        } finally {
+            consumer.close();
+        }
     }
 
     @Test
@@ -129,6 +134,8 @@ public class KafkaConsumerTest {
 
         consumer.unsubscribe();
         Assert.assertTrue(consumer.paused().isEmpty());
+
+        consumer.close();
     }
 
     private KafkaConsumer<byte[], byte[]> newConsumer() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 1408cd9..916a0ab 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -13,7 +13,6 @@
 package kafka.api
 
 import java.util
-import kafka.coordinator.GroupCoordinator
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.record.TimestampType
@@ -92,6 +91,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
 
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
 
     val numRecords = 10000
     sendRecords(numRecords)
@@ -184,6 +184,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
+
     consumer0.subscribe(List(topic).asJava, listener)
 
     // the initial subscription should cause a callback execution
@@ -209,8 +211,6 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
 
     // only expect one revocation since revoke is not invoked on initial membership
     assertEquals(2, listener.callsToRevoked)
-
-    consumer0.close()
   }
 
   @Test
@@ -219,20 +219,17 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
 
-    try {
-      val listener = new TestConsumerReassignmentListener()
-      consumer0.subscribe(List(topic).asJava, listener)
+    val listener = new TestConsumerReassignmentListener()
+    consumer0.subscribe(List(topic).asJava, listener)
 
-      // the initial subscription should cause a callback execution
-      while (listener.callsToAssigned == 0)
-        consumer0.poll(50)
+    // the initial subscription should cause a callback execution
+    while (listener.callsToAssigned == 0)
+      consumer0.poll(50)
 
-      consumer0.subscribe(List[String]().asJava)
-      assertEquals(0, consumer0.assignment.size())
-    } finally {
-      consumer0.close()
-    }
+    consumer0.subscribe(List[String]().asJava)
+    assertEquals(0, consumer0.assignment.size())
   }
 
   @Test
@@ -240,6 +237,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
     this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
 
     sendRecords(5)
     consumer0.subscribe(List(topic).asJava)

http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index ff2e63d..349f7ad 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -48,6 +48,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
+
     consumer0.assign(List(tp).asJava)
 
     consumeAndVerifyRecords(consumer0, numRecords = numRecords, startingOffset = 0,
@@ -405,6 +407,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testFetchInvalidOffset() {
     this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
 
     // produce one record
     val totalRecords = 2
@@ -426,8 +429,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertNotNull(outOfRangePartitions)
     assertEquals(1, outOfRangePartitions.size)
     assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
-
-    consumer0.close()
   }
 
   @Test
@@ -435,6 +436,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val maxFetchBytes = 10 * 1024
     this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes.toString)
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
 
     // produce a record that is larger than the configured fetch size
     val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
@@ -450,8 +452,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(1, oversizedPartitions.size)
     // the oversized message is at offset 0
     assertEquals(0L, oversizedPartitions.get(tp))
-
-    consumer0.close()
   }
 
   @Test
@@ -460,6 +460,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
     this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName)
     val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += consumer0
 
     // create two new topics, each having 2 partitions
     val topic1 = "topic1"
@@ -512,13 +513,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
 
     // add one more consumer and validate re-assignment
-    addConsumersToGroupAndWaitForGroupAssignment(1, rrConsumers, consumerPollers, List(topic1, topic2), subscriptions)
+    addConsumersToGroupAndWaitForGroupAssignment(1, consumers, consumerPollers, List(topic1, topic2), subscriptions)
 
     // done with pollers and consumers
     for (poller <- consumerPollers)
       poller.shutdown()
 
-    for (consumer <- rrConsumers)
+    for (consumer <- consumers)
       consumer.unsubscribe()
   }
 
@@ -688,6 +689,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
     producerProps.put("mock.interceptor.append", appendStr)
     val testProducer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer())
+    producers += testProducer
 
     // producing records should succeed
     testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key".getBytes, s"value will not be modified".getBytes))
@@ -695,6 +697,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     // create consumer with interceptor that has different key and value types from the consumer
     this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
     val testConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+    consumers += testConsumer
+
     testConsumer.assign(List(tp).asJava)
     testConsumer.seek(tp, 0)
 
@@ -702,9 +706,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val records = consumeRecords(testConsumer, 1)
     val record = records.get(0)
     assertEquals(s"value will not be modified", new String(record.value()))
-
-    testConsumer.close()
-    testProducer.close()
   }
 
   def testConsumeMessagesWithCreateTime() {
@@ -762,12 +763,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
 
     // create one more consumer and add it to the group; we will timeout this consumer
     val timeoutConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
-    val expandedConsumers = consumers ++ Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](timeoutConsumer)
+    // Close the consumer on test teardown, unless this test will manually
+    if(!closeConsumer)
+      consumers += timeoutConsumer
     val timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List(topic, topic1))
-    val expandedPollers = consumerPollers ++ Buffer[ConsumerAssignmentPoller](timeoutPoller)
+    consumerPollers += timeoutPoller
 
     // validate the initial assignment
-    validateGroupAssignment(expandedPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+    validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
 
     // stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers
     timeoutPoller.shutdown()
@@ -859,6 +862,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
     for (i <- 0 until consumerCount)
       consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+    consumers ++= consumerGroup
 
     // create consumer pollers, wait for assignment and validate it
     val consumerPollers = subscribeConsumersAndWaitForAssignment(consumerGroup, topicsToSubscribe, subscriptions)

http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index c883090..a92fb1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -253,6 +253,7 @@ public class SimpleBenchmark {
 
         long endTime = System.currentTimeMillis();
 
+        consumer.close();
         System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index d7b0139..205ba4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -278,7 +278,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
                 }
             }
         }
-
+        consumer.close();
 
         System.out.println("-------------------");
         System.out.println("Result Verification");