You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/02 09:26:36 UTC

kafka git commit: KAFKA-5282; Use a factory method to create producers/consumers and close them in tearDown

Repository: kafka
Updated Branches:
  refs/heads/trunk 47a3066cc -> 5d4634861


KAFKA-5282; Use a factory method to create producers/consumers and close them in tearDown

Author: Vahid Hashemian <va...@us.ibm.com>

Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

This patch had conflicts when merged, resolved by
Committer: Ismael Juma <is...@juma.me.uk>

Closes #3129 from vahidhashemian/KAFKA-5282


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5d463486
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5d463486
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5d463486

Branch: refs/heads/trunk
Commit: 5d46348619b1ca8610af305bf5e90c560b050258
Parents: 47a3066
Author: Vahid Hashemian <va...@us.ibm.com>
Authored: Fri Jun 2 10:24:42 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Jun 2 10:24:42 2017 +0100

----------------------------------------------------------------------
 .../kafka/api/TransactionsTest.scala            | 374 +++++++++----------
 1 file changed, 180 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5d463486/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index 0a082ed..9aceec8 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -24,23 +24,31 @@ import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.ProducerFencedException
 import org.apache.kafka.common.protocol.SecurityProtocol
 import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{ArrayBuffer, Buffer}
 import scala.concurrent.ExecutionException
 
 class TransactionsTest extends KafkaServerTestHarness {
   val numServers = 3
+  val transactionalProducerCount = 2
+  val transactionalConsumerCount = 1
+  val nonTransactionalConsumerCount = 1
+
   val topic1 = "topic1"
   val topic2 = "topic2"
 
+  val transactionalProducers = Buffer[KafkaProducer[Array[Byte], Array[Byte]]]()
+  val transactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
+  val nonTransactionalConsumers = Buffer[KafkaConsumer[Array[Byte], Array[Byte]]]()
 
-  override def generateConfigs : Seq[KafkaConfig] = {
+  override def generateConfigs: Seq[KafkaConfig] = {
     TestUtils.createBrokerConfigs(numServers, zkConnect, true).map(KafkaConfig.fromProps(_, serverProps()))
   }
 
@@ -52,49 +60,54 @@ class TransactionsTest extends KafkaServerTestHarness {
     topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
     TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
     TestUtils.createTopic(zkUtils, topic2, numPartitions, numServers, servers, topicConfig)
+
+    for (_ <- 0 until transactionalProducerCount)
+      transactionalProducers += TestUtils.createTransactionalProducer("transactional-producer", servers)
+    for (_ <- 0 until transactionalConsumerCount)
+      transactionalConsumers += transactionalConsumer("transactional-group")
+    for (_ <- 0 until nonTransactionalConsumerCount)
+      nonTransactionalConsumers += nonTransactionalConsumer("non-transactional-group")
   }
 
   @After
   override def tearDown(): Unit = {
+    transactionalProducers.foreach(_.close())
+    transactionalConsumers.foreach(_.close())
+    nonTransactionalConsumers.foreach(_.close())
     super.tearDown()
   }
 
   @Test
   def testBasicTransactions() = {
-    val producer = TestUtils.createTransactionalProducer("my-hello-world-transactional-id", servers)
-    val consumer = transactionalConsumer("transactional-group")
-    val unCommittedConsumer = nonTransactionalConsumer("non-transactional-group")
-    try {
-      producer.initTransactions()
+    val producer = transactionalProducers(0)
+    val consumer = transactionalConsumers(0)
+    val unCommittedConsumer = nonTransactionalConsumers(0)
 
-      producer.beginTransaction()
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false))
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false))
-      producer.flush()
-      producer.abortTransaction()
+    producer.initTransactions()
 
-      producer.beginTransaction()
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = true))
-      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = true))
-      producer.commitTransaction()
+    producer.beginTransaction()
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false))
+    producer.flush()
+    producer.abortTransaction()
 
-      consumer.subscribe(List(topic1, topic2))
-      unCommittedConsumer.subscribe(List(topic1, topic2))
+    producer.beginTransaction()
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = true))
+    producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = true))
+    producer.commitTransaction()
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
-      }
+    consumer.subscribe(List(topic1, topic2).asJava)
+    unCommittedConsumer.subscribe(List(topic1, topic2).asJava)
 
-      val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
-      val expectedValues = List("1", "2", "3", "4").toSet
-      allRecords.zipWithIndex.foreach { case (record, i) =>
-        assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
-      }
-    } finally {
-      consumer.close()
-      producer.close()
-      unCommittedConsumer.close()
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
+    }
+
+    val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
+    val expectedValues = List("1", "2", "3", "4").toSet
+    allRecords.foreach { record =>
+      assertTrue(expectedValues.contains(TestUtils.recordValueAsString(record)))
     }
   }
 
