Posted to commits@kafka.apache.org by jg...@apache.org on 2017/04/27 21:29:35 UTC

[3/3] kafka git commit: KAFKA-4818; Exactly once transactional clients

KAFKA-4818; Exactly once transactional clients

Author: Apurva Mehta <ap...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #2840 from apurvam/exactly-once-transactional-clients


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a82f194b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a82f194b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a82f194b

Branch: refs/heads/trunk
Commit: a82f194b21a6af2f52e36e55e2c6adcdba942c08
Parents: bb663d0
Author: Apurva Mehta <ap...@confluent.io>
Authored: Thu Apr 27 14:11:17 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Apr 27 14:11:36 2017 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 checkstyle/suppressions.xml                     |  44 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |  20 +
 .../kafka/clients/consumer/KafkaConsumer.java   |   8 +-
 .../clients/consumer/internals/Fetcher.java     |  94 +-
 .../kafka/clients/producer/KafkaProducer.java   | 169 +++-
 .../kafka/clients/producer/MockProducer.java    |  23 +
 .../apache/kafka/clients/producer/Producer.java |  30 +-
 .../kafka/clients/producer/ProducerConfig.java  |  25 +-
 .../clients/producer/TransactionState.java      | 135 ---
 .../internals/FutureTransactionalResult.java    |  64 ++
 .../producer/internals/ProducerBatch.java       |   7 +-
 .../producer/internals/RecordAccumulator.java   |  32 +-
 .../clients/producer/internals/Sender.java      | 134 ++-
 .../producer/internals/TransactionManager.java  | 866 +++++++++++++++++++
 .../internals/TransactionalRequestResult.java   |  64 ++
 .../apache/kafka/common/config/ConfigDef.java   |  10 +
 .../kafka/common/record/MemoryRecords.java      |  32 +-
 .../common/record/MemoryRecordsBuilder.java     |  31 +-
 .../requests/AddOffsetsToTxnResponse.java       |   1 +
 .../requests/AddPartitionsToTxnResponse.java    |   1 +
 .../kafka/common/requests/EndTxnResponse.java   |   1 +
 .../kafka/common/requests/FetchRequest.java     |  21 +-
 .../kafka/common/requests/ProduceRequest.java   |  22 +-
 .../clients/consumer/KafkaConsumerTest.java     |   7 +-
 .../clients/consumer/internals/FetcherTest.java | 313 ++++++-
 .../internals/RecordAccumulatorTest.java        |   3 +-
 .../clients/producer/internals/SenderTest.java  |  49 +-
 .../internals/TransactionManagerTest.java       | 588 +++++++++++++
 .../internals/TransactionStateTest.java         |  61 --
 .../common/record/MemoryRecordsBuilderTest.java |   9 +-
 .../src/main/scala/kafka/log/LogValidator.scala |   4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala |   8 +-
 .../unit/kafka/server/FetchRequestTest.scala    |   6 +-
 gradle/findbugs-exclude.xml                     |   9 +
 35 files changed, 2515 insertions(+), 377 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 3475062..a6de9a7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -124,6 +124,7 @@
     </subpackage>
 
     <subpackage name="producer">
+      <allow pkg="org.apache.kafka.clients.consumer" />
       <allow pkg="org.apache.kafka.clients.producer" />
     </subpackage>
   </subpackage>

http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 042be6b..eae8dde 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -8,7 +8,7 @@
 
     <!-- Clients -->
     <suppress checks="ClassFanOutComplexity"
-              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils).java"/>
+              files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils|TransactionManagerTest).java"/>
     <suppress checks="ClassFanOutComplexity"
               files=".*/protocol/Errors.java"/>
     <suppress checks="ClassFanOutComplexity"
@@ -35,7 +35,7 @@
               files="DefaultRecordBatch.java"/>
 
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse).java"/>
+              files="(KafkaConsumer|ConsumerCoordinator|Fetcher|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager).java"/>
     <suppress checks="ClassDataAbstractionCoupling"
               files=".*/protocol/Errors.java"/>
 
@@ -43,51 +43,17 @@
               files="(Utils|KafkaLZ4BlockOutputStream).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="ConsumerCoordinator.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="Fetcher.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="KafkaProducer.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="BufferPool.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="RecordAccumulator.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="ConfigDef.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="Selector.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="SslTransportLayer.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="KerberosLogin.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="AbstractRequest.java"/>
-    <suppress checks="CyclomaticComplexity"
-              files="AbstractResponse.java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|SsLTransportLayer|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer).java"/>
 
     <suppress checks="JavaNCSS"
               files="KerberosLogin.java"/>
 
     <suppress checks="NPathComplexity"
-              files="BufferPool.java"/>
-    <suppress checks="NPathComplexity"
-              files="MetricName.java"/>
-    <suppress checks="NPathComplexity"
-              files="Node.java"/>
-    <suppress checks="NPathComplexity"
-              files="ConfigDef.java"/>
-    <suppress checks="NPathComplexity"
-              files="SslTransportLayer.java"/>
-    <suppress checks="NPathComplexity"
-              files="MetadataResponse.java"/>
-    <suppress checks="NPathComplexity"
-              files="KerberosLogin.java"/>
-    <suppress checks="NPathComplexity"
-              files="SslTransportLayer.java"/>
+              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|SslTransportLayer|Sender).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
-              files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse)Test.java"/>
+              files="(Sender|Fetcher|KafkaConsumer|Metrics|ConsumerCoordinator|RequestResponse|TransactionManager)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
               files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index abb8255..0c0c29d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -22,10 +22,12 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.serialization.Deserializer;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -234,6 +236,18 @@ public class ConsumerConfig extends AbstractConfig {
      */
     static final String LEAVE_GROUP_ON_CLOSE_CONFIG = "internal.leave.group.on.close";
 
+    /** <code>isolation.level</code> */
+    public static final String ISOLATION_LEVEL_CONFIG = "isolation.level";
+    public static final String ISOLATION_LEVEL_DOC = "<p>Controls how to read messages written transactionally. If set to <code>read_committed</code>, consumer.poll() will only return" +
+            " transactional messages which have been committed. If set to <code>read_uncommitted</code> (the default), consumer.poll() will return all messages, even transactional messages" +
+            " which have been aborted. Non-transactional messages will be returned unconditionally in either mode.</p><p>Messages will always be returned in offset order. Hence, in" +
+            " <code>read_committed</code> mode, consumer.poll() will only return messages up to the last stable offset (LSO), which is one less than the offset of the first open transaction." +
+            " In particular, any messages appearing after messages belonging to ongoing transactions will be withheld until the relevant transaction has been completed. As a result, <code>read_committed</code>" +
+            " consumers will not be able to read up to the high watermark when there are in-flight transactions.</p><p>Further, when in <code>read_committed</code> mode, the seekToEnd method will" +
+            " return the LSO.</p>";
+
+    public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
+    
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -405,6 +419,12 @@ public class ConsumerConfig extends AbstractConfig {
                                                 Type.BOOLEAN,
                                                 true,
                                                 Importance.LOW)
+                                .define(ISOLATION_LEVEL_CONFIG,
+                                        Type.STRING,
+                                        DEFAULT_ISOLATION_LEVEL,
+                                        in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
+                                        Importance.MEDIUM,
+                                        ISOLATION_LEVEL_DOC)
                                 // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,

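For reference, a minimal sketch of a consumer using the new isolation.level setting added above; the bootstrap address, group id, and topic name are placeholders:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ReadCommittedExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "read-committed-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            // read_committed: poll() returns only committed transactional messages
            // (plus all non-transactional ones) and stops at the LSO.
            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("my-topic"));
            ConsumerRecords<String, String> records = consumer.poll(1000);
            System.out.println("Fetched " + records.count() + " committed records");
            consumer.close();
        }
    }
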
http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9c703b5..da5b7fb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.AppInfoParser;
@@ -660,6 +661,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "consumer";
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
+
+            IsolationLevel isolationLevel = IsolationLevel.valueOf(
+                    config.getString(ConsumerConfig.ISOLATION_LEVEL_CONFIG).toUpperCase(Locale.ROOT));
+
             NetworkClient netClient = new NetworkClient(
                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder),
                     this.metadata,
@@ -710,7 +715,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     metrics,
                     metricGrpPrefix,
                     this.time,
-                    this.retryBackoffMs);
+                    this.retryBackoffMs,
+                    isolationLevel);
 
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);

http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index f421dfb..947214f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.InvalidMetadataException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
@@ -44,12 +45,14 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.InvalidRecordException;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
+import org.apache.kafka.common.requests.IsolationLevel;
 import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
@@ -66,16 +69,20 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static java.util.Collections.emptyList;
+
 /**
  * This class manages the fetching process with the brokers.
  */
@@ -98,6 +105,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
     private final Deserializer<K> keyDeserializer;
     private final Deserializer<V> valueDeserializer;
+    private final IsolationLevel isolationLevel;
+
     private PartitionRecords nextInLineRecords = null;
     private ExceptionMetadata nextInLineExceptionMetadata = null;
 
@@ -115,7 +124,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                    Metrics metrics,
                    String metricGrpPrefix,
                    Time time,
-                   long retryBackoffMs) {
+                   long retryBackoffMs,
+                   IsolationLevel isolationLevel) {
         this.time = time;
         this.client = client;
         this.metadata = metadata;
@@ -131,6 +141,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         this.completedFetches = new ConcurrentLinkedQueue<>();
         this.sensors = new FetchManagerMetrics(metrics, metricGrpPrefix);
         this.retryBackoffMs = retryBackoffMs;
+        this.isolationLevel = isolationLevel;
 
         subscriptions.addListener(this);
     }
@@ -370,7 +381,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         if (strategy == OffsetResetStrategy.EARLIEST)
             timestamp = ListOffsetRequest.EARLIEST_TIMESTAMP;
         else if (strategy == OffsetResetStrategy.LATEST)
-            timestamp = ListOffsetRequest.LATEST_TIMESTAMP;
+            timestamp = endTimestamp();
         else
             throw new NoOffsetForPartitionException(partition);
         return timestamp;
@@ -457,7 +468,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
     }
 
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, long timeout) {
-        return beginningOrEndOffset(partitions, ListOffsetRequest.LATEST_TIMESTAMP, timeout);
+        return beginningOrEndOffset(partitions, endTimestamp(), timeout);
+    }
+
+    private long endTimestamp() {
+        return ListOffsetRequest.LATEST_TIMESTAMP;
     }
 
     private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions,
@@ -570,7 +585,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         }
 
         partitionRecords.drain();
-        return Collections.emptyList();
+        return emptyList();
     }
 
     /**
@@ -771,7 +786,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         Map<Node, FetchRequest.Builder> requests = new HashMap<>();
         for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
             Node node = entry.getKey();
-            FetchRequest.Builder fetch = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes, entry.getValue()).
+            FetchRequest.Builder fetch = FetchRequest.Builder.forConsumer(this.maxWaitMs, this.minBytes, entry.getValue(), isolationLevel).
                     setMaxBytes(this.maxBytes);
             requests.put(node, fetch);
         }
@@ -906,6 +921,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         private final TopicPartition partition;
         private final CompletedFetch completedFetch;
         private final Iterator<? extends RecordBatch> batches;
+        private final Set<Long> abortedProducerIds;
+        private final PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions;
 
         private int recordsRead;
         private int bytesRead;
@@ -921,6 +938,8 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             this.completedFetch = completedFetch;
             this.batches = batches;
             this.nextFetchOffset = completedFetch.fetchedOffset;
+            this.abortedProducerIds = new HashSet<>();
+            this.abortedTransactions = abortedTransactions(completedFetch.partitionData);
         }
 
         private void drain() {
@@ -960,8 +979,10 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         }
 
         private void maybeCloseRecordStream() {
-            if (records != null)
+            if (records != null) {
                 records.close();
+                records = null;
+            }
         }
 
         private Record nextFetchedRecord() {
@@ -974,7 +995,14 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         return null;
                     }
                     currentBatch = batches.next();
+
                     maybeEnsureValid(currentBatch);
+
+                    if (isolationLevel == IsolationLevel.READ_COMMITTED && isBatchAborted(currentBatch)) {
+                        nextFetchOffset = currentBatch.lastOffset() + 1;
+                        continue;
+                    }
+
                     records = currentBatch.streamingIterator();
                 }
 
@@ -1008,6 +1036,60 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
             }
             return records;
         }
+
+        private boolean isBatchAborted(RecordBatch batch) {
+           /* When in READ_COMMITTED mode, we need to do the following for each incoming entry:
+            *   0. Check whether the pid is in the 'abortedProducerIds' set && the entry does not include an abort marker.
+            *      If so, skip the entry.
+            *   1. If the pid is in aborted pids and the entry contains an abort marker, remove the pid from
+            *      aborted pids and skip the entry.
+            *   2. Check lowest offset entry in the abort index. If the PID of the current entry matches the
+            *      pid of the abort index entry, and the incoming offset is no smaller than the abort index offset,
+            *      this means that the entry has been aborted. Add the pid to the aborted pids set, and remove
+            *      the entry from the abort index.
+            */
+            FetchResponse.AbortedTransaction nextAbortedTransaction = abortedTransactions.peek();
+            if (abortedProducerIds.contains(batch.producerId())
+                    || (nextAbortedTransaction != null && nextAbortedTransaction.producerId == batch.producerId() && nextAbortedTransaction.firstOffset <= batch.baseOffset())) {
+                if (abortedProducerIds.contains(batch.producerId()) && containsAbortMarker(batch)) {
+                    abortedProducerIds.remove(batch.producerId());
+                } else if (nextAbortedTransaction != null && nextAbortedTransaction.producerId == batch.producerId() && nextAbortedTransaction.firstOffset <= batch.baseOffset()) {
+                    abortedProducerIds.add(batch.producerId());
+                    abortedTransactions.poll();
+                }
+                log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}", batch.producerId(), batch.baseOffset(), partition);
+                return true;
+            }
+            return false;
+        }
+
+        private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData partition) {
+            PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions = null;
+            if (partition.abortedTransactions != null && !partition.abortedTransactions.isEmpty()) {
+                abortedTransactions = new PriorityQueue<>(
+                        partition.abortedTransactions.size(),
+                        new Comparator<FetchResponse.AbortedTransaction>() {
+                            @Override
+                            public int compare(FetchResponse.AbortedTransaction o1, FetchResponse.AbortedTransaction o2) {
+                                return Long.compare(o1.firstOffset, o2.firstOffset);
+                            }
+                        }
+                );
+                abortedTransactions.addAll(partition.abortedTransactions);
+            } else {
+                abortedTransactions = new PriorityQueue<>();
+            }
+            return abortedTransactions;
+        }
+
+        private boolean containsAbortMarker(RecordBatch batch) {
+            Iterator<Record> batchIterator = batch.iterator();
+            Record firstRecord = batchIterator.hasNext() ? batchIterator.next() : null;
+            boolean containsAbortMarker = firstRecord != null && firstRecord.isControlRecord() && ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
+            if (containsAbortMarker && batchIterator.hasNext())
+                throw new CorruptRecordException("A record batch containing a control message contained more than one record. partition: " + partition + ", offset: " + batch.baseOffset());
+            return containsAbortMarker;
+        }
     }
 
     private static class ExceptionMetadata {

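As a companion to the READ_COMMITTED filtering logic above, here is a self-contained sketch of the same abort-index algorithm; Batch and AbortedTxn are simplified stand-ins for the real client classes, not part of this patch:

    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.PriorityQueue;
    import java.util.Set;

    public class AbortFilterSketch {
        static final class Batch {
            final long producerId;
            final long baseOffset;
            final boolean isAbortMarker;
            Batch(long producerId, long baseOffset, boolean isAbortMarker) {
                this.producerId = producerId;
                this.baseOffset = baseOffset;
                this.isAbortMarker = isAbortMarker;
            }
        }

        static final class AbortedTxn {
            final long producerId;
            final long firstOffset;
            AbortedTxn(long producerId, long firstOffset) {
                this.producerId = producerId;
                this.firstOffset = firstOffset;
            }
        }

        // Mirrors isBatchAborted: true means the batch lies inside an aborted
        // transaction (or is the abort marker itself) and should be skipped.
        static boolean isAborted(Batch batch,
                                 PriorityQueue<AbortedTxn> abortIndex, // ordered by firstOffset
                                 Set<Long> abortedPids) {
            AbortedTxn next = abortIndex.peek();
            boolean entersAbortedRange = next != null
                    && next.producerId == batch.producerId
                    && next.firstOffset <= batch.baseOffset;
            if (abortedPids.contains(batch.producerId) || entersAbortedRange) {
                if (abortedPids.contains(batch.producerId) && batch.isAbortMarker)
                    abortedPids.remove(batch.producerId); // the marker closes the range
                else if (entersAbortedRange) {
                    abortedPids.add(batch.producerId);    // entering an aborted range
                    abortIndex.poll();
                }
                return true;
            }
            return false;
        }

        public static void main(String[] args) {
            PriorityQueue<AbortedTxn> abortIndex =
                    new PriorityQueue<>(Comparator.comparingLong((AbortedTxn t) -> t.firstOffset));
            abortIndex.add(new AbortedTxn(1L, 10L));
            Set<Long> abortedPids = new HashSet<>();
            System.out.println(isAborted(new Batch(1L, 5L, false), abortIndex, abortedPids));  // false: before the range
            System.out.println(isAborted(new Batch(1L, 10L, false), abortIndex, abortedPids)); // true: aborted data
            System.out.println(isAborted(new Batch(1L, 12L, true), abortIndex, abortedPids));  // true: abort marker
            System.out.println(isAborted(new Batch(1L, 13L, false), abortIndex, abortedPids)); // false: after the abort
        }
    }
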
http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 8fe99dd..3745aba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -20,9 +20,12 @@ import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.internals.FutureTransactionalResult;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.clients.producer.internals.RecordAccumulator;
 import org.apache.kafka.clients.producer.internals.Sender;
+import org.apache.kafka.clients.producer.internals.TransactionManager;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
@@ -32,6 +35,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
@@ -159,7 +163,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final int requestTimeoutMs;
     private final ProducerInterceptors<K, V> interceptors;
     private final ApiVersions apiVersions;
-    private final TransactionState transactionState;
+    private final TransactionManager transactionManager;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -263,10 +267,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
             this.maxBlockTimeMs = configureMaxBlockTime(config, userProvidedConfigs);
             this.requestTimeoutMs = configureRequestTimeout(config, userProvidedConfigs);
-            this.transactionState = configureTransactionState(config, time);
-            int retries = configureRetries(config, transactionState != null);
-            int maxInflightRequests = configureInflightRequests(config, transactionState != null);
-            short acks = configureAcks(config, transactionState != null);
+            this.transactionManager = configureTransactionState(config);
+            int retries = configureRetries(config, transactionManager != null);
+            int maxInflightRequests = configureInflightRequests(config, transactionManager != null);
+            short acks = configureAcks(config, transactionManager != null);
 
             this.apiVersions = new ApiVersions();
             this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
@@ -277,7 +281,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     metrics,
                     time,
                     apiVersions,
-                    transactionState);
+                    transactionManager);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
@@ -305,7 +309,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     Time.SYSTEM,
                     this.requestTimeoutMs,
                     config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
-                    this.transactionState,
+                    this.transactionManager,
                     apiVersions);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
@@ -363,13 +367,37 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private static TransactionState configureTransactionState(ProducerConfig config, Time time) {
+    private static TransactionManager configureTransactionState(ProducerConfig config) {
+
+        TransactionManager transactionManager = null;
+
+        boolean userConfiguredIdempotence = false;
+        if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
+            userConfiguredIdempotence = true;
+
+        boolean userConfiguredTransactions = false;
+        if (config.originals().containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG))
+            userConfiguredTransactions = true;
+
         boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+
+        if (!idempotenceEnabled && userConfiguredIdempotence && userConfiguredTransactions)
+            throw new ConfigException("Cannot set a " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " without also enabling idempotence.");
+
+        if (userConfiguredTransactions)
+            idempotenceEnabled = true;
+
         if (idempotenceEnabled) {
-            return new TransactionState(time);
-        } else {
-            return null;
+            String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
+            int transactionTimeoutMs = config.getInt(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
+            transactionManager = new TransactionManager(transactionalId, transactionTimeoutMs);
+            if (transactionManager.isTransactional())
+                log.info("Instantiated a transactional producer.");
+            else
+                log.info("Instantiated an idempotent producer.");
         }
+
+        return transactionManager;
     }
 
     private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled) {
@@ -431,6 +459,87 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     }
 
     /**
+     * Needs to be called before any other methods when the transactional.id is set in the configuration.
+     *
+     * This method does the following:
+     *   1. Ensures any transactions initiated by previous instances of the producer
+     *      are completed. If the previous instance had failed with a transaction in
+     *      progress, it will be aborted. If the last transaction had begun completion,
+     *      but not yet finished, this method awaits its completion.
+     *   2. Gets the internal producer id and epoch, used in all future transactional
+     *      messages issued by the producer.
+     *
+     * @throws IllegalStateException if the TransactionalId for the producer is not set
+     *         in the configuration.
+     */
+    public void initTransactions() {
+        if (transactionManager == null)
+            throw new IllegalStateException("Cannot call initTransactions without setting a transactional id.");
+        transactionManager.initializeTransactions().get();
+    }
+
+    /**
+     * Should be called before the start of each new transaction.
+     *
+     * @throws ProducerFencedException if another producer with the same
+     *         transactional.id is active.
+     */
+    public void beginTransaction() throws ProducerFencedException {
+        // Set the transactional bit in the producer.
+        if (transactionManager == null)
+            throw new IllegalStateException("Cannot use transactional methods without enabling transactions");
+        transactionManager.beginTransaction();
+    }
+
+    /**
+     * Sends a list of consumed offsets to the consumer group coordinator, and also marks
+     * those offsets as part of the current transaction. These offsets will be considered
+     * consumed only if the transaction is committed successfully.
+     *
+     * This method should be used when you need to batch consumed and produced messages
+     * together, typically in a consume-transform-produce pattern.
+     *
+     * @throws ProducerFencedException if another producer with the same
+     *         transactional.id is active.
+     */
+    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
+                                         String consumerGroupId) throws ProducerFencedException {
+        if (transactionManager == null)
+            throw new IllegalStateException("Cannot send offsets to transaction since transactions are not enabled.");
+        FutureTransactionalResult result = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
+        sender.wakeup();
+        result.get();
+    }
+
+    /**
+     * Commits the ongoing transaction.
+     *
+     * @throws ProducerFencedException if another producer with the same
+     *         transactional.id is active.
+     */
+    public void commitTransaction() throws ProducerFencedException {
+        if (transactionManager == null)
+            throw new IllegalStateException("Cannot commit transaction since transactions are not enabled");
+        FutureTransactionalResult result = transactionManager.beginCommittingTransaction();
+        sender.wakeup();
+        result.get();
+    }
+
+    /**
+     * Aborts the ongoing transaction.
+     *
+     * @throws ProducerFencedException if another producer with the same
+     *         transactional.id is active.
+     */
+    public void abortTransaction() throws ProducerFencedException {
+        if (transactionManager == null)
+            throw new IllegalStateException("Cannot abort transaction since transactions are not enabled.");
+        FutureTransactionalResult result = transactionManager.beginAbortingTransaction();
+        sender.wakeup();
+        result.get();
+    }
+
+    /**
      * Asynchronously send a record to a topic. Equivalent to <code>send(record, null)</code>.
      * See {@link #send(ProducerRecord, Callback)} for details.
      */
@@ -523,6 +632,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Implementation of asynchronously send a record to a topic.
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
+        ensureProperTransactionalState();
         TopicPartition tp = null;
         try {
             // first make sure the metadata for the topic is available
@@ -554,7 +664,11 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             long timestamp = record.timestamp() == null ? time.milliseconds() : record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
             // producer callback will make sure to call both 'callback' and interceptor callback
-            Callback interceptCallback = this.interceptors == null ? callback : new InterceptorCallback<>(callback, this.interceptors, tp);
+            Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp, transactionManager);
+
+            if (transactionManager != null)
+                transactionManager.maybeAddPartitionToTransaction(tp);
+
             RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                     serializedValue, interceptCallback, remainingWaitMs);
             if (result.batchIsFull || result.newBatchCreated) {
@@ -597,6 +711,30 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
+    private void ensureProperTransactionalState() {
+        if (transactionManager == null)
+            return;
+
+        if (transactionManager.isTransactional() && !transactionManager.hasPid())
+            throw new IllegalStateException("Cannot perform a 'send' before completing a call to initTransactions when transactions are enabled.");
+
+        if (transactionManager.isFenced())
+            throw new ProducerFencedException("The current producer has been fenced off by a another producer using the same transactional id.");
+
+        if (transactionManager.isInTransaction()) {
+            if (transactionManager.isInErrorState()) {
+                String errorMessage = "Cannot perform a transactional send because at least one previous transactional request has failed with errors.";
+                Exception lastError = transactionManager.lastError();
+                if (lastError != null)
+                    throw new KafkaException(errorMessage, lastError);
+                else
+                    throw new KafkaException(errorMessage);
+            }
+            if (transactionManager.isCompletingTransaction())
+                throw new IllegalStateException("Cannot call send while a commit or abort is in progress.");
+        }
+    }
+
     /**
      * Wait for cluster metadata including partitions for the given topic to be available.
      * @param topic The topic we want metadata for
@@ -890,12 +1028,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
         private final TopicPartition tp;
+        private final TransactionManager transactionManager;
 
         public InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors,
-                                   TopicPartition tp) {
+                                   TopicPartition tp, TransactionManager transactionManager) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
             this.tp = tp;
+            this.transactionManager = transactionManager;
         }
 
         public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -909,6 +1049,9 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             }
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
+
+            if (exception != null && transactionManager != null)
+                transactionManager.maybeSetError(exception);
         }
     }
 }

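For reference, a minimal usage sketch of the transactional producer API introduced here; the bootstrap address, topics, group id, and offset values are placeholders:

    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.ProducerFencedException;

    public class TransactionalProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // Setting a transactional.id implicitly enables idempotence.
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");

            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
            // Must complete before any send(); otherwise ensureProperTransactionalState
            // raises an IllegalStateException.
            producer.initTransactions();
            try {
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("output-topic", "key", "value"));
                // Consume-transform-produce: commit input offsets atomically with
                // the produced messages.
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(
                        new TopicPartition("input-topic", 0), new OffsetAndMetadata(42L));
                producer.sendOffsetsToTransaction(offsets, "my-consumer-group");
                producer.commitTransaction();
            } catch (ProducerFencedException e) {
                // Another producer with the same transactional.id is active.
                System.err.println("Fenced: " + e.getMessage());
            } catch (Exception e) {
                producer.abortTransaction();
            } finally {
                producer.close();
            }
        }
    }
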
http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index ab09997..80ea372 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
@@ -24,6 +25,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.serialization.Serializer;
 
@@ -95,6 +97,27 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this(Cluster.empty(), autoComplete, partitioner, keySerializer, valueSerializer);
     }
 
+    public void initTransactions() {
+        // No-op: the mock does not implement transactional semantics.
+    }
+
+    public void beginTransaction() throws ProducerFencedException {
+        // No-op: the mock does not implement transactional semantics.
+    }
+
+    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
+                                  String consumerGroupId) throws ProducerFencedException {
+        // No-op: the mock does not implement transactional semantics.
+    }
+
+    public void commitTransaction() throws ProducerFencedException {
+        // No-op: the mock does not implement transactional semantics.
+    }
+
+    public void abortTransaction() throws ProducerFencedException {
+        // No-op: the mock does not implement transactional semantics.
+    }
+
     /**
      * Adds the record to the list of sent records. The {@link RecordMetadata} returned will be immediately satisfied.
      * 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
index 4da8681..1e77633 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Producer.java
@@ -19,6 +19,9 @@ package org.apache.kafka.clients.producer;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 
 import java.io.Closeable;
 import java.util.List;
@@ -26,7 +29,6 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-
 /**
  * The interface for the {@link KafkaProducer}
  * @see KafkaProducer
@@ -35,6 +37,32 @@ import java.util.concurrent.TimeUnit;
 public interface Producer<K, V> extends Closeable {
 
     /**
+     * See {@link KafkaProducer#initTransactions()}
+     */
+    void initTransactions();
+
+    /**
+     * See {@link KafkaProducer#beginTransaction()}
+     */
+    void beginTransaction() throws ProducerFencedException;
+
+    /**
+     * See {@link KafkaProducer#sendOffsetsToTransaction(Map, String)}
+     */
+    void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
+                                  String consumerGroupId) throws ProducerFencedException;
+
+    /**
+     * See {@link KafkaProducer#commitTransaction()}
+     */
+    void commitTransaction() throws ProducerFencedException;
+
+    /**
+     * See {@link KafkaProducer#abortTransaction()}
+     */
+    void abortTransaction() throws ProducerFencedException;
+
+    /**
      * Send the given record asynchronously and return a future which will eventually contain the response information.
      *
      * @param record The record to send

http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index d6e03d2..4bceb95 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -232,6 +232,18 @@ public class ProducerConfig extends AbstractConfig {
                                                         + "<code>" + RETRIES_CONFIG + "</code> cannot be zero. Additionally " + ACKS_CONFIG + " must be set to 'all'. If these values "
                                                         + "are left at their defaults, we will override the default to be suitable. "
                                                         + "If the values are set to something incompatible with the idempotent producer, a ConfigException will be thrown.";
+
+    /** <code> transaction.timeout.ms </code> */
+    public static final String TRANSACTION_TIMEOUT_CONFIG = "transaction.timeout.ms";
+    public static final String TRANSACTION_TIMEOUT_DOC = "The maximum amount of time in ms that the transaction coordinator will wait for a transaction status update from the producer before proactively aborting the ongoing transaction. " +
+            "If this value is larger than the max.transaction.timeout.ms setting in the broker, the request will fail with an `InvalidTransactionTimeout` error.";
+
+    /** <code> transactional.id </code> */
+    public static final String TRANSACTIONAL_ID_CONFIG = "transactional.id";
+    public static final String TRANSACTIONAL_ID_DOC = "The TransactionalId to use for transactional delivery. This enables reliability semantics which span multiple producer sessions since it allows the client to guarantee that transactions using the same TransactionalId have been completed prior to starting any new transactions. If no TransactionalId is provided, then the producer is limited to idempotent delivery. " +
+            "Note that enable.idempotence must be enabled if a TransactionalId is configured. " +
+            "The default is empty, which means transactions cannot be used.";
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -325,7 +337,18 @@ public class ProducerConfig extends AbstractConfig {
                                         Type.BOOLEAN,
                                         false,
                                         Importance.LOW,
-                                        ENABLE_IDEMPOTENCE_DOC);
+                                        ENABLE_IDEMPOTENCE_DOC)
+                                .define(TRANSACTION_TIMEOUT_CONFIG,
+                                        Type.INT,
+                                        60000,
+                                        Importance.LOW,
+                                        TRANSACTION_TIMEOUT_DOC)
+                                .define(TRANSACTIONAL_ID_CONFIG,
+                                        Type.STRING,
+                                        null,
+                                        new ConfigDef.NonEmptyString(),
+                                        Importance.LOW,
+                                        TRANSACTIONAL_ID_DOC);
     }
 
     public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,

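A sketch of the configuration rule documented above and enforced in configureTransactionState: explicitly disabling idempotence while setting a transactional.id is rejected (values are illustrative):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.KafkaException;

    public class TransactionalConfigCheck {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            // transactional.id alone is fine: idempotence is enabled implicitly.
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "txn-1");
            // Explicitly turning idempotence off at the same time is invalid.
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "false");
            try {
                new KafkaProducer<String, String>(props).close();
            } catch (KafkaException e) {
                // Construction fails; the underlying error is the ConfigException
                // raised in configureTransactionState.
                System.out.println("Rejected: " + e.getMessage());
            }
        }
    }
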
http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java b/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java
deleted file mode 100644
index fa30b3f..0000000
--- a/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.producer;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Time;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
-import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
-
-/**
- * A class which maintains state for transactions. Also keeps the state necessary to ensure idempotent production.
- */
-public class TransactionState {
-    private volatile PidAndEpoch pidAndEpoch;
-    private final Map<TopicPartition, Integer> sequenceNumbers;
-    private final Time time;
-
-    public static class PidAndEpoch {
-        public final long producerId;
-        public final short epoch;
-
-        PidAndEpoch(long producerId, short epoch) {
-            this.producerId = producerId;
-            this.epoch = epoch;
-        }
-
-        public boolean isValid() {
-            return NO_PRODUCER_ID < producerId;
-        }
-    }
-
-    public TransactionState(Time time) {
-        this.pidAndEpoch = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
-        this.sequenceNumbers = new HashMap<>();
-        this.time = time;
-    }
-
-    public boolean hasPid() {
-        return pidAndEpoch.isValid();
-    }
-
-    /**
-     * A blocking call to get the pid and epoch for the producer. If the PID and epoch has not been set, this method
-     * will block for at most maxWaitTimeMs. It is expected that this method be called from application thread
-     * contexts (ie. through Producer.send). The PID it self will be retrieved in the background thread.
-     * @param maxWaitTimeMs The maximum time to block.
-     * @return a PidAndEpoch object. Callers must call the 'isValid' method fo the returned object to ensure that a
-     *         valid Pid and epoch is actually returned.
-     */
-    public synchronized PidAndEpoch awaitPidAndEpoch(long maxWaitTimeMs) throws InterruptedException {
-        long start = time.milliseconds();
-        long elapsed = 0;
-        while (!hasPid() && elapsed < maxWaitTimeMs) {
-            wait(maxWaitTimeMs);
-            elapsed = time.milliseconds() - start;
-        }
-        return pidAndEpoch;
-    }
-
-    /**
-     * Get the current pid and epoch without blocking. Callers must use {@link PidAndEpoch#isValid()} to
-     * verify that the result is valid.
-     *
-     * @return the current PidAndEpoch.
-     */
-    public PidAndEpoch pidAndEpoch() {
-        return pidAndEpoch;
-    }
-
-    /**
-     * Set the pid and epoch atomically. This method will signal any callers blocked on the `pidAndEpoch` method
-     * once the pid is set. This method will be called on the background thread when the broker responds with the pid.
-     */
-    public synchronized void setPidAndEpoch(long pid, short epoch) {
-        this.pidAndEpoch = new PidAndEpoch(pid, epoch);
-        if (this.pidAndEpoch.isValid())
-            notifyAll();
-    }
-
-    /**
-     * This method is used when the producer needs to reset it's internal state because of an irrecoverable exception
-     * from the broker.
-     *
-     * We need to reset the producer id and associated state when we have sent a batch to the broker, but we either get
-     * a non-retriable exception or we run out of retries, or the batch expired in the producer queue after it was already
-     * sent to the broker.
-     *
-     * In all of these cases, we don't know whether batch was actually committed on the broker, and hence whether the
-     * sequence number was actually updated. If we don't reset the producer state, we risk the chance that all future
-     * messages will return an OutOfOrderSequenceException.
-     */
-    public synchronized void resetProducerId() {
-        setPidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
-        this.sequenceNumbers.clear();
-    }
-
-    /**
-     * Returns the next sequence number to be written to the given TopicPartition.
-     */
-    public synchronized Integer sequenceNumber(TopicPartition topicPartition) {
-        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
-        if (currentSequenceNumber == null) {
-            currentSequenceNumber = 0;
-            sequenceNumbers.put(topicPartition, currentSequenceNumber);
-        }
-        return currentSequenceNumber;
-    }
-
-    public synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
-        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
-        if (currentSequenceNumber == null)
-            throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence.");
-
-        currentSequenceNumber += increment;
-        sequenceNumbers.put(topicPartition, currentSequenceNumber);
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureTransactionalResult.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureTransactionalResult.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureTransactionalResult.java
new file mode 100644
index 0000000..d05bc6a
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureTransactionalResult.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+public final class FutureTransactionalResult implements Future<TransactionalRequestResult> {
+
+    private final TransactionalRequestResult result;
+
+    public FutureTransactionalResult(TransactionalRequestResult result) {
+        this.result = result;
+    }
+
+    @Override
+    public boolean isDone() {
+        return this.result.isCompleted();
+    }
+
+    @Override
+    public boolean isCancelled() {
+        return false;
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+        return false;
+    }
+
+    @Override
+    public TransactionalRequestResult get() {
+        this.result.await();
+        if (!result.isSuccessful()) {
+            throw result.error();
+        }
+        return result;
+    }
+
+    @Override
+    public TransactionalRequestResult get(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
+        boolean occurred = this.result.await(timeout, unit);
+        if (!occurred) {
+            throw new TimeoutException("Could not complete transactional operation within " + TimeUnit.MILLISECONDS.convert(timeout, unit) + "ms.");
+        }
+        return result;
+    }
+
+}
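
A sketch of how a caller might block on this future; the helper method and the timeout value are hypothetical, but the Future methods are the ones defined above:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    // Hypothetical caller: wait up to 30 seconds for a transactional operation.
    static TransactionalRequestResult awaitResult(FutureTransactionalResult future)
            throws InterruptedException, TimeoutException {
        TransactionalRequestResult result = future.get(30, TimeUnit.SECONDS);
        // Unlike the untimed get(), the timed variant returns without checking
        // for success, so the caller re-checks and rethrows the recorded error.
        if (!result.isSuccessful())
            throw result.error();
        return result;
    }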

http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index bacf0a2..eba3078 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.AbstractRecords;
@@ -231,7 +230,7 @@ public final class ProducerBatch {
         return recordsBuilder.isFull();
     }
 
-    public void setProducerState(TransactionState.PidAndEpoch pidAndEpoch, int baseSequence) {
+    public void setProducerState(TransactionManager.PidAndEpoch pidAndEpoch, int baseSequence) {
         recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
     }
 
@@ -273,4 +272,8 @@ public final class ProducerBatch {
     public long producerId() {
         return recordsBuilder.producerId();
     }
+
+    public short producerEpoch() {
+        return recordsBuilder.producerEpoch();
+    }
 }
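
The new producerEpoch() accessor lets the sender confirm that both the producer id and the epoch a batch was sealed with still match the current producer state before retrying it. A hedged sketch of that guard, with a hypothetical helper name (the real check appears in the Sender changes below):

    // Illustrative only: mirrors the retry guard added in Sender below.
    static boolean canRetry(ProducerBatch batch, TransactionManager.PidAndEpoch current) {
        // A batch is only safe to retry if its PID and epoch are still current;
        // otherwise its sequence numbers would be rejected by the broker.
        return current.producerId == batch.producerId()
                && current.epoch == batch.producerEpoch();
    }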

http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index be03142..aa4d9d3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -18,7 +18,6 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
@@ -82,7 +81,7 @@ public final class RecordAccumulator {
     // The following variables are only accessed by the sender thread, so we don't need to protect them.
     private final Set<TopicPartition> muted;
     private int drainIndex;
-    private final TransactionState transactionState;
+    private final TransactionManager transactionManager;
 
     /**
      * Create a new record accumulator
@@ -98,7 +97,7 @@ public final class RecordAccumulator {
      * @param metrics The metrics
      * @param time The time instance to use
      * @param apiVersions Request API versions for current connected brokers
-     * @param transactionState The shared transaction state object which tracks Pids, epochs, and sequence numbers per
+     * @param transactionManager The shared transaction manager which tracks PIDs, epochs, and sequence numbers per
      *                         partition.
      */
     public RecordAccumulator(int batchSize,
@@ -109,7 +108,7 @@ public final class RecordAccumulator {
                              Metrics metrics,
                              Time time,
                              ApiVersions apiVersions,
-                             TransactionState transactionState) {
+                             TransactionManager transactionManager) {
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
@@ -125,7 +124,7 @@ public final class RecordAccumulator {
         this.muted = new HashSet<>();
         this.time = time;
         this.apiVersions = apiVersions;
-        this.transactionState = transactionState;
+        this.transactionManager = transactionManager;
         registerMetrics(metrics, metricGrpName);
     }
 
@@ -229,11 +228,14 @@ public final class RecordAccumulator {
     }
 
     private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
-        if (transactionState != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
+        if (transactionManager != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
             throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
                     "support the required message format (v2). The broker must be version 0.11 or later.");
         }
-        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L);
+        boolean isTransactional = false;
+        if (transactionManager != null)
+            isTransactional = transactionManager.isInTransaction();
+        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, 0L, isTransactional);
     }
 
     /**
@@ -437,9 +439,9 @@ public final class RecordAccumulator {
                                         // request
                                         break;
                                     } else {
-                                        TransactionState.PidAndEpoch pidAndEpoch = null;
-                                        if (transactionState != null) {
-                                            pidAndEpoch = transactionState.pidAndEpoch();
+                                        TransactionManager.PidAndEpoch pidAndEpoch = null;
+                                        if (transactionManager != null) {
+                                            pidAndEpoch = transactionManager.pidAndEpoch();
                                             if (!pidAndEpoch.isValid())
                                                 // we cannot send the batch until we have refreshed the PID
                                                 break;
@@ -452,7 +454,7 @@ public final class RecordAccumulator {
                                             // the previous attempt may actually have been accepted, and if we change
                                             // the pid and sequence here, this attempt will also be accepted, causing
                                             // a duplicate.
-                                            int sequenceNumber = transactionState.sequenceNumber(batch.topicPartition);
+                                            int sequenceNumber = transactionManager.sequenceNumber(batch.topicPartition);
                                             log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
                                                     node, pidAndEpoch.producerId, pidAndEpoch.epoch,
                                                     batch.topicPartition, sequenceNumber);
@@ -542,6 +544,14 @@ public final class RecordAccumulator {
         }
     }
 
+    public boolean hasUnflushedBatches() {
+        for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches().entrySet()) {
+            if (!entry.getValue().isEmpty())
+                return true;
+        }
+        return !this.incomplete.incomplete.isEmpty();
+    }
+
     /**
      * This function is only called when sender is closed forcefully. It will fail all the
      * incomplete batches and return.
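
Pulling the drain() changes above together: once a valid PID/epoch is available, a batch that is not a retry gets the partition's next sequence number stamped into its record batch header. A condensed, illustrative restatement (the helper name is hypothetical; this is not the actual drain loop):

    // Condensed from the drain() changes above; illustrative only.
    private void maybeStampProducerState(TransactionManager transactionManager, ProducerBatch batch) {
        TransactionManager.PidAndEpoch pidAndEpoch = transactionManager.pidAndEpoch();
        if (!pidAndEpoch.isValid())
            return; // cannot send until the PID has been refreshed
        if (!batch.inRetry()) {
            // A retried batch keeps the sequence from its first attempt: that
            // attempt may have been accepted, and re-stamping would admit a duplicate.
            int sequence = transactionManager.sequenceNumber(batch.topicPartition);
            batch.setProducerState(pidAndEpoch, sequence);
        }
    }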

http://git-wip-us.apache.org/repos/asf/kafka/blob/a82f194b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index ab92522..698f4de 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
-import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -109,7 +109,7 @@ public class Sender implements Runnable {
     private final ApiVersions apiVersions;
 
     /* all the state related to transactions, in particular the PID, epoch, and sequence numbers */
-    private final TransactionState transactionState;
+    private final TransactionManager transactionManager;
 
     public Sender(KafkaClient client,
                   Metadata metadata,
@@ -122,7 +122,7 @@ public class Sender implements Runnable {
                   Time time,
                   int requestTimeout,
                   long retryBackoffMs,
-                  TransactionState transactionState,
+                  TransactionManager transactionManager,
                   ApiVersions apiVersions) {
         this.client = client;
         this.accumulator = accumulator;
@@ -137,7 +137,7 @@ public class Sender implements Runnable {
         this.requestTimeout = requestTimeout;
         this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
-        this.transactionState = transactionState;
+        this.transactionManager = transactionManager;
     }
 
     /**
@@ -187,8 +187,16 @@ public class Sender implements Runnable {
      * @param now The current POSIX time in milliseconds
      */
     void run(long now) {
-        Cluster cluster = metadata.fetch();
+        long pollTimeout = 0;
+        if (!maybeSendTransactionalRequest(now))
+            pollTimeout = sendProducerData(now);
+
+        this.client.poll(pollTimeout, now);
+    }
+
 
+    private long sendProducerData(long now) {
+        Cluster cluster = metadata.fetch();
         maybeWaitForPid();
 
         // get the list of partitions with data ready to send
@@ -233,15 +241,15 @@ public class Sender implements Runnable {
         // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
         // we need to reset the producer id here.
         for (ProducerBatch expiredBatch : expiredBatches) {
-            if (transactionState != null && expiredBatch.inRetry()) {
+            if (transactionManager != null && expiredBatch.inRetry()) {
                 needsTransactionStateReset = true;
             }
             this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
         }
 
         if (needsTransactionStateReset) {
-            transactionState.resetProducerId();
-            return;
+            transactionManager.resetProducerId();
+            return 0;
         }
 
         sensors.updateProduceRequestMetrics(batches);
@@ -253,15 +261,79 @@ public class Sender implements Runnable {
         long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
         if (!result.readyNodes.isEmpty()) {
             log.trace("Nodes with data ready to send: {}", result.readyNodes);
+            // if some partitions are already ready to be sent, the select time would be 0;
+            // otherwise if some partition already has some data accumulated but not ready yet,
+            // the select time will be the time difference between now and its linger expiry time;
+            // otherwise the select time will be the time difference between now and the metadata expiry time;
             pollTimeout = 0;
         }
         sendProduceRequests(batches, now);
 
-        // if some partitions are already ready to be sent, the select time would be 0;
-        // otherwise if some partition already has some data accumulated but not ready yet,
-        // the select time will be the time difference between now and its linger expiry time;
-        // otherwise the select time will be the time difference between now and the metadata expiry time;
-        this.client.poll(pollTimeout, now);
+        return pollTimeout;
+
+    }
+
+    private boolean maybeSendTransactionalRequest(long now) {
+        if (transactionManager != null && transactionManager.hasInflightTransactionalRequest())
+            return true;
+
+        if (transactionManager == null || !transactionManager.hasPendingTransactionalRequests())
+            return false;
+
+        TransactionManager.TransactionalRequest nextRequest = transactionManager.nextTransactionalRequest();
+
+        if (nextRequest.isEndTxnRequest() && transactionManager.isCompletingTransaction() && accumulator.hasUnflushedBatches()) {
+            if (!accumulator.flushInProgress())
+                accumulator.beginFlush();
+            transactionManager.reenqueue(nextRequest);
+            return false;
+        }
+
+        if (nextRequest.isEndTxnRequest() && transactionManager.isInErrorState()) {
+            nextRequest.maybeTerminateWithError(new KafkaException("Cannot commit transaction when there are " +
+                    "request errors. Please check your logs for the details of the errors encountered."));
+            return false;
+        }
+
+        Node targetNode = null;
+
+        while (targetNode == null) {
+            try {
+                if (nextRequest.needsCoordinator()) {
+                    targetNode = transactionManager.coordinator(nextRequest.coordinatorType());
+                    if (targetNode == null) {
+                        transactionManager.needsCoordinator(nextRequest);
+                        break;
+                    }
+                    if (!NetworkClientUtils.awaitReady(client, targetNode, time, requestTimeout)) {
+                        transactionManager.needsCoordinator(nextRequest);
+                        targetNode = null;
+                        break;
+                    }
+                } else {
+                    targetNode = awaitLeastLoadedNodeReady(requestTimeout);
+                }
+                if (targetNode != null) {
+                    if (nextRequest.isRetry()) {
+                        time.sleep(retryBackoffMs);
+                    }
+                    ClientRequest clientRequest = client.newClientRequest(targetNode.idString(), nextRequest.requestBuilder(),
+                            now, true, nextRequest.responseHandler());
+                    transactionManager.setInFlightRequestCorrelationId(clientRequest.correlationId());
+                    client.send(clientRequest, now);
+                    return true;
+                }
+            } catch (IOException e) {
+                log.warn("Got an exception when trying to find a node to send a transactional request to. Going to back off and retry", e);
+            }
+            time.sleep(retryBackoffMs);
+            metadata.requestUpdate();
+        }
+
+        if (targetNode == null)
+            transactionManager.needsRetry(nextRequest);
+
+        return true;
     }
 
     /**
@@ -299,17 +371,19 @@ public class Sender implements Runnable {
     }
 
     private void maybeWaitForPid() {
-        if (transactionState == null)
+        // If this is a transactional producer, the PID will be received when recovering transactions in the
+        // initTransactions() method of the producer.
+        if (transactionManager == null || transactionManager.isTransactional())
             return;
 
-        while (!transactionState.hasPid()) {
+        while (!transactionManager.hasPid()) {
             try {
                 Node node = awaitLeastLoadedNodeReady(requestTimeout);
                 if (node != null) {
                     ClientResponse response = sendAndAwaitInitPidRequest(node);
                     if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
                         InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
-                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
+                        transactionManager.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
                     } else {
                         log.error("Received an unexpected response type for an InitPidRequest from {}. " +
                                 "We will back off and try again.", node);
@@ -381,17 +455,17 @@ public class Sender implements Runnable {
                         batch.topicPartition,
                         this.retries - batch.attempts() - 1,
                         error);
-                if (transactionState == null) {
+                if (transactionManager == null) {
                     reenqueueBatch(batch, now);
-                } else if (transactionState.pidAndEpoch().producerId == batch.producerId()) {
+                } else if (transactionManager.pidAndEpoch().producerId == batch.producerId() && transactionManager.pidAndEpoch().epoch == batch.producerEpoch()) {
                     // If idempotence is enabled only retry the request if the current PID is the same as the pid of the batch.
                     log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
-                            transactionState.sequenceNumber(batch.topicPartition));
+                            transactionManager.sequenceNumber(batch.topicPartition));
                     reenqueueBatch(batch, now);
                 } else {
                     failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                             "batch but the producer id changed from " + batch.producerId() + " to " +
-                            transactionState.pidAndEpoch().producerId + " in the mean time. This batch will be dropped."));
+                            transactionManager.pidAndEpoch().producerId + " in the meantime. This batch will be dropped."));
                     this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
                 }
             } else {
@@ -400,7 +474,7 @@ public class Sender implements Runnable {
                     exception = new TopicAuthorizationException(batch.topicPartition.topic());
                 else
                     exception = error.exception();
-                if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionState.pidAndEpoch().producerId)
+                if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionManager.pidAndEpoch().producerId)
                     log.error("The broker received an out of order sequence number for correlation id {}, topic-partition " +
                                     "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
                             correlationId, batch.topicPartition, response.baseOffset);
@@ -418,10 +492,11 @@ public class Sender implements Runnable {
         } else {
             completeBatch(batch, response);
 
-            if (transactionState != null && transactionState.pidAndEpoch().producerId == batch.producerId()) {
-                transactionState.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+            if (transactionManager != null && transactionManager.pidAndEpoch().producerId == batch.producerId()
+                    && transactionManager.pidAndEpoch().epoch == batch.producerEpoch()) {
+                transactionManager.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
                 log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
-                        transactionState.sequenceNumber(batch.topicPartition));
+                        transactionManager.sequenceNumber(batch.topicPartition));
             }
         }
 
@@ -441,11 +516,12 @@ public class Sender implements Runnable {
     }
 
     private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) {
-        if (transactionState != null && batch.producerId() == transactionState.pidAndEpoch().producerId) {
+        if (transactionManager != null && !transactionManager.isTransactional()
+                && batch.producerId() == transactionManager.pidAndEpoch().producerId) {
             // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
             // about the previously committed message. Note that this will discard the producer id and sequence
             // numbers for all existing partitions.
-            transactionState.resetProducerId();
+            transactionManager.resetProducerId();
         }
         batch.done(response.baseOffset, response.logAppendTime, exception);
         this.accumulator.deallocate(batch);
@@ -500,8 +576,12 @@ public class Sender implements Runnable {
             recordsByPartition.put(tp, batch);
         }
 
+        String transactionalId = null;
+        if (transactionManager != null && transactionManager.isTransactional()) {
+            transactionalId = transactionManager.transactionalId();
+        }
         ProduceRequest.Builder requestBuilder = new ProduceRequest.Builder(minUsedMagic, acks, timeout,
-                produceRecordsByPartition);
+                produceRecordsByPartition, transactionalId);
         RequestCompletionHandler callback = new RequestCompletionHandler() {
             public void onComplete(ClientResponse response) {
                 handleProduceResponse(response, recordsByPartition, time.milliseconds());
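
Taken together, these Sender changes back the transactional producer flow. A hedged sketch of the client-side usage they enable, using the public API as it shipped in 0.11 (exact signatures at this commit may differ slightly; the topic, transactional id, and bootstrap address are placeholders):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class TxnExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("transactional.id", "my-txn-id"); // enables the TransactionManager path

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.initTransactions();  // recovers the PID/epoch, fences zombies
                producer.beginTransaction();
                producer.send(new ProducerRecord<>("my-topic", "key", "value"));
                producer.commitTransaction(); // drives the EndTxn path shown above
            }
        }
    }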