Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/03 02:41:50 UTC

[4/4] kafka git commit: KAFKA-4817; Add idempotent producer semantics

KAFKA-4817; Add idempotent producer semantics

This is from the KIP-98 proposal.

The main points of discussion surround the correctness logic, particularly the Log class where incoming entries are validated and duplicates are dropped, and also the producer error handling to ensure that the semantics are sound from the user's point of view.

There is some subtlety in the idempotent producer semantics. This patch only guarantees idempotent production up to the point where an error has to be returned to the user. Once we hit such a non-recoverable error, we can no longer guarantee message ordering or idempotence without additional logic at the application level.

In particular, if an application wants guaranteed message order without duplicates, then it needs to do the following in the error callback (a sketch follows the list):

1. Close the producer so that no queued batches are sent. This is important for guaranteeing ordering.
2. Read the tail of the log to inspect the last message committed. This is important for avoiding duplicates.
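
A minimal sketch of this pattern (assuming an already-constructed KafkaProducer<String, String> named `producer`; the topic name, keys/values, and the log-inspection step are illustrative, not part of this patch):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import java.util.concurrent.TimeUnit;

    producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null)
                return;
            // 1. Close with a zero timeout so that queued batches are not sent;
            //    zero is also the only timeout that is safe to use from within a
            //    callback (see the close() handling in KafkaProducer below).
            producer.close(0, TimeUnit.MILLISECONDS);
            // 2. Read the tail of the log (e.g. with a consumer seeked to the end
            //    of the partition) to find the last committed message, so that
            //    production can resume without writing a duplicate.
        }
    });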

Author: Apurva Mehta <ap...@confluent.io>
Author: hachikuji <ja...@confluent.io>
Author: Apurva Mehta <ap...@gmail.com>
Author: Guozhang Wang <wa...@gmail.com>
Author: fpj <fp...@apache.org>
Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #2735 from apurvam/exactly-once-idempotent-producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/bdf4cba0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/bdf4cba0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/bdf4cba0

Branch: refs/heads/trunk
Commit: bdf4cba047334943aa8357585c4fb379b27e9ffd
Parents: 1ce6aa5
Author: Apurva Mehta <ap...@confluent.io>
Authored: Sun Apr 2 19:41:44 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Apr 2 19:41:44 2017 -0700

----------------------------------------------------------------------
 checkstyle/checkstyle.xml                       |   4 +-
 checkstyle/suppressions.xml                     |   3 +
 .../kafka/clients/NetworkClientUtils.java       | 103 +++++
 .../kafka/clients/producer/KafkaProducer.java   | 184 ++++++---
 .../kafka/clients/producer/MockProducer.java    |   1 -
 .../kafka/clients/producer/ProducerConfig.java  |  20 +-
 .../clients/producer/TransactionState.java      | 135 +++++++
 .../producer/internals/ProducerBatch.java       |  19 +-
 .../producer/internals/RecordAccumulator.java   |  53 ++-
 .../clients/producer/internals/Sender.java      | 180 +++++++--
 .../DuplicateSequenceNumberException.java       |  24 ++
 .../errors/OutOfOrderSequenceException.java     |  24 ++
 .../common/errors/ProducerFencedException.java  |  24 ++
 .../apache/kafka/common/protocol/ApiKeys.java   |   3 +-
 .../apache/kafka/common/protocol/Errors.java    |  12 +-
 .../apache/kafka/common/protocol/Protocol.java  |  25 ++
 .../kafka/common/protocol/types/Struct.java     |   4 +
 .../kafka/common/protocol/types/Type.java       |  30 ++
 .../record/AbstractLegacyRecordBatch.java       |   5 +
 .../common/record/AbstractRecordBatch.java      |   5 +
 .../kafka/common/record/MemoryRecords.java      |  20 +-
 .../common/record/MemoryRecordsBuilder.java     |  26 +-
 .../apache/kafka/common/record/RecordBatch.java |   5 +
 .../kafka/common/requests/AbstractRequest.java  |   3 +
 .../kafka/common/requests/AbstractResponse.java |   2 +
 .../kafka/common/requests/InitPidRequest.java   |  81 ++++
 .../kafka/common/requests/InitPidResponse.java  |  80 ++++
 .../apache/kafka/common/utils/ByteUtils.java    |  10 +
 .../internals/RecordAccumulatorTest.java        |  44 ++-
 .../clients/producer/internals/SenderTest.java  | 250 ++++++++++--
 .../internals/TransactionStateTest.java         |  61 +++
 .../kafka/common/record/MemoryRecordsTest.java  |   2 +-
 .../common/requests/RequestResponseTest.java    |  11 +
 .../kafka/common/utils/ByteUtilsTest.java       |  10 +
 .../controller/ControllerChannelManager.scala   |  12 +-
 .../coordinator/GroupMetadataManager.scala      |  13 +-
 .../scala/kafka/coordinator/PidMetadata.scala   |  31 ++
 .../kafka/coordinator/ProducerIdManager.scala   | 153 +++++++
 .../coordinator/TransactionCoordinator.scala    |  92 +++++
 core/src/main/scala/kafka/log/Log.scala         | 157 ++++++--
 core/src/main/scala/kafka/log/LogCleaner.scala  |  19 +-
 core/src/main/scala/kafka/log/LogConfig.scala   |   1 +
 core/src/main/scala/kafka/log/LogManager.scala  |  64 ++-
 .../src/main/scala/kafka/log/LogValidator.scala |  16 +-
 .../scala/kafka/log/ProducerIdMapping.scala     | 394 +++++++++++++++++++
 .../src/main/scala/kafka/server/KafkaApis.scala |  52 ++-
 .../main/scala/kafka/server/KafkaConfig.scala   |  21 +-
 .../main/scala/kafka/server/KafkaServer.scala   |  54 +--
 .../kafka/server/ReplicaFetcherThread.scala     |   7 +-
 .../kafka/utils/NetworkClientBlockingOps.scala  | 145 -------
 core/src/main/scala/kafka/utils/ZkUtils.scala   |  36 +-
 .../kafka/api/ProducerBounceTest.scala          |  85 ++--
 .../GroupCoordinatorResponseTest.scala          |   7 +-
 .../coordinator/GroupMetadataManagerTest.scala  |   5 +-
 .../coordinator/ProducerIdManagerTest.scala     | 105 +++++
 .../TransactionCoordinatorTest.scala            |  93 +++++
 .../unit/kafka/log/BrokerCompressionTest.scala  |   2 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   2 +-
 .../log/LogCleanerLagIntegrationTest.scala      |   2 +-
 .../unit/kafka/log/LogCleanerManagerTest.scala  |   4 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   |  29 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   4 +-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 323 ++++++++++++---
 .../unit/kafka/log/ProducerIdMappingTest.scala  | 224 +++++++++++
 .../kafka/server/ReplicationQuotasTest.scala    |   1 +
 .../test/scala/unit/kafka/utils/TestUtils.scala |   3 +-
 .../scala/unit/kafka/utils/ZkUtilsTest.scala    |  15 +
 tests/kafkatest/services/verifiable_producer.py |  16 +-
 tests/kafkatest/tests/core/replication_test.py  |  24 +-
 .../kafkatest/tests/produce_consume_validate.py |  13 +-
 70 files changed, 3124 insertions(+), 563 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/checkstyle/checkstyle.xml
----------------------------------------------------------------------
diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml
index b775025..6a263cc 100644
--- a/checkstyle/checkstyle.xml
+++ b/checkstyle/checkstyle.xml
@@ -98,7 +98,7 @@
     <module name="MethodLength"/>
     <module name="ParameterNumber">
       <!-- default is 8 -->
-      <property name="max" value="12"/>
+      <property name="max" value="13"/>
     </module>
     <module name="ClassDataAbstractionCoupling">
       <!-- default is 7 -->
@@ -115,7 +115,7 @@
     </module>
     <module name="CyclomaticComplexity">
       <!-- default is 10-->
-      <property name="max" value="15"/>
+      <property name="max" value="16"/>
     </module>
     <module name="JavaNCSS">
       <!-- default is 50 -->

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ea1619e..7bc55c8 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -88,6 +88,9 @@
     <suppress checks="ClassFanOutComplexity"
               files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher)Test.java"/>
 
+    <suppress checks="JavaNCSS"
+              files="RequestResponseTest.java"/>
+
     <!-- Connect -->
     <suppress checks="ClassFanOutComplexity"
               files="DistributedHerder.java"/>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
new file mode 100644
index 0000000..8462979
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClientUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.utils.Time;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Provides additional utilities for {@link NetworkClient} (e.g. to implement blocking behaviour).
+ */
+public class NetworkClientUtils {
+
+    /**
+     * Checks whether the node is currently connected, first calling `client.poll` to ensure that any pending
+     * disconnects have been processed.
+     *
+     * This method can be used to check the status of a connection prior to calling the blocking version
+     * (`awaitReady`), making it possible to tell whether the latter completed a new connection.
+     */
+    public static boolean isReady(KafkaClient client, Node node, long currentTime) {
+        client.poll(0, currentTime);
+        return client.isReady(node, currentTime);
+    }
+
+    /**
+     * Invokes `client.poll` to discard pending disconnects, followed by `client.ready` and 0 or more `client.poll`
+     * invocations until the connection to `node` is ready, the timeoutMs expires or the connection fails.
+     *
+     * It returns `true` if the call completes normally or `false` if the timeoutMs expires. If the connection fails,
+     * an `IOException` is thrown instead. Note that if the `NetworkClient` has been configured with a positive
+     * connection timeoutMs, it is possible for this method to raise an `IOException` for a previous connection which
+     * has recently disconnected.
+     *
+     * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`; use it
+     * with care.
+     */
+    public static boolean awaitReady(KafkaClient client, Node node, Time time, long timeoutMs) throws IOException {
+        if (timeoutMs < 0) {
+            throw new IllegalArgumentException("Timeout needs to be non-negative");
+        }
+        long startTime = time.milliseconds();
+        long expiryTime = startTime + timeoutMs;
+
+        if (isReady(client, node, startTime) || client.ready(node, startTime))
+            return true;
+
+        long attemptStartTime = time.milliseconds();
+        while (!client.isReady(node, attemptStartTime) && attemptStartTime < expiryTime) {
+            if (client.connectionFailed(node)) {
+                throw new IOException("Connection to " + node + " failed.");
+            }
+            long pollTimeout = expiryTime - attemptStartTime;
+            client.poll(pollTimeout, attemptStartTime);
+            attemptStartTime = time.milliseconds();
+        }
+        return client.isReady(node, attemptStartTime);
+    }
+
+    /**
+     * Invokes `client.send` followed by 1 or more `client.poll` invocations until a response is received or a
+     * disconnection happens (which can happen for a number of reasons including a request timeout).
+     *
+     * In case of a disconnection, an `IOException` is thrown.
+     *
+     * This method is useful for implementing blocking behaviour on top of the non-blocking `NetworkClient`; use it
+     * with care.
+     */
+    public static ClientResponse sendAndReceive(KafkaClient client, ClientRequest request, Time time) throws IOException {
+        client.send(request, time.milliseconds());
+        while (true) {
+            List<ClientResponse> responses = client.poll(Long.MAX_VALUE, time.milliseconds());
+            for (ClientResponse response : responses) {
+                if (response.requestHeader().correlationId() == request.correlationId()) {
+                    if (response.wasDisconnected()) {
+                        throw new IOException("Connection to " + response.destination() + " was disconnected before the response was read");
+                    }
+                    if (response.versionMismatch() != null) {
+                        throw response.versionMismatch();
+                    }
+                    return response;
+                }
+            }
+        }
+    }
+}
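
A rough usage sketch of these helpers (not part of the patch; assumes an existing `KafkaClient client`, a `Time time`, and some `AbstractRequest.Builder<?> requestBuilder`, with checked IOExceptions elided). It mirrors how the Sender uses them for the InitPidRequest later in this commit:

    Node node = client.leastLoadedNode(time.milliseconds());
    if (node != null && NetworkClientUtils.awaitReady(client, node, time, 5000L)) {
        ClientRequest request = client.newClientRequest(node.idString(), requestBuilder,
                time.milliseconds(), true, null);
        // Blocks until the matching response arrives; throws IOException on disconnect.
        ClientResponse response = NetworkClientUtils.sendAndReceive(client, request, time);
    }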

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 78dd668..9342791 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -159,6 +159,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private final int requestTimeoutMs;
     private final ProducerInterceptors<K, V> interceptors;
     private final ApiVersions apiVersions;
+    private final TransactionState transactionState;
 
     /**
      * A producer is instantiated by providing a set of key-value pairs as configuration. Valid configuration strings
@@ -185,7 +186,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
-             keySerializer, valueSerializer);
+                keySerializer, valueSerializer);
     }
 
     /**
@@ -208,7 +209,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
-             keySerializer, valueSerializer);
+                keySerializer, valueSerializer);
     }
 
     @SuppressWarnings({"unchecked", "deprecation"})
@@ -218,7 +219,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
             this.time = Time.SYSTEM;
-
             clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
@@ -255,48 +255,18 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             List<ProducerInterceptor<K, V>> interceptorList = (List) (new ProducerConfig(userProvidedConfigs, false)).getConfiguredInstances(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
                     ProducerInterceptor.class);
             this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
-
             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
             this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG), true, clusterResourceListeners);
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
-            /* check for user defined settings.
-             * If the BLOCK_ON_BUFFER_FULL is set to true,we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
-             * This should be removed with release 0.9 when the deprecated configs are removed.
-             */
-            if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
-                log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
-                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
-                if (blockOnBufferFull) {
-                    this.maxBlockTimeMs = Long.MAX_VALUE;
-                } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
-                    log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
-                            "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                    this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-                } else {
-                    this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                }
-            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
-                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
-                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
-                this.maxBlockTimeMs = config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
-            } else {
-                this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
-            }
 
-            /* check for user defined settings.
-             * If the TIME_OUT config is set use that for request timeout.
-             * This should be removed with release 0.9
-             */
-            if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
-                log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
-                        ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-                this.requestTimeoutMs = config.getInt(ProducerConfig.TIMEOUT_CONFIG);
-            } else {
-                this.requestTimeoutMs = config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
-            }
+            this.maxBlockTimeMs = configureMaxBlockTime(config, userProvidedConfigs);
+            this.requestTimeoutMs = configureRequestTimeout(config, userProvidedConfigs);
+            this.transactionState = configureTransactionState(config, time);
+            int retries = configureRetries(config, transactionState != null);
+            int maxInflightRequests = configureInflightRequests(config, transactionState != null);
+            short acks = configureAcks(config, transactionState != null);
 
             this.apiVersions = new ApiVersions();
             this.accumulator = new RecordAccumulator(config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
@@ -306,8 +276,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     retryBackoffMs,
                     metrics,
                     time,
-                    apiVersions);
-
+                    apiVersions,
+                    transactionState);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
@@ -316,7 +286,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                             this.metrics, time, "producer", channelBuilder),
                     this.metadata,
                     clientId,
-                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION),
+                    maxInflightRequests,
                     config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
                     config.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
@@ -327,33 +297,131 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             this.sender = new Sender(client,
                     this.metadata,
                     this.accumulator,
-                    config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) == 1,
+                    maxInflightRequests == 1,
                     config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
-                    (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG)),
-                    config.getInt(ProducerConfig.RETRIES_CONFIG),
+                    acks,
+                    retries,
                     this.metrics,
                     Time.SYSTEM,
                     this.requestTimeoutMs,
+                    config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
+                    this.transactionState,
                     apiVersions);
             String ioThreadName = "kafka-producer-network-thread" + (clientId.length() > 0 ? " | " + clientId : "");
             this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
             this.ioThread.start();
-
             this.errors = this.metrics.sensor("errors");
-
-
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId);
             log.debug("Kafka producer started");
         } catch (Throwable t) {
-            // call close methods if internal objects are already constructed
-            // this is to prevent resource leak. see KAFKA-2121
+            // call close methods if internal objects are already constructed; this is to prevent resource leaks. See KAFKA-2121
             close(0, TimeUnit.MILLISECONDS, true);
             // now propagate the exception
             throw new KafkaException("Failed to construct kafka producer", t);
         }
     }
 
+    private static long configureMaxBlockTime(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
+        /* check for user defined settings.
+         * If the BLOCK_ON_BUFFER_FULL is set to true, we do not honor METADATA_FETCH_TIMEOUT_CONFIG.
+         * This should be removed with release 0.9 when the deprecated configs are removed.
+         */
+        if (userProvidedConfigs.containsKey(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG)) {
+            log.warn(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG + " config is deprecated and will be removed soon. " +
+                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
+            boolean blockOnBufferFull = config.getBoolean(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG);
+            if (blockOnBufferFull) {
+                return Long.MAX_VALUE;
+            } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
+                log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
+                        "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
+                return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+            } else {
+                return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+            }
+        } else if (userProvidedConfigs.containsKey(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG)) {
+            log.warn(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG + " config is deprecated and will be removed soon. " +
+                    "Please use " + ProducerConfig.MAX_BLOCK_MS_CONFIG);
+            return config.getLong(ProducerConfig.METADATA_FETCH_TIMEOUT_CONFIG);
+        } else {
+            return config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
+        }
+    }
+
+    private static int configureRequestTimeout(ProducerConfig config, Map<String, Object> userProvidedConfigs) {
+        /* check for user defined settings.
+         * If the TIME_OUT config is set use that for request timeout.
+         * This should be removed with release 0.9
+         */
+        if (userProvidedConfigs.containsKey(ProducerConfig.TIMEOUT_CONFIG)) {
+            log.warn(ProducerConfig.TIMEOUT_CONFIG + " config is deprecated and will be removed soon. Please use " +
+                    ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+            return config.getInt(ProducerConfig.TIMEOUT_CONFIG);
+        } else {
+            return config.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        }
+    }
+
+    private static TransactionState configureTransactionState(ProducerConfig config, Time time) {
+        boolean idempotenceEnabled = config.getBoolean(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG);
+        if (idempotenceEnabled) {
+            return new TransactionState(time);
+        } else {
+            return null;
+        }
+    }
+
+    private static int configureRetries(ProducerConfig config, boolean idempotenceEnabled) {
+        boolean userConfiguredRetries = false;
+        if (config.originals().containsKey(ProducerConfig.RETRIES_CONFIG)) {
+            userConfiguredRetries = true;
+        }
+        if (idempotenceEnabled && !userConfiguredRetries) {
+            log.info("Overriding the default retries config to " + 3 + " since the idempotent producer is enabled.");
+            return 3;
+        }
+        if (idempotenceEnabled && config.getInt(ProducerConfig.RETRIES_CONFIG) == 0) {
+            throw new ConfigException("Must set " + ProducerConfig.RETRIES_CONFIG + " to non-zero when using the idempotent producer.");
+        }
+        return config.getInt(ProducerConfig.RETRIES_CONFIG);
+    }
+
+    private static int configureInflightRequests(ProducerConfig config, boolean idempotenceEnabled) {
+        boolean userConfiguredInflights = false;
+        if (config.originals().containsKey(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION)) {
+            userConfiguredInflights = true;
+        }
+        if (idempotenceEnabled && !userConfiguredInflights) {
+            log.info("Overriding the default " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 since idempotence is enabled.");
+            return 1;
+        }
+        if (idempotenceEnabled && config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION) != 1) {
+            throw new ConfigException("Must set " + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + " to 1 in order " +
+                    "to use the idempotent producer. Otherwise we cannot guarantee idempotence.");
+        }
+        return config.getInt(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION);
+    }
+
+    private static short configureAcks(ProducerConfig config, boolean idempotenceEnabled) {
+        boolean userConfiguredAcks = false;
+        short acks = (short) parseAcks(config.getString(ProducerConfig.ACKS_CONFIG));
+        if (config.originals().containsKey(ProducerConfig.ACKS_CONFIG)) {
+            userConfiguredAcks = true;
+        }
+
+        if (idempotenceEnabled && !userConfiguredAcks) {
+            log.info("Overriding the default " + ProducerConfig.ACKS_CONFIG + " to all since idempotence is enabled");
+            return -1;
+        }
+
+        if (idempotenceEnabled && acks != -1) {
+            throw new ConfigException("Must set " + ProducerConfig.ACKS_CONFIG + " to all in order to use the idempotent " +
+                    "producer. Otherwise we cannot guarantee idempotence.");
+        }
+        return acks;
+    }
+
     private static int parseAcks(String acksString) {
         try {
             return acksString.trim().equalsIgnoreCase("all") ? -1 : Integer.parseInt(acksString.trim());
@@ -587,14 +655,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private void ensureValidRecordSize(int size) {
         if (size > this.maxRequestSize)
             throw new RecordTooLargeException("The message is " + size +
-                                              " bytes when serialized which is larger than the maximum request size you have configured with the " +
-                                              ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
-                                              " configuration.");
+                    " bytes when serialized which is larger than the maximum request size you have configured with the " +
+                    ProducerConfig.MAX_REQUEST_SIZE_CONFIG +
+                    " configuration.");
         if (size > this.totalMemorySize)
             throw new RecordTooLargeException("The message is " + size +
-                                              " bytes when serialized which is larger than the total memory buffer you have configured with the " +
-                                              ProducerConfig.BUFFER_MEMORY_CONFIG +
-                                              " configuration.");
+                    " bytes when serialized which is larger than the total memory buffer you have configured with the " +
+                    ProducerConfig.BUFFER_MEMORY_CONFIG +
+                    " configuration.");
     }
 
     /**
@@ -706,7 +774,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         if (timeout > 0) {
             if (invokedFromCallback) {
                 log.warn("Overriding close timeout {} ms to 0 ms in order to prevent useless blocking due to self-join. " +
-                    "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
+                        "This means you have incorrectly invoked close with a non-zero timeout from the producer call-back.", timeout);
             } else {
                 // Try to close gracefully.
                 if (this.sender != null)
@@ -724,7 +792,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
         if (this.sender != null && this.ioThread != null && this.ioThread.isAlive()) {
             log.info("Proceeding to force close the producer since pending requests could not be completed " +
-                "within timeout {} ms.", timeout);
+                    "within timeout {} ms.", timeout);
             this.sender.forceClose();
             // Only join the sender thread when not calling from callback.
             if (!invokedFromCallback) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 22797b6..ab09997 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -37,7 +37,6 @@ import java.util.Map;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
-
 /**
  * A mock of the producer interface you can use for testing code that uses Kafka.
  * <p>

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 70b80c7..d6e03d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -224,6 +224,14 @@ public class ProducerConfig extends AbstractConfig {
                                                         + "Implementing the <code>ProducerInterceptor</code> interface allows you to intercept (and possibly mutate) the records "
                                                         + "received by the producer before they are published to the Kafka cluster. By default, there are no interceptors.";
 
+    /** <code>enable.idempotence</code> */
+    public static final String ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence";
+    public static final String ENABLE_IDEMPOTENCE_DOC = "When set to 'true', the producer will ensure that exactly one copy of each message is written in the stream. If 'false', producer "
+                                                        + "retries due to broker failures, etc., may write duplicates of the retried message in the stream. This is set to 'false' by default. "
+                                                        + "Note that enabling idempotence requires <code>" + MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION + "</code> to be set to 1 and "
+                                                        + "<code>" + RETRIES_CONFIG + "</code> cannot be zero. Additionally " + ACKS_CONFIG + " must be set to 'all'. If these values "
+                                                        + "are left at their defaults, we will override the default to be suitable. "
+                                                        + "If the values are set to something incompatible with the idempotent producer, a ConfigException will be thrown.";
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
@@ -306,21 +314,23 @@ public class ProducerConfig extends AbstractConfig {
                                         null,
                                         Importance.LOW,
                                         INTERCEPTOR_CLASSES_DOC)
-
-                                // security support
                                 .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
                                         CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL,
                                         Importance.MEDIUM,
                                         CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                                 .withClientSslSupport()
-                                .withClientSaslSupport();
-
+                                .withClientSaslSupport()
+                                .define(ENABLE_IDEMPOTENCE_CONFIG,
+                                        Type.BOOLEAN,
+                                        false,
+                                        Importance.LOW,
+                                        ENABLE_IDEMPOTENCE_DOC);
     }
 
     public static Map<String, Object> addSerializerToConfig(Map<String, Object> configs,
                                                             Serializer<?> keySerializer, Serializer<?> valueSerializer) {
-        Map<String, Object> newConfigs = new HashMap<String, Object>();
+        Map<String, Object> newConfigs = new HashMap<>();
         newConfigs.putAll(configs);
         if (keySerializer != null)
             newConfigs.put(KEY_SERIALIZER_CLASS_CONFIG, keySerializer.getClass());
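
For reference, a minimal sketch of turning the new flag on from application code (the broker address is a placeholder; the explicit acks/retries/max-in-flight settings are optional, since the KafkaProducer constructor above picks compatible defaults when they are left unset):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;

    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
    // If set explicitly, these must be compatible with idempotence,
    // otherwise KafkaProducer throws a ConfigException at construction:
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    props.put(ProducerConfig.RETRIES_CONFIG, "3");
    props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
    Producer<String, String> producer = new KafkaProducer<>(props,
            new StringSerializer(), new StringSerializer());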

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java b/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java
new file mode 100644
index 0000000..fa30b3f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/TransactionState.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_EPOCH;
+import static org.apache.kafka.common.record.RecordBatch.NO_PRODUCER_ID;
+
+/**
+ * A class which maintains state for transactions. Also keeps the state necessary to ensure idempotent production.
+ */
+public class TransactionState {
+    private volatile PidAndEpoch pidAndEpoch;
+    private final Map<TopicPartition, Integer> sequenceNumbers;
+    private final Time time;
+
+    public static class PidAndEpoch {
+        public final long producerId;
+        public final short epoch;
+
+        PidAndEpoch(long producerId, short epoch) {
+            this.producerId = producerId;
+            this.epoch = epoch;
+        }
+
+        public boolean isValid() {
+            return NO_PRODUCER_ID < producerId;
+        }
+    }
+
+    public TransactionState(Time time) {
+        this.pidAndEpoch = new PidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+        this.sequenceNumbers = new HashMap<>();
+        this.time = time;
+    }
+
+    public boolean hasPid() {
+        return pidAndEpoch.isValid();
+    }
+
+    /**
+     * A blocking call to get the pid and epoch for the producer. If the PID and epoch have not been set, this method
+     * will block for at most maxWaitTimeMs. It is expected that this method be called from application thread
+     * contexts (i.e. through Producer.send). The PID itself will be retrieved in the background thread.
+     * @param maxWaitTimeMs The maximum time to block.
+     * @return a PidAndEpoch object. Callers must call the 'isValid' method of the returned object to ensure that a
+     *         valid Pid and epoch is actually returned.
+     */
+    public synchronized PidAndEpoch awaitPidAndEpoch(long maxWaitTimeMs) throws InterruptedException {
+        long start = time.milliseconds();
+        long elapsed = 0;
+        while (!hasPid() && elapsed < maxWaitTimeMs) {
+            wait(maxWaitTimeMs);
+            elapsed = time.milliseconds() - start;
+        }
+        return pidAndEpoch;
+    }
+
+    /**
+     * Get the current pid and epoch without blocking. Callers must use {@link PidAndEpoch#isValid()} to
+     * verify that the result is valid.
+     *
+     * @return the current PidAndEpoch.
+     */
+    public PidAndEpoch pidAndEpoch() {
+        return pidAndEpoch;
+    }
+
+    /**
+     * Set the pid and epoch atomically. This method will signal any callers blocked on the `pidAndEpoch` method
+     * once the pid is set. This method will be called on the background thread when the broker responds with the pid.
+     */
+    public synchronized void setPidAndEpoch(long pid, short epoch) {
+        this.pidAndEpoch = new PidAndEpoch(pid, epoch);
+        if (this.pidAndEpoch.isValid())
+            notifyAll();
+    }
+
+    /**
+     * This method is used when the producer needs to reset its internal state because of an irrecoverable exception
+     * from the broker.
+     *
+     * We need to reset the producer id and associated state when we have sent a batch to the broker, but we either get
+     * a non-retriable exception or we run out of retries, or the batch expired in the producer queue after it was already
+     * sent to the broker.
+     *
+     * In all of these cases, we don't know whether the batch was actually committed on the broker, and hence whether the
+     * sequence number was actually updated. If we don't reset the producer state, we risk the chance that all future
+     * messages will fail with an OutOfOrderSequenceException.
+     */
+    public synchronized void resetProducerId() {
+        setPidAndEpoch(NO_PRODUCER_ID, NO_PRODUCER_EPOCH);
+        this.sequenceNumbers.clear();
+    }
+
+    /**
+     * Returns the next sequence number to be written to the given TopicPartition.
+     */
+    public synchronized Integer sequenceNumber(TopicPartition topicPartition) {
+        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
+        if (currentSequenceNumber == null) {
+            currentSequenceNumber = 0;
+            sequenceNumbers.put(topicPartition, currentSequenceNumber);
+        }
+        return currentSequenceNumber;
+    }
+
+    public synchronized void incrementSequenceNumber(TopicPartition topicPartition, int increment) {
+        Integer currentSequenceNumber = sequenceNumbers.get(topicPartition);
+        if (currentSequenceNumber == null)
+            throw new IllegalStateException("Attempt to increment sequence number for a partition with no current sequence.");
+
+        currentSequenceNumber += increment;
+        sequenceNumbers.put(topicPartition, currentSequenceNumber);
+    }
+}
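
To illustrate the bookkeeping above (a sketch of how the RecordAccumulator and Sender drive this class elsewhere in the patch, using the same imports as the class itself; the pid/epoch literals and topic name are made up):

    TransactionState state = new TransactionState(Time.SYSTEM);
    state.setPidAndEpoch(1234L, (short) 0);      // normally taken from an InitPidResponse
    TopicPartition tp = new TopicPartition("my-topic", 0);
    int baseSequence = state.sequenceNumber(tp); // 0 the first time a partition is seen
    // ... stamp (pid, epoch, baseSequence) onto an outbound batch via setProducerState ...
    state.incrementSequenceNumber(tp, 5);        // advance by the number of records sent
    // On a fatal produce error, wipe the pid and all per-partition sequences:
    state.resetProducerId();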

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index 2b90f81..4f68c20 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.record.AbstractRecords;
@@ -66,6 +67,7 @@ public final class ProducerBatch {
         this.lastAppendTime = createdMs;
         this.produceFuture = new ProduceRequestResult(topicPartition);
         this.completed = new AtomicBoolean();
+        this.retry = false;
     }
 
     /**
@@ -208,7 +210,7 @@ public final class ProducerBatch {
     /**
      * Returns whether the batch has been retried for sending to Kafka
      */
-    private boolean inRetry() {
+    public boolean inRetry() {
         return this.retry;
     }
 
@@ -228,10 +230,18 @@ public final class ProducerBatch {
         return recordsBuilder.isFull();
     }
 
+    public void setProducerState(TransactionState.PidAndEpoch pidAndEpoch, int baseSequence) {
+        recordsBuilder.setProducerState(pidAndEpoch.producerId, pidAndEpoch.epoch, baseSequence);
+    }
+
     public void close() {
         recordsBuilder.close();
     }
 
+    public boolean isClosed() {
+        return recordsBuilder.isClosed();
+    }
+
     public ByteBuffer buffer() {
         return recordsBuilder.buffer();
     }
@@ -247,4 +257,11 @@ public final class ProducerBatch {
     public byte magic() {
         return recordsBuilder.magic();
     }
+
+    /**
+     * Return the ProducerId (Pid) of the current batch.
+     */
+    public long producerId() {
+        return recordsBuilder.producerId();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 1e495d4..e07d201 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -18,11 +18,13 @@ package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -30,9 +32,9 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Rate;
 import org.apache.kafka.common.record.AbstractRecords;
 import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.utils.CopyOnWriteMap;
 import org.apache.kafka.common.utils.Time;
@@ -80,6 +82,7 @@ public final class RecordAccumulator {
     // The following variables are only accessed by the sender thread, so we don't need to protect them.
     private final Set<TopicPartition> muted;
     private int drainIndex;
+    private final TransactionState transactionState;
 
     /**
      * Create a new record accumulator
@@ -95,6 +98,8 @@ public final class RecordAccumulator {
      * @param metrics The metrics
      * @param time The time instance to use
      * @param apiVersions Request API versions for current connected brokers
+     * @param transactionState The shared transaction state object which tracks Pids, epochs, and sequence numbers per
+     *                         partition.
      */
     public RecordAccumulator(int batchSize,
                              long totalSize,
@@ -103,7 +108,8 @@ public final class RecordAccumulator {
                              long retryBackoffMs,
                              Metrics metrics,
                              Time time,
-                             ApiVersions apiVersions) {
+                             ApiVersions apiVersions,
+                             TransactionState transactionState) {
         this.drainIndex = 0;
         this.closed = false;
         this.flushesInProgress = new AtomicInteger(0);
@@ -119,6 +125,7 @@ public final class RecordAccumulator {
         this.muted = new HashSet<>();
         this.time = time;
         this.apiVersions = apiVersions;
+        this.transactionState = transactionState;
         registerMetrics(metrics, metricGrpName);
     }
 
@@ -202,8 +209,7 @@ public final class RecordAccumulator {
                     return appendResult;
                 }
 
-                MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(buffer, maxUsableMagic, compression,
-                        TimestampType.CREATE_TIME, this.batchSize);
+                MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
                 ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, time.milliseconds());
                 FutureRecordMetadata future = Utils.notNull(batch.tryAppend(timestamp, key, value, callback, time.milliseconds()));
 
@@ -222,17 +228,24 @@ public final class RecordAccumulator {
         }
     }
 
+    private MemoryRecordsBuilder recordsBuilder(ByteBuffer buffer, byte maxUsableMagic) {
+        if (transactionState != null && maxUsableMagic < RecordBatch.MAGIC_VALUE_V2) {
+            throw new UnsupportedVersionException("Attempting to use idempotence with a broker which does not " +
+                    "support the required message format (v2). The broker must be version 0.11 or later.");
+        }
+        return MemoryRecords.builder(buffer, maxUsableMagic, compression, TimestampType.CREATE_TIME, this.batchSize);
+    }
+
     /**
-     * If `ProducerBatch.tryAppend` fails (i.e. the record batch is full), close its memory records to release temporary
-     * resources (like compression streams buffers).
+     *  Try to append to a ProducerBatch. If it is full, we return null and a new batch is created. The full batch
+     *  will be closed right before send, or when it is expired, or when the producer is closed, whichever
+     *  comes first.
      */
     private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Callback callback, Deque<ProducerBatch> deque) {
         ProducerBatch last = deque.peekLast();
         if (last != null) {
             FutureRecordMetadata future = last.tryAppend(timestamp, key, value, callback, time.milliseconds());
-            if (future == null)
-                last.close();
-            else
+            if (future != null)
                 return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
         }
         return null;
@@ -404,7 +417,7 @@ public final class RecordAccumulator {
                 TopicPartition tp = new TopicPartition(part.topic(), part.partition());
                 // Only proceed if the partition has no in-flight batches.
                 if (!muted.contains(tp)) {
-                    Deque<ProducerBatch> deque = getDeque(new TopicPartition(part.topic(), part.partition()));
+                    Deque<ProducerBatch> deque = getDeque(tp);
                     if (deque != null) {
                         synchronized (deque) {
                             ProducerBatch first = deque.peekFirst();
@@ -418,7 +431,27 @@ public final class RecordAccumulator {
                                         // request
                                         break;
                                     } else {
+                                        TransactionState.PidAndEpoch pidAndEpoch = null;
+                                        if (transactionState != null) {
+                                            pidAndEpoch = transactionState.pidAndEpoch();
+                                            if (!pidAndEpoch.isValid())
+                                                // we cannot send the batch until we have refreshed the PID
+                                                break;
+                                        }
+
                                         ProducerBatch batch = deque.pollFirst();
+                                        if (pidAndEpoch != null && !batch.inRetry()) {
+                                            // If the batch is in retry, then we should not change the pid and
+                                            // sequence number, since this may introduce duplicates. In particular,
+                                            // the previous attempt may actually have been accepted, and if we change
+                                            // the pid and sequence here, this attempt will also be accepted, causing
+                                            // a duplicate.
+                                            int sequenceNumber = transactionState.sequenceNumber(batch.topicPartition);
+                                            log.debug("Dest: {} : producerId: {}, epoch: {}, Assigning sequence for {}: {}",
+                                                    node, pidAndEpoch.producerId, pidAndEpoch.epoch,
+                                                    batch.topicPartition, sequenceNumber);
+                                            batch.setProducerState(pidAndEpoch, sequenceNumber);
+                                        }
                                         batch.close();
                                         size += batch.sizeInBytes();
                                         ready.add(batch);

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index a7394e1..ab92522 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -22,11 +22,13 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.RequestCompletionHandler;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidMetadataException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
@@ -37,8 +39,11 @@ import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Rate;
+import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.requests.InitPidRequest;
+import org.apache.kafka.common.requests.InitPidResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.Time;
@@ -46,6 +51,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -96,9 +102,15 @@ public class Sender implements Runnable {
     /* the max time to wait for the server to respond to the request*/
     private final int requestTimeout;
 
+    /* The max time to wait before retrying a request which has failed */
+    private final long retryBackoffMs;
+
     /* current request API versions supported by the known brokers */
     private final ApiVersions apiVersions;
 
+    /* all the state related to transactions, in particular the PID, epoch, and sequence numbers */
+    private final TransactionState transactionState;
+
     public Sender(KafkaClient client,
                   Metadata metadata,
                   RecordAccumulator accumulator,
@@ -109,6 +121,8 @@ public class Sender implements Runnable {
                   Metrics metrics,
                   Time time,
                   int requestTimeout,
+                  long retryBackoffMs,
+                  TransactionState transactionState,
                   ApiVersions apiVersions) {
         this.client = client;
         this.accumulator = accumulator;
@@ -121,7 +135,9 @@ public class Sender implements Runnable {
         this.time = time;
         this.sensors = new SenderMetrics(metrics);
         this.requestTimeout = requestTimeout;
+        this.retryBackoffMs = retryBackoffMs;
         this.apiVersions = apiVersions;
+        this.transactionState = transactionState;
     }
 
     /**
@@ -172,6 +188,9 @@ public class Sender implements Runnable {
      */
     void run(long now) {
         Cluster cluster = metadata.fetch();
+
+        maybeWaitForPid();
+
         // get the list of partitions with data ready to send
         RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
 
@@ -197,10 +216,8 @@ public class Sender implements Runnable {
         }
 
         // create produce requests
-        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster,
-                                                                         result.readyNodes,
-                                                                         this.maxRequestSize,
-                                                                         now);
+        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes,
+                this.maxRequestSize, now);
         if (guaranteeMessageOrder) {
             // Mute all the partitions drained
             for (List<ProducerBatch> batchList : batches.values()) {
@@ -210,9 +227,22 @@ public class Sender implements Runnable {
         }
 
         List<ProducerBatch> expiredBatches = this.accumulator.abortExpiredBatches(this.requestTimeout, now);
-        // update sensors
-        for (ProducerBatch expiredBatch : expiredBatches)
+
+        boolean needsTransactionStateReset = false;
+        // Reset the PID if an expired batch has previously been sent to the broker. Also update the metrics
+        // for expired batches. See the documentation of TransactionState#resetProducerId to understand why
+        // we need to reset the producer id here.
+        for (ProducerBatch expiredBatch : expiredBatches) {
+            if (transactionState != null && expiredBatch.inRetry()) {
+                needsTransactionStateReset = true;
+            }
             this.sensors.recordErrors(expiredBatch.topicPartition.topic(), expiredBatch.recordCount);
+        }
+
+        if (needsTransactionStateReset) {
+            transactionState.resetProducerId();
+            return;
+        }
 
         sensors.updateProduceRequestMetrics(batches);
 
@@ -253,6 +283,50 @@ public class Sender implements Runnable {
         initiateClose();
     }
 
+    private ClientResponse sendAndAwaitInitPidRequest(Node node) throws IOException {
+        String nodeId = node.idString();
+        InitPidRequest.Builder builder = new InitPidRequest.Builder(null);
+        ClientRequest request = client.newClientRequest(nodeId, builder, time.milliseconds(), true, null);
+        return NetworkClientUtils.sendAndReceive(client, request, time);
+    }
+
+    private Node awaitLeastLoadedNodeReady(long remainingTimeMs) throws IOException {
+        Node node = client.leastLoadedNode(time.milliseconds());
+        if (NetworkClientUtils.awaitReady(client, node, time, remainingTimeMs)) {
+            return node;
+        }
+        return null;
+    }
+
+    private void maybeWaitForPid() {
+        if (transactionState == null)
+            return;
+
+        while (!transactionState.hasPid()) {
+            try {
+                Node node = awaitLeastLoadedNodeReady(requestTimeout);
+                if (node != null) {
+                    ClientResponse response = sendAndAwaitInitPidRequest(node);
+                    if (response.hasResponse() && (response.responseBody() instanceof InitPidResponse)) {
+                        InitPidResponse initPidResponse = (InitPidResponse) response.responseBody();
+                        transactionState.setPidAndEpoch(initPidResponse.producerId(), initPidResponse.epoch());
+                    } else {
+                        log.error("Received an unexpected response type for an InitPidRequest from {}. " +
+                                "We will back off and try again.", node);
+                    }
+                } else {
+                    log.debug("Could not find an available broker to send InitPidRequest to. " +
+                            "We will back off and try again.");
+                }
+            } catch (Exception e) {
+                log.warn("Received an exception while trying to get a pid. Will back off and retry.", e);
+            }
+            log.trace("Retry InitPidRequest in {}ms.", retryBackoffMs);
+            time.sleep(retryBackoffMs);
+            metadata.requestUpdate();
+        }
+    }
+
     /**
      * Handle a produce response
      */
@@ -300,32 +374,55 @@ public class Sender implements Runnable {
     private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                                long now) {
         Errors error = response.error;
-        if (error != Errors.NONE && canRetry(batch, error)) {
-            // retry
-            log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
-                     correlationId,
-                     batch.topicPartition,
-                     this.retries - batch.attempts() - 1,
-                     error);
-            this.accumulator.reenqueue(batch, now);
-            this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
-        } else {
-            RuntimeException exception;
-            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
-                exception = new TopicAuthorizationException(batch.topicPartition.topic());
-            else
-                exception = error.exception();
-            // tell the user the result of their request
-            batch.done(response.baseOffset, response.logAppendTime, exception);
-            this.accumulator.deallocate(batch);
-            if (error != Errors.NONE)
+        if (error != Errors.NONE) {
+            if (canRetry(batch, error)) {
+                log.warn("Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
+                        correlationId,
+                        batch.topicPartition,
+                        this.retries - batch.attempts() - 1,
+                        error);
+                if (transactionState == null) {
+                    reenqueueBatch(batch, now);
+                } else if (transactionState.pidAndEpoch().producerId == batch.producerId()) {
+                    // If idempotence is enabled, only retry the request if the current PID is the same as the PID of the batch.
+                    log.debug("Retrying batch to topic-partition {}. Sequence number : {}", batch.topicPartition,
+                            transactionState.sequenceNumber(batch.topicPartition));
+                    reenqueueBatch(batch, now);
+                } else {
+                    failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
+                            "batch but the producer id changed from " + batch.producerId() + " to " +
+                            transactionState.pidAndEpoch().producerId + " in the meantime. This batch will be dropped."));
+                    this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
+                }
+            } else {
+                final RuntimeException exception;
+                if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
+                    exception = new TopicAuthorizationException(batch.topicPartition.topic());
+                else
+                    exception = error.exception();
+                if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER && batch.producerId() == transactionState.pidAndEpoch().producerId)
+                    log.error("The broker received an out of order sequence number for correlation id {}, topic-partition " +
+                                    "{} at offset {}. This indicates data loss on the broker, and should be investigated.",
+                            correlationId, batch.topicPartition, response.baseOffset);
+                // tell the user the result of their request
+                failBatch(batch, response, exception);
                 this.sensors.recordErrors(batch.topicPartition.topic(), batch.recordCount);
-        }
-        if (error.exception() instanceof InvalidMetadataException) {
-            if (error.exception() instanceof UnknownTopicOrPartitionException)
-                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
-                        "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
-            metadata.requestUpdate();
+            }
+            if (error.exception() instanceof InvalidMetadataException) {
+                if (error.exception() instanceof UnknownTopicOrPartitionException)
+                    log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
+                            "topic/partition may not exist or the user may not have Describe access to it", batch.topicPartition);
+                metadata.requestUpdate();
+            }
+
+        } else {
+            completeBatch(batch, response);
+
+            if (transactionState != null && transactionState.pidAndEpoch().producerId == batch.producerId()) {
+                transactionState.incrementSequenceNumber(batch.topicPartition, batch.recordCount);
+                log.debug("Incremented sequence number for topic-partition {} to {}", batch.topicPartition,
+                        transactionState.sequenceNumber(batch.topicPartition));
+            }
         }
 
         // Unmute the completed partition.
@@ -333,6 +430,27 @@ public class Sender implements Runnable {
             this.accumulator.unmutePartition(batch.topicPartition);
     }
 
+    private void reenqueueBatch(ProducerBatch batch, long currentTimeMs) {
+        this.accumulator.reenqueue(batch, currentTimeMs);
+        this.sensors.recordRetries(batch.topicPartition.topic(), batch.recordCount);
+    }
+
+    private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
+        batch.done(response.baseOffset, response.logAppendTime, null);
+        this.accumulator.deallocate(batch);
+    }
+
+    private void failBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, RuntimeException exception) {
+        if (transactionState != null && batch.producerId() == transactionState.pidAndEpoch().producerId) {
+            // Reset the transaction state since we have hit an irrecoverable exception and cannot make any guarantees
+            // about the previously committed message. Note that this will discard the producer id and sequence
+            // numbers for all existing partitions.
+            transactionState.resetProducerId();
+        }
+        batch.done(response.baseOffset, response.logAppendTime, exception);
+        this.accumulator.deallocate(batch);
+    }
+
     /**
      * We can retry a send if the error is transient and the number of attempts taken is fewer than the maximum allowed
      */

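For reviewers tracing the new completeBatch logic above: a minimal standalone sketch of the retry-or-fail decision for a batch that hit a retriable error. The class and method names below are illustrative only, not part of the patch; the real code paths are reenqueueBatch and failBatch.

    import org.apache.kafka.common.errors.OutOfOrderSequenceException;

    // Sketch of the decision completeBatch now makes. `currentPid` stands in for
    // transactionState.pidAndEpoch().producerId (null when idempotence is disabled),
    // and `batchPid` for batch.producerId().
    class RetryDecisionSketch {
        static String decide(Long currentPid, long batchPid) {
            if (currentPid == null)
                return "reenqueue";        // idempotence disabled: retry as before
            if (currentPid == batchPid)
                return "reenqueue";        // PID unchanged, so sequence numbers are still valid
            // The PID was reset while the batch was in flight; retrying would send a
            // stale sequence number, so the batch is failed instead.
            throw new OutOfOrderSequenceException("producer id changed from " + batchPid
                    + " to " + currentPid + " in the meantime");
        }

        public static void main(String[] args) {
            System.out.println(decide(null, 7L)); // reenqueue
            System.out.println(decide(7L, 7L));   // reenqueue
        }
    }
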
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
new file mode 100644
index 0000000..469ba98
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/DuplicateSequenceNumberException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class DuplicateSequenceNumberException extends RetriableException {
+
+    public DuplicateSequenceNumberException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java b/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
new file mode 100644
index 0000000..1c1cc6b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class OutOfOrderSequenceException extends ApiException {
+
+    public OutOfOrderSequenceException(String msg) {
+        super(msg);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java b/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
new file mode 100644
index 0000000..c699d8e
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class ProducerFencedException extends ApiException {
+
+    public ProducerFencedException(String msg) {
+        super(msg);
+    }
+}

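Note the retry semantics encoded in the class hierarchy: DuplicateSequenceNumberException extends RetriableException, so the sender's canRetry check treats it as transient and handles it internally, while OutOfOrderSequenceException and ProducerFencedException extend ApiException and are surfaced to the application. A minimal sketch of a send callback that distinguishes them (handleFatalError is a hypothetical application hook, not part of this patch):

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.errors.ProducerFencedException;

    class IdempotentErrorHandling {
        static final Callback CALLBACK = new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception instanceof OutOfOrderSequenceException
                        || exception instanceof ProducerFencedException) {
                    // Non-recoverable for this producer instance: ordering and
                    // idempotence guarantees no longer hold for queued batches.
                    handleFatalError(exception); // hypothetical application hook
                }
            }
        };

        static void handleFatalError(Exception exception) {
            System.err.println("Fatal producer error: " + exception);
        }
    }
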
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 89b2000..4183638 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -46,7 +46,8 @@ public enum ApiKeys {
     API_VERSIONS(18, "ApiVersions"),
     CREATE_TOPICS(19, "CreateTopics"),
     DELETE_TOPICS(20, "DeleteTopics"),
-    DELETE_RECORDS(21, "DeleteRecords");
+    DELETE_RECORDS(21, "DeleteRecords"),
+    INIT_PRODUCER_ID(22, "InitProducerId");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

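A quick sanity check that the new key resolves from its wire id (this assumes ApiKeys.forId, the existing lookup backed by the ID_TO_TYPE table shown above):

    import org.apache.kafka.common.protocol.ApiKeys;

    class InitPidApiKeyCheck {
        public static void main(String[] args) {
            ApiKeys key = ApiKeys.forId(22);
            System.out.println(key.name);                        // InitProducerId
            System.out.println(key == ApiKeys.INIT_PRODUCER_ID); // true
        }
    }
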
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a2a33ee..519e52c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.errors.BrokerNotAvailableException;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.ControllerMovedException;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.errors.DuplicateSequenceNumberException;
 import org.apache.kafka.common.errors.GroupAuthorizationException;
 import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
 import org.apache.kafka.common.errors.GroupLoadInProgressException;
@@ -39,10 +40,12 @@ import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
 import org.apache.kafka.common.errors.InvalidReplicationFactorException;
 import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidRequiredAcksException;
+import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.InvalidSessionTimeoutException;
 import org.apache.kafka.common.errors.InvalidTimestampException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.NotControllerException;
@@ -166,12 +169,15 @@ public enum Errors {
             " the message was sent to an incompatible broker. See the broker logs for more details.")),
     UNSUPPORTED_FOR_MESSAGE_FORMAT(43,
         new UnsupportedForMessageFormatException("The message format version on the broker does not support the request.")),
-    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy."));
+    POLICY_VIOLATION(44, new PolicyViolationException("Request parameters do not satisfy the configured policy.")),
+    OUT_OF_ORDER_SEQUENCE_NUMBER(45, new OutOfOrderSequenceException("The broker received an out of order sequence number")),
+    DUPLICATE_SEQUENCE_NUMBER(46, new DuplicateSequenceNumberException("The broker received a duplicate sequence number")),
+    PRODUCER_FENCED(47, new ProducerFencedException("Producer attempted an operation with an old epoch"));
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
-    private static Map<Class<?>, Errors> classToError = new HashMap<Class<?>, Errors>();
-    private static Map<Short, Errors> codeToError = new HashMap<Short, Errors>();
+    private static Map<Class<?>, Errors> classToError = new HashMap<>();
+    private static Map<Short, Errors> codeToError = new HashMap<>();
 
     static {
         for (Errors error : Errors.values()) {

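The new codes round-trip through the codeToError and classToError tables populated in the static block above. A minimal check using the existing forCode and forException helpers on Errors:

    import org.apache.kafka.common.errors.OutOfOrderSequenceException;
    import org.apache.kafka.common.protocol.Errors;

    class NewErrorCodesCheck {
        public static void main(String[] args) {
            Errors error = Errors.forCode((short) 45);
            System.out.println(error);                                                    // OUT_OF_ORDER_SEQUENCE_NUMBER
            System.out.println(error.exception() instanceof OutOfOrderSequenceException); // true
            System.out.println(Errors.forException(new OutOfOrderSequenceException("x"))); // same code
        }
    }
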
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 5d7004a..37eb75c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1176,6 +1176,29 @@ public class Protocol {
     public static final Schema[] DELETE_RECORDS_REQUEST = new Schema[] {DELETE_RECORDS_REQUEST_V0};
     public static final Schema[] DELETE_RECORDS_RESPONSE = new Schema[] {DELETE_RECORDS_RESPONSE_V0};
 
+    /* Transactions API */
+    public static final Schema INIT_PRODUCER_ID_REQUEST_V0 = new Schema(
+            new Field("transactional_id",
+                    NULLABLE_STRING,
+                    "The transactional id whose pid we want to retrieve or generate.")
+    );
+
+    public static final Schema INIT_PRODUCER_ID_RESPONSE_V0 = new Schema(
+            new Field("error_code",
+                    INT16,
+                    "An integer error code."),
+            new Field("pid",
+                    INT64,
+                    "The pid for the input transactional id. If the input id was empty, then this is used only for ensuring idempotence of messages"),
+            new Field("epoch",
+                    INT16,
+                    "The epoch for the pid. Will always be 0 if no transactional id was specified in the request.")
+    );
+
+    public static final Schema[] INIT_PRODUCER_ID_REQUEST = new Schema[] {INIT_PRODUCER_ID_REQUEST_V0};
+
+    public static final Schema[] INIT_PRODUCER_ID_RESPONSE = new Schema[] {INIT_PRODUCER_ID_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1208,6 +1231,7 @@ public class Protocol {
         REQUESTS[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_REQUEST;
         REQUESTS[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_REQUEST;
         REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
+        REQUESTS[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1231,6 +1255,7 @@ public class Protocol {
         RESPONSES[ApiKeys.CREATE_TOPICS.id] = CREATE_TOPICS_RESPONSE;
         RESPONSES[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_RESPONSE;
         RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
+        RESPONSES[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

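A sketch of the v0 wire format in isolation, serializing and re-reading an InitPidRequest body directly against the new schema (a null transactional_id is legal since the field is NULLABLE_STRING, and is what the idempotent producer sends):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.Protocol;
    import org.apache.kafka.common.protocol.types.Struct;

    class InitPidWireFormatSketch {
        public static void main(String[] args) {
            Struct request = new Struct(Protocol.INIT_PRODUCER_ID_REQUEST_V0)
                    .set("transactional_id", null);
            ByteBuffer buffer = ByteBuffer.allocate(request.sizeOf());
            request.writeTo(buffer);
            buffer.flip();
            Struct parsed = Protocol.INIT_PRODUCER_ID_REQUEST_V0.read(buffer);
            System.out.println(parsed.getString("transactional_id")); // null
        }
    }
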
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 325690d..c42390b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -134,6 +134,10 @@ public class Struct {
         return (Integer) get(name);
     }
 
+    public Long getUnsignedInt(String name) {
+        return (Long) get(name);
+    }
+
     public Long getLong(Field field) {
         return (Long) get(field);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
index ffca09c..57d31f4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Type.java
@@ -194,6 +194,36 @@ public abstract class Type {
         }
     };
 
+    public static final Type UNSIGNED_INT32 = new Type() {
+        @Override
+        public void write(ByteBuffer buffer, Object o) {
+            ByteUtils.writeUnsignedInt(buffer, (long) o);
+        }
+
+        @Override
+        public Object read(ByteBuffer buffer) {
+            return ByteUtils.readUnsignedInt(buffer);
+        }
+
+        @Override
+        public int sizeOf(Object o) {
+            return 4;
+        }
+
+        @Override
+        public String toString() {
+            return "UINT32";
+        }
+
+        @Override
+        public Long validate(Object item) {
+            if (item instanceof Long)
+                return (Long) item;
+            else
+                throw new SchemaException(item + " is not a Long.");
+        }
+    };
+
     public static final Type INT64 = new Type() {
         @Override
         public void write(ByteBuffer buffer, Object o) {

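A round trip through the new type; values above Integer.MAX_VALUE survive because the unsigned 32-bit value is surfaced as a Long on the Java side, which is also why the Struct.getUnsignedInt accessor above returns Long rather than Integer:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.protocol.types.Type;

    class UnsignedInt32Check {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(4);
            Type.UNSIGNED_INT32.write(buffer, 3_000_000_000L); // larger than Integer.MAX_VALUE
            buffer.flip();
            Long value = (Long) Type.UNSIGNED_INT32.read(buffer);
            System.out.println(value); // 3000000000
        }
    }
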
http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 1b74a7d..7e09b93 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -175,6 +175,11 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
     }
 
     @Override
+    public boolean hasProducerId() {
+        return false;
+    }
+
+    @Override
     public long sequence() {
         return RecordBatch.NO_SEQUENCE;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
index 53245e7..78ad050 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecordBatch.java
@@ -19,6 +19,11 @@ package org.apache.kafka.common.record;
 abstract class AbstractRecordBatch implements RecordBatch {
 
     @Override
+    public boolean hasProducerId() {
+        return RecordBatch.NO_PRODUCER_ID < producerId();
+    }
+
+    @Override
     public long nextOffset() {
         return lastOffset() + 1;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index e59d9fd..8c4e771 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -147,7 +147,7 @@ public class MemoryRecords extends AbstractRecords {
 
                 messagesRead += 1;
 
-                if (filter.shouldRetain(record)) {
+                if (filter.shouldRetain(batch, record)) {
                     // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
                     // the corrupted batch with correct data.
                     if (!record.hasMagic(batchMagic))
@@ -245,7 +245,7 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public interface RecordFilter {
-        boolean shouldRetain(Record record);
+        boolean shouldRetain(RecordBatch recordBatch, Record record);
     }
 
     public static class FilterResult {
@@ -338,13 +338,27 @@ public class MemoryRecords extends AbstractRecords {
         return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME, records);
     }
 
+    public static MemoryRecords withRecords(long initialOffset, CompressionType compressionType, Long pid,
+                                            short epoch, int baseSequence, SimpleRecord... records) {
+        return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
+                pid, epoch, baseSequence, records);
+    }
+
     public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
                                             TimestampType timestampType, SimpleRecord... records) {
+        return withRecords(magic, initialOffset, compressionType, timestampType, RecordBatch.NO_PRODUCER_ID,
+                RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, records);
+    }
+
+    private static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
+                                             TimestampType timestampType, long pid, short epoch, int baseSequence,
+                                             SimpleRecord... records) {
         if (records.length == 0)
             return MemoryRecords.EMPTY;
         int sizeEstimate = AbstractRecords.estimateSizeInBytes(magic, compressionType, Arrays.asList(records));
         ByteBuffer buffer = ByteBuffer.allocate(sizeEstimate);
-        MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset);
+        MemoryRecordsBuilder builder = builder(buffer, magic, compressionType, timestampType, initialOffset,
+                System.currentTimeMillis(), pid, epoch, baseSequence);
         for (SimpleRecord record : records)
             builder.append(record);
         return builder.build();
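
Tying the record-layer changes together: a sketch that builds an idempotent batch with the new withRecords overload and reads the PID metadata back through the hasProducerId accessor added to AbstractRecordBatch above (this assumes the single-argument SimpleRecord constructor):

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.SimpleRecord;

    class IdempotentBatchSketch {
        public static void main(String[] args) {
            MemoryRecords records = MemoryRecords.withRecords(0L, CompressionType.NONE,
                    23L /* pid */, (short) 0 /* epoch */, 0 /* baseSequence */,
                    new SimpleRecord("value".getBytes()));
            for (RecordBatch batch : records.batches()) {
                System.out.println(batch.hasProducerId()); // true, since 23 > NO_PRODUCER_ID
                System.out.println(batch.producerId());    // 23
            }
        }
    }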