@@ -109,16 +122,15 @@ class TransactionsTest extends KafkaServerTestHarness {
     //     transactions, we should not have any duplicates or missing messages since we should process in the input
     //     messages exactly once.
 
-    val transactionalId = "foobar-id"
     val consumerGroupId = "foobar-consumer-group"
     val numSeedMessages = 500
 
     TestUtils.seedTopicWithNumberedRecords(topic1, numSeedMessages, servers)
 
-    val producer = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val producer = transactionalProducers(0)
 
     val consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
-    consumer.subscribe(List(topic1))
+    consumer.subscribe(List(topic1).asJava)
     producer.initTransactions()
 
     var shouldCommit = false
@@ -130,13 +142,13 @@ class TransactionsTest extends KafkaServerTestHarness {
         producer.beginTransaction()
         shouldCommit = !shouldCommit
 
-        records.zipWithIndex.foreach { case (record, i) =>
+        records.foreach { record =>
           val key = new String(record.key(), "UTF-8")
           val value = new String(record.value(), "UTF-8")
           producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, key, value, willBeCommitted = shouldCommit))
         }
 
-        producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroupId)
+        producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer).asJava, consumerGroupId)
         if (shouldCommit) {
           producer.commitTransaction()
           recordsProcessed += records.size
@@ -150,18 +162,16 @@ class TransactionsTest extends KafkaServerTestHarness {
        }
       }
     } finally {
-      producer.close()
       consumer.close()
     }
 
-    // In spite of random aborts, we should still have exactly 1000 messages in topic2. Ie. we should not
+    // In spite of random aborts, we should still have exactly 1000 messages in topic2. I.e. we should not
     // re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
-    val verifyingConsumer = transactionalConsumer("foobargroup")
-    verifyingConsumer.subscribe(List(topic2))
+    val verifyingConsumer = transactionalConsumers(0)
+    verifyingConsumer.subscribe(List(topic2).asJava)
     val valueSeq = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numSeedMessages).map { record =>
       TestUtils.assertCommittedAndGetValue(record).toInt
     }
-    verifyingConsumer.close()
     val valueSet = valueSeq.toSet
     assertEquals(s"Expected $numSeedMessages values in $topic2.", numSeedMessages, valueSeq.size)
     assertEquals(s"Expected ${valueSeq.size} unique messages in $topic2.", valueSeq.size, valueSet.size)
