You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2016/04/15 07:02:36 UTC
kafka git commit: KAFKA-3549: Close consumers instantiated in
consumer tests
Repository: kafka
Updated Branches:
refs/heads/trunk 4fa456bc6 -> 065ddf901
KAFKA-3549: Close consumers instantiated in consumer tests
Author: Grant Henke <gr...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Ewen Cheslack-Postava <ew...@confluent.io>
Closes #1217 from granthenke/close-consumers
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/065ddf90
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/065ddf90
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/065ddf90
Branch: refs/heads/trunk
Commit: 065ddf90195e09689512b55d0718a5ebdb3d42ad
Parents: 4fa456b
Author: Grant Henke <gr...@gmail.com>
Authored: Thu Apr 14 22:02:19 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Thu Apr 14 22:02:19 2016 -0700
----------------------------------------------------------------------
.../clients/consumer/KafkaConsumerTest.java | 13 ++++++---
.../kafka/api/BaseConsumerTest.scala | 26 +++++++++---------
.../kafka/api/PlaintextConsumerTest.scala | 28 +++++++++++---------
.../kafka/streams/perf/SimpleBenchmark.java | 1 +
.../streams/smoketest/SmokeTestDriver.java | 2 +-
5 files changed, 40 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index ff07461..2272795 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -77,6 +77,8 @@ public class KafkaConsumerTest {
consumer.unsubscribe();
Assert.assertTrue(consumer.subscription().isEmpty());
Assert.assertTrue(consumer.assignment().isEmpty());
+
+ consumer.close();
}
@Test(expected = IllegalArgumentException.class)
@@ -85,10 +87,13 @@ public class KafkaConsumerTest {
props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, "testSeekNegative");
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
props.setProperty(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
-
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
- consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
- consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
+ try {
+ consumer.assign(Arrays.asList(new TopicPartition("nonExistTopic", 0)));
+ consumer.seek(new TopicPartition("nonExistTopic", 0), -1);
+ } finally {
+ consumer.close();
+ }
}
@Test
@@ -129,6 +134,8 @@ public class KafkaConsumerTest {
consumer.unsubscribe();
Assert.assertTrue(consumer.paused().isEmpty());
+
+ consumer.close();
}
private KafkaConsumer<byte[], byte[]> newConsumer() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 1408cd9..916a0ab 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -13,7 +13,6 @@
package kafka.api
import java.util
-import kafka.coordinator.GroupCoordinator
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.record.TimestampType
@@ -92,6 +91,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumers += consumer0
val numRecords = 10000
sendRecords(numRecords)
@@ -184,6 +184,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumers += consumer0
+
consumer0.subscribe(List(topic).asJava, listener)
// the initial subscription should cause a callback execution
@@ -209,8 +211,6 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
// only expect one revocation since revoke is not invoked on initial membership
assertEquals(2, listener.callsToRevoked)
-
- consumer0.close()
}
@Test
@@ -219,20 +219,17 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumers += consumer0
- try {
- val listener = new TestConsumerReassignmentListener()
- consumer0.subscribe(List(topic).asJava, listener)
+ val listener = new TestConsumerReassignmentListener()
+ consumer0.subscribe(List(topic).asJava, listener)
- // the initial subscription should cause a callback execution
- while (listener.callsToAssigned == 0)
- consumer0.poll(50)
+ // the initial subscription should cause a callback execution
+ while (listener.callsToAssigned == 0)
+ consumer0.poll(50)
- consumer0.subscribe(List[String]().asJava)
- assertEquals(0, consumer0.assignment.size())
- } finally {
- consumer0.close()
- }
+ consumer0.subscribe(List[String]().asJava)
+ assertEquals(0, consumer0.assignment.size())
}
@Test
@@ -240,6 +237,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
this.consumerConfig.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "100") // timeout quickly to avoid slow test
this.consumerConfig.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "30")
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumers += consumer0
sendRecords(5)
consumer0.subscribe(List(topic).asJava)
http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index ff2e63d..349f7ad 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -48,6 +48,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
this.consumerConfig.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumers += consumer0
+
consumer0.assign(List(tp).asJava)
consumeAndVerifyRecords(consumer0, numRecords = numRecords, startingOffset = 0,
@@ -405,6 +407,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
def testFetchInvalidOffset() {
this.consumerConfig.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumers += consumer0
// produce one record
val totalRecords = 2
@@ -426,8 +429,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertNotNull(outOfRangePartitions)
assertEquals(1, outOfRangePartitions.size)
assertEquals(outOfRangePos.toLong, outOfRangePartitions.get(tp))
-
- consumer0.close()
}
@Test
@@ -435,6 +436,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val maxFetchBytes = 10 * 1024
this.consumerConfig.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxFetchBytes.toString)
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumers += consumer0
// produce a record that is larger than the configured fetch size
val record = new ProducerRecord[Array[Byte], Array[Byte]](tp.topic(), tp.partition(), "key".getBytes, new Array[Byte](maxFetchBytes + 1))
@@ -450,8 +452,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
assertEquals(1, oversizedPartitions.size)
// the oversized message is at offset 0
assertEquals(0L, oversizedPartitions.get(tp))
-
- consumer0.close()
}
@Test
@@ -460,6 +460,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
this.consumerConfig.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "roundrobin-group")
this.consumerConfig.setProperty(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, classOf[RoundRobinAssignor].getName)
val consumer0 = new KafkaConsumer(this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumers += consumer0
// create two new topics, each having 2 partitions
val topic1 = "topic1"
@@ -512,13 +513,13 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val (rrConsumers, consumerPollers) = createConsumerGroupAndWaitForAssignment(10, List(topic1, topic2), subscriptions)
// add one more consumer and validate re-assignment
- addConsumersToGroupAndWaitForGroupAssignment(1, rrConsumers, consumerPollers, List(topic1, topic2), subscriptions)
+ addConsumersToGroupAndWaitForGroupAssignment(1, consumers, consumerPollers, List(topic1, topic2), subscriptions)
// done with pollers and consumers
for (poller <- consumerPollers)
poller.shutdown()
- for (consumer <- rrConsumers)
+ for (consumer <- consumers)
consumer.unsubscribe()
}
@@ -688,6 +689,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
producerProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockProducerInterceptor")
producerProps.put("mock.interceptor.append", appendStr)
val testProducer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps, new ByteArraySerializer(), new ByteArraySerializer())
+ producers += testProducer
// producing records should succeed
testProducer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key".getBytes, s"value will not be modified".getBytes))
@@ -695,6 +697,8 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// create consumer with interceptor that has different key and value types from the consumer
this.consumerConfig.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, "org.apache.kafka.test.MockConsumerInterceptor")
val testConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig, new ByteArrayDeserializer(), new ByteArrayDeserializer())
+ consumers += testConsumer
+
testConsumer.assign(List(tp).asJava)
testConsumer.seek(tp, 0)
@@ -702,9 +706,6 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val records = consumeRecords(testConsumer, 1)
val record = records.get(0)
assertEquals(s"value will not be modified", new String(record.value()))
-
- testConsumer.close()
- testProducer.close()
}
def testConsumeMessagesWithCreateTime() {
@@ -762,12 +763,14 @@ class PlaintextConsumerTest extends BaseConsumerTest {
// create one more consumer and add it to the group; we will timeout this consumer
val timeoutConsumer = new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
- val expandedConsumers = consumers ++ Buffer[KafkaConsumer[Array[Byte], Array[Byte]]](timeoutConsumer)
+ // Close the consumer on test teardown, unless this test will manually
+ if(!closeConsumer)
+ consumers += timeoutConsumer
val timeoutPoller = subscribeConsumerAndStartPolling(timeoutConsumer, List(topic, topic1))
- val expandedPollers = consumerPollers ++ Buffer[ConsumerAssignmentPoller](timeoutPoller)
+ consumerPollers += timeoutPoller
// validate the initial assignment
- validateGroupAssignment(expandedPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
+ validateGroupAssignment(consumerPollers, subscriptions, s"Did not get valid initial assignment for partitions ${subscriptions.asJava}")
// stop polling and close one of the consumers, should trigger partition re-assignment among alive consumers
timeoutPoller.shutdown()
@@ -859,6 +862,7 @@ class PlaintextConsumerTest extends BaseConsumerTest {
val consumerGroup = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
for (i <- 0 until consumerCount)
consumerGroup += new KafkaConsumer[Array[Byte], Array[Byte]](this.consumerConfig)
+ consumers ++= consumerGroup
// create consumer pollers, wait for assignment and validate it
val consumerPollers = subscribeConsumersAndWaitForAssignment(consumerGroup, topicsToSubscribe, subscriptions)
http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index c883090..a92fb1b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -253,6 +253,7 @@ public class SimpleBenchmark {
long endTime = System.currentTimeMillis();
+ consumer.close();
System.out.println("Consumer Performance [MB/sec read]: " + megaBytePerSec(endTime - startTime));
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/065ddf90/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index d7b0139..205ba4b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -278,7 +278,7 @@ public class SmokeTestDriver extends SmokeTestUtil {
}
}
}
-
+ consumer.close();
System.out.println("-------------------");
System.out.println("Result Verification");