You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/17 23:21:55 UTC

kafka git commit: KAFKA-5188; Integration tests for transactions

Repository: kafka
Updated Branches:
  refs/heads/trunk 2181ae768 -> b3a33ce4b


KAFKA-5188; Integration tests for transactions

Author: Apurva Mehta <ap...@confluent.io>
Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #2994 from apurvam/KAFKA-5188-exactly-once-integration-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b3a33ce4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b3a33ce4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b3a33ce4

Branch: refs/heads/trunk
Commit: b3a33ce4b81a20ae5635cf28490fd2e1f9d86141
Parents: 2181ae7
Author: Apurva Mehta <ap...@confluent.io>
Authored: Wed May 17 16:20:33 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 17 16:20:33 2017 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      |  59 ++-
 .../producer/internals/TransactionManager.java  |  25 +-
 .../kafka/common/requests/EndTxnRequest.java    |   4 +
 .../internals/TransactionManagerTest.java       |  47 ++-
 .../transaction/TransactionCoordinator.scala    |  24 +-
 .../transaction/TransactionMetadata.scala       |   2 +-
 .../transaction/TransactionStateManager.scala   |   1 -
 .../kafka/api/TransactionsBounceTest.scala      | 185 +++++++++
 .../kafka/api/TransactionsTest.scala            | 400 +++++++++++++++++++
 .../test/scala/unit/kafka/utils/TestUtils.scala |  95 ++++-
 10 files changed, 806 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 209a979..7180171 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.clients.RequestCompletionHandler;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -39,7 +40,6 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
-import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.InitProducerIdRequest;
@@ -187,10 +187,12 @@ public class Sender implements Runnable {
      * @param now The current POSIX time in milliseconds
      */
     void run(long now) {
-        long pollTimeout = 0;
-        if (!maybeSendTransactionalRequest(now))
+        long pollTimeout = retryBackoffMs;
+        if (!maybeSendTransactionalRequest(now)) {
             pollTimeout = sendProducerData(now);
+        }
 
+        log.trace("waiting {}ms in poll", pollTimeout);
         this.client.poll(pollTimeout, now);
     }
 
@@ -203,6 +205,7 @@ public class Sender implements Runnable {
             final KafkaException exception = transactionManager.lastError() instanceof KafkaException
                     ? (KafkaException) transactionManager.lastError()
                     : new KafkaException(transactionManager.lastError());
+            log.error("aborting producer batches because the transaction manager is in an error state.", exception);
             this.accumulator.abortBatches(exception);
             return Long.MAX_VALUE;
         }
@@ -281,25 +284,35 @@ public class Sender implements Runnable {
     }
 
     private boolean maybeSendTransactionalRequest(long now) {
-        if (transactionManager != null && transactionManager.hasInflightRequest())
+        if (transactionManager == null || !transactionManager.isTransactional())
+            return false;
+
+        if (transactionManager.hasInflightRequest()) {
+            log.trace("TransactionalId: {} -- There is already an inflight transactional request. Going to wait for the response.",
+                    transactionManager.transactionalId());
             return true;
+        }
 
-        if (transactionManager == null || !transactionManager.hasPendingTransactionalRequests())
+        if (!transactionManager.hasPendingTransactionalRequests()) {
+            log.trace("TransactionalId: {} -- There are no pending transactional requests to send", transactionManager.transactionalId());
             return false;
+        }
 
         TransactionManager.TxnRequestHandler nextRequestHandler = transactionManager.nextRequestHandler();
 
-        if (nextRequestHandler.isEndTxn()) {
-            if (transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
-                if (!accumulator.flushInProgress())
-                    accumulator.beginFlush();
-                transactionManager.reenqueue(nextRequestHandler);
-                return false;
-            } else if (transactionManager.isInErrorState()) {
-                nextRequestHandler.fatal(new KafkaException("Cannot commit transaction when there are " +
-                        "request errors. Please check your logs for the details of the errors encountered."));
-                return false;
-            }
+        if (nextRequestHandler.isEndTxn() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
+            if (!accumulator.flushInProgress())
+                accumulator.beginFlush();
+            transactionManager.reenqueue(nextRequestHandler);
+            log.trace("TransactionalId: {} -- Going to wait for pending ProducerBatches to flush before sending an " +
+                    "end transaction request", transactionManager.transactionalId());
+            return false;
+        }
+
+        if (transactionManager.maybeTerminateRequestWithError(nextRequestHandler)) {
+            log.trace("TransactionalId: {} -- Not sending a transactional request because we are in an error state",
+                    transactionManager.transactionalId());
+            return false;
         }
 
         Node targetNode = null;
@@ -314,7 +327,6 @@ public class Sender implements Runnable {
                     }
                     if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeout)) {
                         transactionManager.lookupCoordinator(nextRequestHandler);
-                        targetNode = null;
                         break;
                     }
                 } else {
@@ -322,23 +334,30 @@ public class Sender implements Runnable {
                 }
                 if (targetNode != null) {
                     if (nextRequestHandler.isRetry()) {
+                        log.trace("TransactionalId: {} -- Waiting {}ms before resending a transactional request {}",
+                                transactionManager.transactionalId(), retryBackoffMs, nextRequestHandler.requestBuilder());
                         time.sleep(retryBackoffMs);
                     }
                     ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), nextRequestHandler.requestBuilder(),
                             now, true, nextRequestHandler);
                     transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