@@ -169,190 +179,166 @@ class TransactionsTest extends KafkaServerTestHarness {
 
   @Test
   def testFencingOnCommit() = {
-    val transactionalId = "my-t.id"
-    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val consumer = transactionalConsumer()
-    consumer.subscribe(List(topic1, topic2))
+    val producer1 = transactionalProducers(0)
+    val producer2 = transactionalProducers(1)
+    val consumer = transactionalConsumers(0)
 
-    try {
-      producer1.initTransactions()
+    consumer.subscribe(List(topic1, topic2).asJava)
 
-      producer1.beginTransaction()
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
-
-      producer2.initTransactions()  // ok, will abort the open transaction.
-      producer2.beginTransaction()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
-
-      try {
-        producer1.commitTransaction()
-        fail("Should not be able to commit transactions from a fenced producer.")
-      } catch {
-        case e : ProducerFencedException =>
-          // good!
-        case e : Exception =>
-          fail("Got an unexpected exception from a fenced producer.", e)
-      }
+    producer1.initTransactions()
 
-      producer2.commitTransaction()  // ok
+    producer1.beginTransaction()
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
-      }
-    } finally {
-      consumer.close()
-      producer1.close()
-      producer2.close()
+    producer2.initTransactions()  // ok, will abort the open transaction.
+    producer2.beginTransaction()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+
+    try {
+      producer1.commitTransaction()
+      fail("Should not be able to commit transactions from a fenced producer.")
+    } catch {
+      case _: ProducerFencedException =>
+        // good!
+      case e: Exception =>
+        fail("Got an unexpected exception from a fenced producer.", e)
+    }
+
+    producer2.commitTransaction()  // ok
+
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
   @Test
   def testFencingOnSendOffsets() = {
-    val transactionalId = "my-t.id"
-    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val consumer = transactionalConsumer()
-    consumer.subscribe(List(topic1, topic2))
+    val producer1 = transactionalProducers(0)
+    val producer2 = transactionalProducers(1)
+    val consumer = transactionalConsumers(0)
 
-    try {
-      producer1.initTransactions()
+    consumer.subscribe(List(topic1, topic2).asJava)
 
-      producer1.beginTransaction()
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
-
-      producer2.initTransactions()  // ok, will abort the open transaction.
-      producer2.beginTransaction()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
-
-      try {
-        producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0) -> new OffsetAndMetadata(110L)),  "foobarGroup")
-        fail("Should not be able to send offsets from a fenced producer.")
-      } catch {
-        case e : ProducerFencedException =>
-          // good!
-        case e : Exception =>
-          fail("Got an unexpected exception from a fenced producer.", e)
-      }
+    producer1.initTransactions()
 
-      producer2.commitTransaction()  // ok
+    producer1.beginTransaction()
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
-      }
-    } finally {
-      consumer.close()
-      producer1.close()
-      producer2.close()
+    producer2.initTransactions()  // ok, will abort the open transaction.
+    producer2.beginTransaction()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+
+    try {
+      producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0) -> new OffsetAndMetadata(110L)).asJava,
+        "foobarGroup")
+      fail("Should not be able to send offsets from a fenced producer.")
+    } catch {
+      case _: ProducerFencedException =>
+        // good!
+      case e: Exception =>
+        fail("Got an unexpected exception from a fenced producer.", e)
+    }
+
+    producer2.commitTransaction()  // ok
+
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
   @Test
   def testFencingOnSend() {
-    val transactionalId = "my-t.id"
-    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val consumer = transactionalConsumer()
-    consumer.subscribe(List(topic1, topic2))
+    val producer1 = transactionalProducers(0)
+    val producer2 = transactionalProducers(1)
+    val consumer = transactionalConsumers(0)
 
-    try {
-      producer1.initTransactions()
+    consumer.subscribe(List(topic1, topic2).asJava)
 
-      producer1.beginTransaction()
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
-
-      producer2.initTransactions()  // ok, will abort the open transaction.
-      producer2.beginTransaction()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get()
-
-      try {
-        val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
-        val recordMetadata = result.get()
-        error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!")
-        servers.foreach { case (server) =>
-          error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
-        }
-        fail("Should not be able to send messages from a fenced producer.")
-      } catch {
-        case e : ProducerFencedException =>
-          producer1.close()
-        case e : ExecutionException =>
-          assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
-        case e : Exception =>
-          fail("Got an unexpected exception from a fenced producer.", e)
-      }
+    producer1.initTransactions()
 
-      producer2.commitTransaction()  // ok
+    producer1.beginTransaction()
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
+    producer2.initTransactions()  // ok, will abort the open transaction.
+    producer2.beginTransaction()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get()
+
+    try {
+      val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
+      val recordMetadata = result.get()
+      error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
+      servers.foreach { server =>
+        error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
       }
-    } finally {
-      consumer.close()
-      producer1.close()
-      producer2.close()
+      fail("Should not be able to send messages from a fenced producer.")
+    } catch {
+      case _: ProducerFencedException =>
+        producer1.close()
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+      case e: Exception =>
+        fail("Got an unexpected exception from a fenced producer.", e)
+    }
+
+    producer2.commitTransaction() // ok
+
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
   @Test
   def testFencingOnAddPartitions(): Unit = {
-    val transactionalId = "my-t.id"
-    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
-    val consumer = transactionalConsumer()
-    consumer.subscribe(List(topic1, topic2))
+    val producer1 = transactionalProducers(0)
+    val producer2 = transactionalProducers(1)
+    val consumer = transactionalConsumers(0)
 
-    try {
-      producer1.initTransactions()
+    consumer.subscribe(List(topic1, topic2).asJava)
+
+    producer1.initTransactions()
+    producer1.beginTransaction()
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+    producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+    producer1.abortTransaction()
+
+    producer2.initTransactions()  // ok, will abort the open transaction.
+    producer2.beginTransaction()
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+      .get(20, TimeUnit.SECONDS)
+    producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+      .get(20, TimeUnit.SECONDS)
 
+    try {
       producer1.beginTransaction()
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
-      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
-      producer1.abortTransaction()
-
-      producer2.initTransactions()  // ok, will abort the open transaction.
-      producer2.beginTransaction()
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
-        .get(20, TimeUnit.SECONDS)
-      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
-        .get(20, TimeUnit.SECONDS)
-
-      try {
-        producer1.beginTransaction()
-        val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
-        val recordMetadata = result.get()
-        error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!")
-        servers.foreach { case (server) =>
-          error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
-        }
-        fail("Should not be able to send messages from a fenced producer.")
-      } catch {
-        case e : ProducerFencedException =>
-        case e : ExecutionException =>
-          assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
-        case e : Exception =>
-          fail("Got an unexpected exception from a fenced producer.", e)
+      val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
+      val recordMetadata = result.get()
+      error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
+      servers.foreach { case (server) =>
+        error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
       }
+      fail("Should not be able to send messages from a fenced producer.")
+    } catch {
+      case _: ProducerFencedException =>
+      case e: ExecutionException =>
+        assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+      case e: Exception =>
+        fail("Got an unexpected exception from a fenced producer.", e)
+    }
 
-      producer2.commitTransaction()  // ok
+    producer2.commitTransaction()  // ok
 
-      val records = pollUntilExactlyNumRecords(consumer, 2)
-      records.zipWithIndex.foreach { case (record, i) =>
-        TestUtils.assertCommittedAndGetValue(record)
-      }
-    } finally {
-      consumer.close()
-      producer1.close()
-      producer2.close()
+    val records = pollUntilExactlyNumRecords(consumer, 2)
+    records.foreach { record =>
+      TestUtils.assertCommittedAndGetValue(record)
     }
   }
 
@@ -388,10 +374,10 @@ class TransactionsTest extends KafkaServerTestHarness {
       groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
   }
 
-  private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+  private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int): Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
     val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
     TestUtils.waitUntilTrue(() => {
-      records ++= consumer.poll(50)
+      records ++= consumer.poll(50).asScala
       records.size == numRecords
     }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
     records