You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/02 09:26:36 UTC
kafka git commit: KAFKA-5282; Use a factory method to create producers/consumers and close them in tearDown
Repository: kafka
Updated Branches:
refs/heads/trunk 47a3066cc -> 5d4634861
KAFKA-5282; Use a factory method to create producers/consumers and close them in tearDown
Author: Vahid Hashemian <va...@us.ibm.com>
Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>
This patch had conflicts when merged, resolved by
Committer: Ismael Juma <is...@juma.me.uk>
Closes #3129 from vahidhashemian/KAFKA-5282
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d463486
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d463486
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d463486
Branch: refs/heads/trunk
Commit: 5d46348619b1ca8610af305bf5e90c560b050258
Parents: 47a3066
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Fri Jun 2 10:24:42 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 2 10:24:42 2017 +0100
----------------------------------------------------------------------
.../kafka/api/TransactionsTest.scala | 374 +++++++++----------
1 file changed, 180 insertions(+), 194 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d463486/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 0a082ed..9aceec8 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -24,23 +24,31 @@ import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.ProducerFencedException
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.{After, Before, Test}
import org.junit.Assert._
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Buffer}
import scala.concurrent.ExecutionException
class TransactionsTest extends KafkaServerTestHarness {
val numServers = 3
+ val transactionalProducerCount = 2
+ val transactionalConsumerCount = 1
+ val nonTransactionalConsumerCount = 1
+
val topic1 = "topic1"
val topic2 = "topic2"
+ val transactionalProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+ val transactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+ val nonTransactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
- override def generateConfigs : Seq[KafkaConfig] = {
+ override def generateConfigs: Seq[KafkaConfig] = {
TestUtils.createBrokerConfigs(numServers, zkConnect, true).map(KafkaConfig.fromProps(_, serverProps()))
}
@@ -52,49 +60,54 @@ class TransactionsTest extends KafkaServerTestHarness {
topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
TestUtils.createTopic(zkUtils, topic2, numPartitions, numServers, servers, topicConfig)
+
+ for (_ <- 0 until transactionalProducerCount)
+ transactionalProducers += TestUtils.createTransactionalProducer("transactional-producer", servers)
+ for (_ <- 0 until transactionalConsumerCount)
+ transactionalConsumers += transactionalConsumer("transactional-group")
+ for (_ <- 0 until nonTransactionalConsumerCount)
+ nonTransactionalConsumers += nonTransactionalConsumer("non-transactional-group")
}
@After
override def tearDown(): Unit = {
+ transactionalProducers.foreach(_.close())
+ transactionalConsumers.foreach(_.close())
+ nonTransactionalConsumers.foreach(_.close())
super.tearDown()
}
@Test
def testBasicTransactions() = {
- val producer = TestUtils.createTransactionalProducer("my-hello-world-transactional-id", servers)
- val consumer = transactionalConsumer("transactional-group")
- val unCommittedConsumer = nonTransactionalConsumer("non-transactional-group")
- try {
- producer.initTransactions()
+ val producer = transactionalProducers(0)
+ val consumer = transactionalConsumers(0)
+ val unCommittedConsumer = nonTransactionalConsumers(0)
- producer.beginTransaction()
- producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false))
- producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false))
- producer.flush()
- producer.abortTransaction()
+ producer.initTransactions()
- producer.beginTransaction()
- producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = true))
- producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = true))
- producer.commitTransaction()
+ producer.beginTransaction()
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false))
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false))
+ producer.flush()
+ producer.abortTransaction()
- consumer.subscribe(List(topic1, topic2))
- unCommittedConsumer.subscribe(List(topic1, topic2))
+ producer.beginTransaction()
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = true))
+ producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = true))
+ producer.commitTransaction()
- val records = pollUntilExactlyNumRecords(consumer, 2)
- records.zipWithIndex.foreach { case (record, i) =>
- TestUtils.assertCommittedAndGetValue(record)
- }
+ consumer.subscribe(List(topic1, topic2).asJava)
+ unCommittedConsumer.subscribe(List(topic1, topic2).asJava)
- val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
- val expectedValues = List("1", "2", "3", "4").toSet
- allRecords.zipWithIndex.foreach { case (record, i) =>
- assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
- }
- } finally {
- consumer.close()
- producer.close()
- unCommittedConsumer.close()
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.foreach { record =>
+ TestUtils.assertCommittedAndGetValue(record)
+ }
+
+ val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
+ val expectedValues = List("1", "2", "3", "4").toSet
+ allRecords.foreach { record =>
+ assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
}
}
@@ -109,16 +122,15 @@ class TransactionsTest extends KafkaServerTestHarness {
// transactions, we should not have any duplicates or missing messages since we should process in the input
// messages exactly once.
- val transactionalId = "foobar-id"
val consumerGroupId = "foobar-consumer-group"
val numSeedMessages = 500
TestUtils.seedTopicWithNumberedRecords(topic1, numSeedMessages, servers)
- val producer = TestUtils.createTransactionalProducer(transactionalId, servers)
+ val producer = transactionalProducers(0)
val consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
- consumer.subscribe(List(topic1))
+ consumer.subscribe(List(topic1).asJava)
producer.initTransactions()
var shouldCommit = false
@@ -130,13 +142,13 @@ class TransactionsTest extends KafkaServerTestHarness {
producer.beginTransaction()
shouldCommit = !shouldCommit
- records.zipWithIndex.foreach { case (record, i) =>
+ records.foreach { record =>
val key = new String(record.key(), "UTF-8")
val value = new String(record.value(), "UTF-8")
producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, key, value, willBeCommitted = shouldCommit))
}
- producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroupId)
+ producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumerGroupId)
if (shouldCommit) {
producer.commitTransaction()
recordsProcessed += records.size
@@ -150,18 +162,16 @@ class TransactionsTest extends KafkaServerTestHarness {
}
}
} finally {
- producer.close()
consumer.close()
}
- // In spite of random aborts, we should still have exactly 1000 messages in topic2. Ie. we should not
+ // In spite of random aborts, we should still have exactly 1000 messages in topic2. I.e. we should not
// re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
- val verifyingConsumer = transactionalConsumer("foobargroup")
- verifyingConsumer.subscribe(List(topic2))
+ val verifyingConsumer = transactionalConsumers(0)
+ verifyingConsumer.subscribe(List(topic2).asJava)
val valueSeq = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numSeedMessages).map { record =>
TestUtils.assertCommittedAndGetValue(record).toInt
}
- verifyingConsumer.close()
val valueSet = valueSeq.toSet
assertEquals(s"Expected $numSeedMessages values in $topic2.", numSeedMessages, valueSeq.size)
assertEquals(s"Expected ${valueSeq.size} unique messages in $topic2.", valueSeq.size, valueSet.size)
@@ -169,190 +179,166 @@ class TransactionsTest extends KafkaServerTestHarness {
@Test
def testFencingOnCommit() = {
- val transactionalId = "my-t.id"
- val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
- val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
- val consumer = transactionalConsumer()
- consumer.subscribe(List(topic1, topic2))
+ val producer1 = transactionalProducers(0)
+ val producer2 = transactionalProducers(1)
+ val consumer = transactionalConsumers(0)
- try {
- producer1.initTransactions()
+ consumer.subscribe(List(topic1, topic2).asJava)
- producer1.beginTransaction()
- producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
- producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
-
- producer2.initTransactions() // ok, will abort the open transaction.
- producer2.beginTransaction()
- producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
- producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
-
- try {
- producer1.commitTransaction()
- fail("Should not be able to commit transactions from a fenced producer.")
- } catch {
- case e : ProducerFencedException =>
- // good!
- case e : Exception =>
- fail("Got an unexpected exception from a fenced producer.", e)
- }
+ producer1.initTransactions()
- producer2.commitTransaction() // ok
+ producer1.beginTransaction()
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
- val records = pollUntilExactlyNumRecords(consumer, 2)
- records.zipWithIndex.foreach { case (record, i) =>
- TestUtils.assertCommittedAndGetValue(record)
- }
- } finally {
- consumer.close()
- producer1.close()
- producer2.close()
+ producer2.initTransactions() // ok, will abort the open transaction.
+ producer2.beginTransaction()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+
+ try {
+ producer1.commitTransaction()
+ fail("Should not be able to commit transactions from a fenced producer.")
+ } catch {
+ case _: ProducerFencedException =>
+ // good!
+ case e: Exception =>
+ fail("Got an unexpected exception from a fenced producer.", e)
+ }
+
+ producer2.commitTransaction() // ok
+
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.foreach { record =>
+ TestUtils.assertCommittedAndGetValue(record)
}
}
@Test
def testFencingOnSendOffsets() = {
- val transactionalId = "my-t.id"
- val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
- val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
- val consumer = transactionalConsumer()
- consumer.subscribe(List(topic1, topic2))
+ val producer1 = transactionalProducers(0)
+ val producer2 = transactionalProducers(1)
+ val consumer = transactionalConsumers(0)
- try {
- producer1.initTransactions()
+ consumer.subscribe(List(topic1, topic2).asJava)
- producer1.beginTransaction()
- producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
- producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
-
- producer2.initTransactions() // ok, will abort the open transaction.
- producer2.beginTransaction()
- producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
- producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
-
- try {
- producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0) -> new OffsetAndMetadata(110L)), "foobarGroup")
- fail("Should not be able to send offsets from a fenced producer.")
- } catch {
- case e : ProducerFencedException =>
- // good!
- case e : Exception =>
- fail("Got an unexpected exception from a fenced producer.", e)
- }
+ producer1.initTransactions()
- producer2.commitTransaction() // ok
+ producer1.beginTransaction()
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
- val records = pollUntilExactlyNumRecords(consumer, 2)
- records.zipWithIndex.foreach { case (record, i) =>
- TestUtils.assertCommittedAndGetValue(record)
- }
- } finally {
- consumer.close()
- producer1.close()
- producer2.close()
+ producer2.initTransactions() // ok, will abort the open transaction.
+ producer2.beginTransaction()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+
+ try {
+ producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0) -> new OffsetAndMetadata(110L)).asJava,
+ "foobarGroup")
+ fail("Should not be able to send offsets from a fenced producer.")
+ } catch {
+ case _: ProducerFencedException =>
+ // good!
+ case e: Exception =>
+ fail("Got an unexpected exception from a fenced producer.", e)
+ }
+
+ producer2.commitTransaction() // ok
+
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.foreach { record =>
+ TestUtils.assertCommittedAndGetValue(record)
}
}
@Test
def testFencingOnSend() {
- val transactionalId = "my-t.id"
- val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
- val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
- val consumer = transactionalConsumer()
- consumer.subscribe(List(topic1, topic2))
+ val producer1 = transactionalProducers(0)
+ val producer2 = transactionalProducers(1)
+ val consumer = transactionalConsumers(0)
- try {
- producer1.initTransactions()
+ consumer.subscribe(List(topic1, topic2).asJava)
- producer1.beginTransaction()
- producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
- producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
-
- producer2.initTransactions() // ok, will abort the open transaction.
- producer2.beginTransaction()
- producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get()
- producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get()
-
- try {
- val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
- val recordMetadata = result.get()
- error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!")
- servers.foreach { case (server) =>
- error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
- }
- fail("Should not be able to send messages from a fenced producer.")
- } catch {
- case e : ProducerFencedException =>
- producer1.close()
- case e : ExecutionException =>
- assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
- case e : Exception =>
- fail("Got an unexpected exception from a fenced producer.", e)
- }
+ producer1.initTransactions()
- producer2.commitTransaction() // ok
+ producer1.beginTransaction()
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
- val records = pollUntilExactlyNumRecords(consumer, 2)
- records.zipWithIndex.foreach { case (record, i) =>
- TestUtils.assertCommittedAndGetValue(record)
+ producer2.initTransactions() // ok, will abort the open transaction.
+ producer2.beginTransaction()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get()
+
+ try {
+ val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
+ val recordMetadata = result.get()
+ error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
+ servers.foreach { server =>
+ error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
}
- } finally {
- consumer.close()
- producer1.close()
- producer2.close()
+ fail("Should not be able to send messages from a fenced producer.")
+ } catch {
+ case _: ProducerFencedException =>
+ producer1.close()
+ case e: ExecutionException =>
+ assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+ case e: Exception =>
+ fail("Got an unexpected exception from a fenced producer.", e)
+ }
+
+ producer2.commitTransaction() // ok
+
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.foreach { record =>
+ TestUtils.assertCommittedAndGetValue(record)
}
}
@Test
def testFencingOnAddPartitions(): Unit = {
- val transactionalId = "my-t.id"
- val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
- val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
- val consumer = transactionalConsumer()
- consumer.subscribe(List(topic1, topic2))
+ val producer1 = transactionalProducers(0)
+ val producer2 = transactionalProducers(1)
+ val consumer = transactionalConsumers(0)
- try {
- producer1.initTransactions()
+ consumer.subscribe(List(topic1, topic2).asJava)
+
+ producer1.initTransactions()
+ producer1.beginTransaction()
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+ producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+ producer1.abortTransaction()
+
+ producer2.initTransactions() // ok, will abort the open transaction.
+ producer2.beginTransaction()
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+ .get(20, TimeUnit.SECONDS)
+ producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+ .get(20, TimeUnit.SECONDS)
+ try {
producer1.beginTransaction()
- producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
- producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
- producer1.abortTransaction()
-
- producer2.initTransactions() // ok, will abort the open transaction.
- producer2.beginTransaction()
- producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
- .get(20, TimeUnit.SECONDS)
- producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
- .get(20, TimeUnit.SECONDS)
-
- try {
- producer1.beginTransaction()
- val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
- val recordMetadata = result.get()
- error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!")
- servers.foreach { case (server) =>
- error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
- }
- fail("Should not be able to send messages from a fenced producer.")
- } catch {
- case e : ProducerFencedException =>
- case e : ExecutionException =>
- assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
- case e : Exception =>
- fail("Got an unexpected exception from a fenced producer.", e)
+ val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
+ val recordMetadata = result.get()
+ error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
+ servers.foreach { case (server) =>
+ error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
}
+ fail("Should not be able to send messages from a fenced producer.")
+ } catch {
+ case _: ProducerFencedException =>
+ case e: ExecutionException =>
+ assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+ case e: Exception =>
+ fail("Got an unexpected exception from a fenced producer.", e)
+ }
- producer2.commitTransaction() // ok
+ producer2.commitTransaction() // ok
- val records = pollUntilExactlyNumRecords(consumer, 2)
- records.zipWithIndex.foreach { case (record, i) =>
- TestUtils.assertCommittedAndGetValue(record)
- }
- } finally {
- consumer.close()
- producer1.close()
- producer2.close()
+ val records = pollUntilExactlyNumRecords(consumer, 2)
+ records.foreach { record =>
+ TestUtils.assertCommittedAndGetValue(record)
}
}
@@ -388,10 +374,10 @@ class TransactionsTest extends KafkaServerTestHarness {
groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
}
- private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+ private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
TestUtils.waitUntilTrue(() => {
- records ++= consumer.poll(50)
+ records ++= consumer.poll(50).asScala
records.size == numRecords
}, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
records