+                    log.trace("TransactionalId: {} -- Sending transactional request {} to node {}", transactionManager.transactionalId(),
+                            nextRequestHandler.requestBuilder(), clientRequest.destination());
                     client.send(clientRequest, now);
                     return true;
                 }
             } catch (IOException e) {
-                log.warn("Got an exception when trying to find a node to send a transactional request to. Going to back off and retry", e);
+                targetNode = null;
+                log.warn("TransactionalId: " + transactionManager.transactionalId() + " -- Got an exception when trying " +
+                        "to find a node to send transactional request " + nextRequestHandler.requestBuilder() + ". Going to back off and retry", e);
             }
+            log.trace("TransactionalId: {}. About to wait for {}ms before trying to send another transactional request.",
+                    transactionManager.transactionalId(), retryBackoffMs);
             time.sleep(retryBackoffMs);
             metadata.requestUpdate();
         }
 
-        if (targetNode == null)
-            transactionManager.retry(nextRequestHandler);
+        transactionManager.retry(nextRequestHandler);
 
         return true;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 551a75a..55c1782 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -95,7 +95,8 @@ public class TransactionManager {
                 case INITIALIZING:
                     return source == UNINITIALIZED || source == ERROR;
                 case READY:
-                    return source == INITIALIZING || source == COMMITTING_TRANSACTION || source == ABORTING_TRANSACTION;
+                    return source == INITIALIZING || source == COMMITTING_TRANSACTION
+                            || source == ABORTING_TRANSACTION || source == ERROR;
                 case IN_TRANSACTION:
                     return source == READY;
                 case COMMITTING_TRANSACTION:
@@ -246,7 +247,7 @@ public class TransactionManager {
     }
 
     public boolean isInErrorState() {
-        return currentState == State.ERROR;
+        return currentState == State.ERROR || currentState == State.FENCED;
     }
 
     public synchronized void setError(Exception exception) {
@@ -256,6 +257,23 @@ public class TransactionManager {
             transitionTo(State.ERROR, exception);
     }
 
+    boolean maybeTerminateRequestWithError(TxnRequestHandler requestHandler) {
+        if (isInErrorState() && requestHandler.isEndTxn()) {
+            // We shouldn't terminate abort requests from error states.
+            EndTxnHandler endTxnHandler = (EndTxnHandler) requestHandler;
+            if (endTxnHandler.builder.result() == TransactionResult.ABORT)
+                return false;
+            String errorMessage = "Cannot commit transaction because at least one previous transactional request " +
+                    "was not completed successfully.";
+            if (lastError != null)
+                requestHandler.fatal(new KafkaException(errorMessage, lastError));
+            else
+                requestHandler.fatal(new KafkaException(errorMessage));
+            return true;
+        }
+        return false;
+    }
+
     /**
      * Get the current producer id and epoch without blocking. Callers must use {@link ProducerIdAndEpoch#isValid()} to
      * verify that the result is valid.
@@ -484,6 +502,7 @@ public class TransactionManager {
                             "transactional.id has been started: producerId: {}. epoch: {}.",
                     producerIdAndEpoch.producerId, producerIdAndEpoch.epoch);
             result.setError(Errors.INVALID_PRODUCER_EPOCH.exception());
+            lastError = Errors.INVALID_PRODUCER_EPOCH.exception();
             transitionTo(State.FENCED, Errors.INVALID_PRODUCER_EPOCH.exception());
             result.done();
         }
@@ -501,10 +520,12 @@ public class TransactionManager {
             } else {
                 clearInFlightRequestCorrelationId();
                 if (response.wasDisconnected()) {
+                    log.trace("disconnected from " + response.destination() + ". Will retry.");
                     reenqueue();
                 } else if (response.versionMismatch() != null) {
                     fatal(response.versionMismatch());
                 } else if (response.hasResponse()) {
+                    log.trace("Got transactional response for request:" + requestBuilder());
                     handleResponse(response.responseBody());
                 } else {
                     fatal(new KafkaException("Could not execute transactional request for unknown reasons"));

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
index ff9b82c..77ec137 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EndTxnRequest.java
@@ -42,6 +42,10 @@ public class EndTxnRequest extends AbstractRequest {
             this.result = result;
         }
 
+        public TransactionResult result() {
+            return result;
+        }
+
         @Override
         public EndTxnRequest build(short version) {
             return new EndTxnRequest(version, transactionalId, producerId, producerEpoch, result);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 4db0452..6a35061 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -433,7 +433,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(tp0);
 
         Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
-                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+                "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
 
         assertFalse(responseFuture.isDone());
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
@@ -442,6 +442,8 @@ public class TransactionManagerTest {
 
         sender.run(time.milliseconds());  // send produce.
 
+        assertTrue(responseFuture.isDone());
+        assertTrue(transactionManager.isInErrorState());
         responseFuture.get();
     }
 
@@ -500,6 +502,48 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testAllowAbortOnProduceFailure() throws InterruptedException {
+        client.setNode(brokerNode);
+        // This is called from the initTransactions method in the producer as the first order of business.
+        // It finds the coordinator and then gets a PID.
+        final long pid = 13131L;
+        final short epoch = 1;
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, FindCoordinatorRequest.CoordinatorType.TRANSACTION, transactionalId);
+
+        sender.run(time.milliseconds());  // find coordinator
+        sender.run(time.milliseconds());
+
+        prepareInitPidResponse(Errors.NONE, false, pid, epoch);
+        sender.run(time.milliseconds());  // get pid.
+
+        assertEquals(brokerNode, transactionManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION));
+        assertTrue(transactionManager.hasProducerId());
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, new MockCallback(transactionManager), MAX_BLOCK_TIMEOUT).future;
+
+        TransactionalRequestResult abortResult = transactionManager.beginAbortingTransaction();
+        assertFalse(responseFuture.isDone());
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+        prepareProduceResponse(Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, pid, epoch);
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+
+        sender.run(time.milliseconds());  // Send AddPartitionsRequest
+        assertFalse(abortResult.isCompleted());
+
+        sender.run(time.milliseconds());  // Send Produce Request, returns OutOfOrderSequenceException.
+        sender.run(time.milliseconds());  // try to abort
+        assertTrue(abortResult.isCompleted());
+        assertTrue(abortResult.isSuccessful());
+        assertTrue(transactionManager.isReadyForTransaction());  // make sure we are ready for a transaction now.
+    }
+
+
+    @Test
     public void shouldNotAddPartitionsToTransactionWhenTopicAuthorizationFailed() throws Exception {
         verifyAddPartitionsFailsWithPartitionLevelError(Errors.TOPIC_AUTHORIZATION_FAILED);
     }
@@ -519,6 +563,7 @@ public class TransactionManagerTest {
 
         final long pid = 1L;
         final short epoch = 1;
+
         prepareInitPidResponse(Errors.NONE, false, pid, epoch);
 
         sender.run(time.milliseconds());  // get pid.

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 233f7d7..8148cb6 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -271,6 +271,14 @@ class TransactionCoordinator(brokerId: Int,
       txnMarkerChannelManager.removeMarkersForTxnTopicPartition(txnTopicPartitionId)
   }
 
+  private def logInvalidStateTransitionAndReturnError(transactionalId: String,
+                                                      transactionState: TransactionState,
+                                                      transactionResult: TransactionResult) = {
+    error(s"transactionalId: $transactionalId -- Current state is $transactionState, but received transaction " +
+      s"marker result: $transactionResult")
+    Left(Errors.INVALID_TXN_STATE)
+  }
+
   def handleEndTransaction(transactionalId: String,
                            producerId: Long,
                            producerEpoch: Short,
@@ -306,24 +314,24 @@ class TransactionCoordinator(brokerId: Int,
                 if (txnMarkerResult == TransactionResult.COMMIT)
                   Left(Errors.NONE)
                 else
-                  Left(Errors.INVALID_TXN_STATE)
+                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
               case CompleteAbort =>
                 if (txnMarkerResult == TransactionResult.ABORT)
                   Left(Errors.NONE)
                 else
-                  Left(Errors.INVALID_TXN_STATE)
+                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
               case PrepareCommit =>
                 if (txnMarkerResult == TransactionResult.COMMIT)
                   Left(Errors.CONCURRENT_TRANSACTIONS)
                 else
-                  Left(Errors.INVALID_TXN_STATE)
+                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
               case PrepareAbort =>
                 if (txnMarkerResult == TransactionResult.ABORT)
                   Left(Errors.CONCURRENT_TRANSACTIONS)
                 else
-                  Left(Errors.INVALID_TXN_STATE)
+                  logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
               case Empty =>
-                Left(Errors.INVALID_TXN_STATE)
+                logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
             }
           }
       }
@@ -349,15 +357,15 @@ class TransactionCoordinator(brokerId: Int,
                         Left(Errors.CONCURRENT_TRANSACTIONS)
                       else txnMetadata.state match {
                         case Empty| Ongoing | CompleteCommit | CompleteAbort =>
-                          Left(Errors.INVALID_TXN_STATE)
+                          logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                         case PrepareCommit =>
                           if (txnMarkerResult != TransactionResult.COMMIT)
-                            Left(Errors.INVALID_TXN_STATE)
+                            logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                           else
                             Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
                         case PrepareAbort =>
                           if (txnMarkerResult != TransactionResult.ABORT)
-                            Left(Errors.INVALID_TXN_STATE)
+                            logInvalidStateTransitionAndReturnError(transactionalId, txnMetadata.state, txnMarkerResult)
                           else
                             Right(txnMetadata, txnMetadata.prepareComplete(time.milliseconds()))
                       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
index d05676b..0d176aa 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
@@ -135,7 +135,7 @@ private[transaction] class TransactionMetadata(val producerId: Long,
   }
 
   def removePartition(topicPartition: TopicPartition): Unit = {
-    if (pendingState.isDefined || (state != PrepareCommit && state != PrepareAbort))
+    if (pendingState.isDefined && (state != PrepareCommit && state != PrepareAbort))
       throw new IllegalStateException(s"Transation metadata's current state is $state, and its pending state is $state " +
         s"while trying to remove partitions whose txn marker has been sent, this is not expected")
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index cf41fc3..2327213 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -155,7 +155,6 @@ class TransactionStateManager(brokerId: Int,
     props.put(LogConfig.UncleanLeaderElectionEnableProp, "false")
     props.put(LogConfig.CompressionTypeProp, UncompressedCodec.name)
     props.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-
     props.put(LogConfig.MinInSyncReplicasProp, config.transactionLogMinInsyncReplicas.toString)
     props.put(LogConfig.SegmentBytesProp, config.transactionLogSegmentBytes.toString)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
new file mode 100644
index 0000000..f1fd365
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/TransactionsBounceTest.scala
@@ -0,0 +1,185 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.{ShutdownableThread, TestUtils}
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.apache.kafka.clients.producer.internals.ErrorLoggingCallback
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.{Ignore, Test}
+
+import scala.collection.JavaConversions._
+import org.junit.Assert._
+
+
+class TransactionsBounceTest extends KafkaServerTestHarness {
+  private val producerBufferSize =  65536
+  private val serverMessageMaxBytes =  producerBufferSize/2
+  private val numPartitions = 3
+
+  val numServers = 4
+  private val outputTopic = "output-topic"
+  private val inputTopic = "input-topic"
+
+  val overridingProps = new Properties()
+  overridingProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
+  overridingProps.put(KafkaConfig.MessageMaxBytesProp, serverMessageMaxBytes.toString)
+  // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offsets topic)
+  // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
+  overridingProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
+  overridingProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
+  overridingProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+  overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
+  overridingProps.put(KafkaConfig.OffsetsTopicReplicationFactorProp, 3.toString)
+  overridingProps.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+  overridingProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 1.toString)
+  overridingProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 3.toString)
+  overridingProps.put(KafkaConfig.GroupMinSessionTimeoutMsProp, "10") // set small enough session timeout
+  overridingProps.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0")
+
+
+  // This is one of the few tests we currently allow to preallocate ports, despite the fact that this can result in transient
+  // failures due to ports getting reused. We can't use random ports because of bad behavior that can result from bouncing
+  // brokers too quickly when they get new, random ports. If we're not careful, the client can end up in a situation
+  // where metadata is not refreshed quickly enough, and by the time it's actually trying to, all the servers have
+  // been bounced and have new addresses. None of the bootstrap nodes or current metadata can get them connected to a
+  // running server.
+  //
+  // Since such quick rotation of servers is incredibly unrealistic, we allow this one test to preallocate ports, leaving
+  // a small risk of hitting errors due to port conflicts. Hopefully this is infrequent enough to not cause problems.
+  override def generateConfigs() = {
+    FixedPortTestUtils.createBrokerConfigs(numServers, zkConnect,enableControlledShutdown = true)
+      .map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  @Ignore  // need to fix KAFKA-5268 and KAFKA-5269 before re-enabling
+  @Test
+  def testBrokerFailure() {
+    // basic idea is to seed a topic with numInputRecords records, and copy it transactionally while bouncing brokers
+    // constantly through the period.
+    val consumerGroup=  "myGroup"
+    val numInputRecords = 5000
+    createTopics()
+
+    TestUtils.seedTopicWithNumberedRecords(inputTopic, numInputRecords, servers)
+
+    var consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
+    val producer = TestUtils.createTransactionalProducer("my-test-producer-t.id", servers)
+
+    val scheduler = new BounceScheduler
+    producer.initTransactions()
+    scheduler.start()
+
+    var numMessagesProcessed = 0
+    var iteration = 0
+    try {
+      while (numMessagesProcessed < numInputRecords) {
+        val toRead = Math.min(200, numInputRecords - numMessagesProcessed)
+        trace(s"$iteration: About to read $toRead messages, processed $numMessagesProcessed so far..")
+        val records = TestUtils.pollUntilAtLeastNumRecords(consumer, toRead)
+        trace(s"received ${records.size} messages. sending them transactionally to $outputTopic")
+        producer.beginTransaction()
+        val shouldAbort = iteration % 10 == 0
+        records.zipWithIndex.foreach { case (record, i) =>
+          producer.send(
+            TestUtils.producerRecordWithExpectedTransactionStatus(outputTopic, record.key, record.value, !shouldAbort),
+            new ErrorLoggingCallback(outputTopic, record.key, record.value, true))
+        }
+        trace(s"Sent ${records.size} messages. Committing offsets.")
+        producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroup)
+        if (shouldAbort) {
+          trace(s"Committed offsets. Aborting transaction of ${records.size} messages.")
+          producer.abortTransaction()
+          consumer.close()
+          consumer = createConsumerAndSubscribeToTopics(consumerGroup, List(inputTopic))
+        } else {
+          trace(s"Committed offsets. committing transaction of ${records.size} messages.")
+          producer.commitTransaction()
+          numMessagesProcessed += records.size
+        }
+        iteration += 1
+      }
+    } finally {
+      producer.close()
+      consumer.close()
+    }
+
+    scheduler.shutdown()
+
+    val verifyingConsumer = createConsumerAndSubscribeToTopics("randoGroup", List(outputTopic), readCommitted = true)
+    val outputRecords = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numInputRecords).map { case(record) =>
+      TestUtils.assertCommittedAndGetValue(record).toInt
+    }
+    val recordSet = outputRecords.toSet
+    assertEquals(numInputRecords, recordSet.size)
+
+    val expectedValues = (0 until numInputRecords).toSet
+    assertEquals(s"Missing messages: ${expectedValues -- recordSet}", expectedValues, recordSet)
+
+    verifyingConsumer.close()
+  }
+
+  private def createConsumerAndSubscribeToTopics(groupId: String, topics: List[String], readCommitted: Boolean = false) = {
+    val props = new Properties()
+    if (readCommitted)
+      props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
+    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "200")
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000")
+    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000")
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
+
+    val consumer = TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers), groupId = groupId,
+      securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+    consumer.subscribe(topics)
+    consumer
+  }
+
+  private def createTopics() =  {
+    val topicConfig = new Properties()
+    topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+    TestUtils.createTopic(zkUtils, inputTopic, numPartitions, numServers, servers, topicConfig)
+    TestUtils.createTopic(zkUtils, outputTopic, numPartitions, numServers, servers, topicConfig)
+  }
+
+  private class BounceScheduler extends ShutdownableThread("daemon-broker-bouncer", false) {
+    override def doWork(): Unit = {
+      for (server <- servers) {
+        trace("Shutting down server : %s".format(server.config.brokerId))
+        server.shutdown()
+        server.awaitShutdown()
+        Thread.sleep(500)
+        trace("Server %s shut down. Starting it up again.".format(server.config.brokerId))
+        server.startup()
+        trace("Restarted server: %s".format(server.config.brokerId))
+        Thread.sleep(500)
+      }
+
+      (0 until numPartitions).foreach(partition => TestUtils.waitUntilLeaderIsElectedOrChanged(zkUtils, outputTopic, partition))
+    }
+
+    override def shutdown(){
+      super.shutdown()
+   }
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
new file mode 100644
index 0000000..3e19bb9
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -0,0 +1,400 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.api
+
+import java.util.Properties
+
+import kafka.integration.KafkaServerTestHarness
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetAndMetadata}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.ProducerFencedException
+import org.apache.kafka.common.protocol.SecurityProtocol
+import org.junit.{After, Before, Ignore, Test}
+import org.junit.Assert._
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.ExecutionException
+import scala.util.Random
+
+class TransactionsTest extends KafkaServerTestHarness {
+  val numServers = 3
+  val topic1 = "topic1"
+  val topic2 = "topic2"
+
+
+  override def generateConfigs : Seq[KafkaConfig] = {
+    TestUtils.createBrokerConfigs(numServers, zkConnect, true).map(KafkaConfig.fromProps(_, serverProps()))
+  }
+
+  @Before
+  override def setUp(): Unit = {
+    super.setUp()
+    val numPartitions = 3
+    val topicConfig = new Properties()
+    topicConfig.put(KafkaConfig.MinInSyncReplicasProp, 2.toString)
+    TestUtils.createTopic(zkUtils, topic1, numPartitions, numServers, servers, topicConfig)
+    TestUtils.createTopic(zkUtils, topic2, numPartitions, numServers, servers, topicConfig)
+  }
+
+  @After
+  override def tearDown(): Unit = {
+    super.tearDown()
+  }
+
+  @Test
+  def testBasicTransactions(): Unit = {
+    val producer = TestUtils.createTransactionalProducer("my-hello-world-transactional-id", servers)
+    val consumer = transactionalConsumer("transactional-group")
+    val unCommittedConsumer = nonTransactionalConsumer("non-transactional-group")
+    try {
+      producer.initTransactions()
+
+      // First transaction is aborted: its records must stay invisible to the read_committed consumer.
+      producer.beginTransaction()
+      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "2", willBeCommitted = false))
+      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "4", "4", willBeCommitted = false))
+      producer.abortTransaction()
+
+      producer.beginTransaction()
+      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = true))
+      producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = true))
+      producer.commitTransaction()
+
+      consumer.subscribe(List(topic1, topic2))
+      unCommittedConsumer.subscribe(List(topic1, topic2))
+
+      // Only the two committed records may be returned under read_committed isolation.
+      pollUntilExactlyNumRecords(consumer, 2).foreach { record =>
+        TestUtils.assertCommittedAndGetValue(record)
+      }
+
+      // The read_uncommitted consumer must observe all four distinct values, aborted ones included.
+      val allRecords = pollUntilExactlyNumRecords(unCommittedConsumer, 4)
+      val expectedValues = Set("1", "2", "3", "4")
+      assertEquals(expectedValues, allRecords.map(TestUtils.recordValueAsString).toSet)
+    } finally {
+      consumer.close()
+      producer.close()
+      unCommittedConsumer.close()
+    }
+  }
+
+  @Test
+  def testSendOffsets(): Unit = {
+    // The basic plan for the test is as follows:
+    //  1. Seed topic1 with numSeedMessages unique, numbered messages.
+    //  2. Run a consume/process/produce loop to transactionally copy messages from topic1 to topic2 and commit
+    //     offsets as part of the transaction.
+    //  3. Abort every other transaction in step 2.
+    //  4. Validate that we have numSeedMessages unique committed messages in topic2. If the offsets were committed
+    //     properly with the transactions, we should not have any duplicates or missing messages since we should
+    //     process the input messages exactly once.
+
+    val transactionalId = "foobar-id"
+    val consumerGroupId = "foobar-consumer-group"
+    val numSeedMessages = 500
+
+    TestUtils.seedTopicWithNumberedRecords(topic1, numSeedMessages, servers)
+
+    val producer = TestUtils.createTransactionalProducer(transactionalId, servers)
+
+    var consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
+    consumer.subscribe(List(topic1))
+    producer.initTransactions()
+
+    // shouldCommit is flipped at the start of every iteration, so commits and aborts alternate.
+    var shouldCommit = false
+    var recordsProcessed = 0
+    try {
+      while (recordsProcessed < numSeedMessages) {
+        producer.beginTransaction()
+        shouldCommit = !shouldCommit
+
+        val records = TestUtils.pollUntilAtLeastNumRecords(consumer, Math.min(10, numSeedMessages - recordsProcessed))
+        records.foreach { record =>
+          val key = new String(record.key(), "UTF-8")
+          val value = new String(record.value(), "UTF-8")
+          producer.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, key, value, willBeCommitted = shouldCommit))
+        }
+
+        producer.sendOffsetsToTransaction(TestUtils.consumerPositions(consumer), consumerGroupId)
+        if (shouldCommit) {
+          producer.commitTransaction()
+          recordsProcessed += records.size
+          debug(s"committed transaction.. Last committed record: ${new String(records.last.value(), "UTF-8")}. Num " +
+            s"records written to $topic2: $recordsProcessed")
+        } else {
+          producer.abortTransaction()
+          debug(s"aborted transaction.. Last aborted record: ${new String(records.last.value(), "UTF-8")}. Num " +
+            s"records written to $topic2: $recordsProcessed")
+          consumer.close()
+          consumer = transactionalConsumer(consumerGroupId, maxPollRecords = numSeedMessages / 4)
+          consumer.subscribe(List(topic1))
+        }
+      }
+    } finally {
+      producer.close()
+      consumer.close()
+    }
+
+    // In spite of the aborted transactions, we should still have exactly numSeedMessages messages in topic2, i.e. we
+    // should not re-copy or miss any messages from topic1, since the consumed offsets were committed transactionally.
+    val verifyingConsumer = transactionalConsumer("foobargroup")
+    verifyingConsumer.subscribe(List(topic2))
+    val valueSeq = TestUtils.pollUntilAtLeastNumRecords(verifyingConsumer, numSeedMessages).map { record =>
+      TestUtils.assertCommittedAndGetValue(record).toInt
+    }
+    verifyingConsumer.close()
+    val valueSet = valueSeq.toSet
+    assertEquals(s"Expected $numSeedMessages values in $topic2.", numSeedMessages, valueSeq.size)
+    assertEquals(s"Expected ${valueSeq.size} unique messages in $topic2.", valueSeq.size, valueSet.size)
+  }
+
+  @Test
+  def testFencingOnCommit(): Unit = {
+    val transactionalId = "my-t.id"
+    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val consumer = transactionalConsumer()
+    consumer.subscribe(List(topic1, topic2))
+
+    try {
+      producer1.initTransactions()
+
+      producer1.beginTransaction()
+      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+
+      producer2.initTransactions()  // ok, will abort the open transaction.
+      producer2.beginTransaction()
+      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+
+      try {
+        producer1.commitTransaction()
+        fail("Should not be able to commit transactions from a fenced producer.")
+      } catch {
+        case _: ProducerFencedException => // good! producer1 is closed in the outer finally block.
+        case e: Exception =>
+          fail("Got an unexpected exception from a fenced producer.", e)
+      }
+
+      producer2.commitTransaction()  // ok
+
+      // Only the two records committed by producer2 should be returned.
+      pollUntilExactlyNumRecords(consumer, 2).foreach { record =>
+        TestUtils.assertCommittedAndGetValue(record)
+      }
+    } finally {
+      // producer1 is closed here so that it is released even when the fencing assertion fails.
+      consumer.close()
+      producer1.close()
+      producer2.close()
+    }
+  }
+
+  @Test
+  def testFencingOnSendOffsets(): Unit = {
+    val transactionalId = "my-t.id"
+    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val consumer = transactionalConsumer()
+    consumer.subscribe(List(topic1, topic2))
+
+    try {
+      producer1.initTransactions()
+
+      producer1.beginTransaction()
+      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+
+      producer2.initTransactions()  // ok, will abort the open transaction.
+      producer2.beginTransaction()
+      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true))
+      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true))
+
+      try {
+        producer1.sendOffsetsToTransaction(Map(new TopicPartition("foobartopic", 0) -> new OffsetAndMetadata(110L)),  "foobarGroup")
+        fail("Should not be able to send offsets from a fenced producer.")
+      } catch {
+        case _: ProducerFencedException => // good! producer1 is closed in the outer finally block.
+        case e: Exception =>
+          fail("Got an unexpected exception from a fenced producer.", e)
+      }
+
+      producer2.commitTransaction()  // ok
+
+      // Only the two records committed by producer2 should be returned.
+      pollUntilExactlyNumRecords(consumer, 2).foreach { record =>
+        TestUtils.assertCommittedAndGetValue(record)
+      }
+    } finally {
+      // producer1 is closed here so that it is released even when the fencing assertion fails.
+      consumer.close()
+      producer1.close()
+      producer2.close()
+    }
+  }
+
+  @Ignore @Test
+  def testFencingOnSend(): Unit = {
+    val transactionalId = "my-t.id"
+    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val consumer = transactionalConsumer()
+    consumer.subscribe(List(topic1, topic2))
+
+    try {
+      producer1.initTransactions()
+
+      producer1.beginTransaction()
+      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+
+      producer2.initTransactions()  // ok, will abort the open transaction.
+      producer2.beginTransaction()
+      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get()
+      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get()
+
+      try {
+        val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
+        val recordMetadata = result.get()
+        error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!")
+        servers.foreach { server =>
+          error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
+        }
+        fail("Should not be able to send messages from a fenced producer.")
+      } catch {
+        case _: ProducerFencedException => // expected; producer1 is closed in the outer finally block.
+        case e: ExecutionException =>
+          assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+        case e: Exception =>
+          fail("Got an unexpected exception from a fenced producer.", e)
+      }
+
+      producer2.commitTransaction()  // ok
+
+      // Only the two records committed by producer2 should be returned.
+      pollUntilExactlyNumRecords(consumer, 2).foreach { record =>
+        TestUtils.assertCommittedAndGetValue(record)
+      }
+    } finally {
+      // producer1 is closed here so that it is released even when the fencing assertion fails.
+      consumer.close()
+      producer1.close()
+      producer2.close()
+    }
+  }
+
+  @Test
+  def testFencingOnAddPartitions(): Unit = {
+    val transactionalId = "my-t.id"
+    val producer1 = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val producer2 = TestUtils.createTransactionalProducer(transactionalId, servers)
+    val consumer = transactionalConsumer()
+    consumer.subscribe(List(topic1, topic2))
+
+    try {
+      producer1.initTransactions()
+
+      producer1.beginTransaction()
+      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "1", willBeCommitted = false))
+      producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "3", "3", willBeCommitted = false))
+      producer1.abortTransaction()
+
+      producer2.initTransactions()  // producer1 has no open transaction here; this is expected to fence producer1.
+      producer2.beginTransaction()
+      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "2", "4", willBeCommitted = true)).get()
+      producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, "2", "4", willBeCommitted = true)).get()
+
+      try {
+        producer1.beginTransaction()
+        val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, "1", "5", willBeCommitted = false))
+        val recordMetadata = result.get()
+        error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic()}-${recordMetadata.partition()}. Grab the logs!!")
+        servers.foreach { server =>
+          error(s"log dirs: ${server.logManager.logDirs.map(_.getAbsolutePath).head}")
+        }
+        fail("Should not be able to send messages from a fenced producer.")
+      } catch {
+        case _: ProducerFencedException => // expected; producer1 is closed in the outer finally block.
+        case e: ExecutionException =>
+          assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+        case e: Exception =>
+          fail("Got an unexpected exception from a fenced producer.", e)
+      }
+
+      producer2.commitTransaction()  // ok
+
+      // Only the two records committed by producer2 should be returned.
+      pollUntilExactlyNumRecords(consumer, 2).foreach { record =>
+        TestUtils.assertCommittedAndGetValue(record)
+      }
+    } finally {
+      // producer1 is closed here so that it is released even when the fencing assertion fails.
+      consumer.close()
+      producer1.close()
+      producer2.close()
+    }
+  }
+
+  private def serverProps() = {
+    val serverProps = new Properties()
+    serverProps.put(KafkaConfig.AutoCreateTopicsEnableProp, false.toString)
+    // Set a smaller value for the number of partitions for the offset commit topic (__consumer_offsets topic)
+    // so that the creation of that topic/partition(s) and subsequent leader assignment doesn't take relatively long
+    serverProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
+    serverProps.put(KafkaConfig.TransactionsTopicPartitionsProp, 3.toString)
+    serverProps.put(KafkaConfig.TransactionsTopicReplicationFactorProp, 2.toString)
+    serverProps.put(KafkaConfig.TransactionsTopicMinISRProp, 2.toString)
+    serverProps.put(KafkaConfig.ControlledShutdownEnableProp, true.toString)
+    serverProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, false.toString)
+    serverProps.put(KafkaConfig.AutoLeaderRebalanceEnableProp, false.toString)
+    serverProps
+  }
+
+  private def transactionalConsumer(group: String = "group", maxPollRecords: Int = 500) = {
+    val props = new Properties()
+    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed")
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords.toString)
+    TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+      groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+  }
+
+  private def nonTransactionalConsumer(group: String = "group") = {
+    val props = new Properties()
+    props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_uncommitted")
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+    TestUtils.createNewConsumer(TestUtils.getBrokerListStrFromServers(servers),
+      groupId = group, securityProtocol = SecurityProtocol.PLAINTEXT, props = Some(props))
+  }
+
+  private def pollUntilExactlyNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+    val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    TestUtils.waitUntilTrue(() => {
+      records ++= consumer.poll(50)
+      records.size == numRecords
+    }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
+    records
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/b3a33ce4/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 6bee18d..90dcacd 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -20,7 +20,7 @@ package kafka.utils
 import java.io._
 import java.nio._
 import java.nio.channels._
-import java.nio.charset.Charset
+import java.nio.charset.{Charset, StandardCharsets}
 import java.security.cert.X509Certificate
 import java.util.{ArrayList, Collections, Properties}
 import java.util.concurrent.{Callable, Executors, TimeUnit}
@@ -40,9 +40,10 @@ import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils.ZkUtils._
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, RangeAssignor}
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer, OffsetAndMetadata, RangeAssignor}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.header.Header
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.protocol.SecurityProtocol
@@ -55,8 +56,9 @@ import org.apache.zookeeper.ZooDefs._
 import org.apache.zookeeper.data.ACL
 import org.junit.Assert._
 
+import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
-import scala.collection.Map
+import scala.collection.{Map, mutable}
 import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.Try
 
@@ -75,6 +77,10 @@ object TestUtils extends Logging {
   /** Zookeeper connection string to use for unit tests that mock/don't require a real ZK server. */
   val MockZkConnect = "127.0.0.1:" + MockZkPort
 
+  private val transactionStatusKey = "transactionStatus"
+  private val committedValue : Array[Byte] = "committed".getBytes(StandardCharsets.UTF_8)
+  private val abortedValue : Array[Byte] = "aborted".getBytes(StandardCharsets.UTF_8)
+
   /**
    * Create a temporary directory
    */
@@ -1330,6 +1336,89 @@ object TestUtils extends Logging {
     assertEquals("Consumed more records than expected", numMessages, records.size)
     records
   }
+
+  def createTransactionalProducer(transactionalId: String, servers: Seq[KafkaServer]) = {
+    val props = new Properties()
+    props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId)
+    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
+    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
+  }
+
+  // Seeds the given topic with records with keys and values in the range [0..numRecords)
+  def seedTopicWithNumberedRecords(topic: String, numRecords: Int, servers: Seq[KafkaServer]): Int = {
+    val props = new Properties()
+    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true")
+    var recordsWritten = 0
+    val producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers), retries = Integer.MAX_VALUE, acks = -1, props = Some(props))
+    try {
+      for (i <- 0 until numRecords) {
+        producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, asBytes(i.toString), asBytes(i.toString)))
+        recordsWritten += 1
+      }
+      producer.flush()
+    } finally {
+      producer.close()
+    }
+    recordsWritten
+  }
+
+  private def asString(bytes: Array[Byte]) = new String(bytes, StandardCharsets.UTF_8)
+
+  private def asBytes(string: String) = string.getBytes(StandardCharsets.UTF_8)
+
+  // Verifies that the record was intended to be committed by checking the headers for an expected transaction status.
+  // If the check passes, this will return the value as a string. It is expected that the record in question should
+  // have been created by the `producerRecordWithExpectedTransactionStatus` method.
+  def assertCommittedAndGetValue(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = {
+    record.headers.headers(transactionStatusKey).headOption match {
+      case Some(header) =>
+        assertEquals(s"Got ${asString(header.value)} but expected the value to indicate " +
+          s"committed status.", asString(committedValue), asString(header.value))
+      case None =>
+        fail("expected the record header to include an expected transaction status, but received nothing.")
+    }
+    recordValueAsString(record)
+  }
+
+  def recordValueAsString(record: ConsumerRecord[Array[Byte], Array[Byte]]) : String = {
+    asString(record.value)
+  }
+
+  def producerRecordWithExpectedTransactionStatus(topic: String, key: Array[Byte], value: Array[Byte],
+                                                  willBeCommitted: Boolean) : ProducerRecord[Array[Byte], Array[Byte]] = {
+    // The header value records whether the enclosing transaction is expected to commit or abort.
+    val expectedStatus = if (willBeCommitted) committedValue else abortedValue
+    val statusHeader = new Header {
+      override def key() = transactionStatusKey
+      override def value() = expectedStatus
+    }
+    new ProducerRecord[Array[Byte], Array[Byte]](topic, null, key, value, List(statusHeader))
+  }
+
+  def producerRecordWithExpectedTransactionStatus(topic: String, key: String, value: String,
+                                                  willBeCommitted: Boolean) : ProducerRecord[Array[Byte], Array[Byte]] = {
+    producerRecordWithExpectedTransactionStatus(topic, asBytes(key), asBytes(value), willBeCommitted)
+  }
+
+  // Returns the consumer's current position, wrapped in an OffsetAndMetadata, for every partition in its assignment.
+  def consumerPositions(consumer: KafkaConsumer[Array[Byte], Array[Byte]]) : Map[TopicPartition, OffsetAndMetadata]  = {
+    val positions = mutable.HashMap.empty[TopicPartition, OffsetAndMetadata]
+    for (assignedPartition <- consumer.assignment) {
+      positions += assignedPartition -> new OffsetAndMetadata(consumer.position(assignedPartition))
+    }
+    positions.toMap
+  }
+
+  def pollUntilAtLeastNumRecords(consumer: KafkaConsumer[Array[Byte], Array[Byte]], numRecords: Int) : Seq[ConsumerRecord[Array[Byte], Array[Byte]]] = {
+    val records = new ArrayBuffer[ConsumerRecord[Array[Byte], Array[Byte]]]()
+    TestUtils.waitUntilTrue(() => {
+      records ++= consumer.poll(50)
+      records.size >= numRecords
+    }, s"Consumed ${records.size} records until timeout, but expected $numRecords records.")
+    records
+  }
+
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {