You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/03 02:41:49 UTC

[3/4] kafka git commit: KAFKA-4817; Add idempotent producer semantics

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index ab81bfe..d238093 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -56,14 +56,14 @@ public class MemoryRecordsBuilder {
     private final int initPos;
     private final long baseOffset;
     private final long logAppendTime;
-    private final long producerId;
-    private final short producerEpoch;
-    private final int baseSequence;
     private final boolean isTransactional;
     private final int partitionLeaderEpoch;
     private final int writeLimit;
     private final int initialCapacity;
 
+    private long producerId;
+    private short producerEpoch;
+    private int baseSequence;
     private long writtenUncompressed = 0;
     private int numRecords = 0;
     private float compressionRate = 1;
@@ -193,6 +193,19 @@ public class MemoryRecordsBuilder {
             return new RecordsInfo(maxTimestamp, compressionType == CompressionType.NONE ? offsetOfMaxTimestamp : lastOffset);
     }
 
+    public void setProducerState(long pid, short epoch, int baseSequence) {
+        if (isClosed()) {
+            // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
+            // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
+            // be re queued. In this case, we should not attempt to set the state again, since changing the pid and sequence
+            // once a batch has been sent to the broker risks introducing duplicates.
+            throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
+        }
+        this.producerId = pid;
+        this.producerEpoch = epoch;
+        this.baseSequence = baseSequence;
+    }
+
     public void close() {
         if (builtRecords != null)
             return;
@@ -577,4 +590,11 @@ public class MemoryRecordsBuilder {
             this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
         }
     }
+
+    /**
+     * Return the ProducerId (PID) of the RecordBatches created by this builder.
+     */
+    public long producerId() {
+        return this.producerId;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 90f1486..ae4a225 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -146,6 +146,11 @@ public interface RecordBatch extends Iterable<Record> {
     short producerEpoch();
 
     /**
+     * Does the batch have a valid producer id set.
+     */
+    boolean hasProducerId();
+
+    /**
      * Get the first sequence number of this record batch.
      * @return The first sequence number or -1 if there is none
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 3a99a8a..1638556 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -171,6 +171,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case DELETE_RECORDS:
                 request = new DeleteRecordsRequest(struct, version);
                 break;
+            case INIT_PRODUCER_ID:
+                request = new InitPidRequest(struct, version);
+                break;
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index a5d0dc4..314aa42 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -93,6 +93,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new DeleteTopicsResponse(struct);
             case DELETE_RECORDS:
                 return new DeleteRecordsResponse(struct);
+            case INIT_PRODUCER_ID:
+                return new InitPidResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
new file mode 100644
index 0000000..284107f
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidRequest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+
+public class InitPidRequest extends AbstractRequest {
+    private static final String TRANSACTIONAL_ID_KEY_NAME = "transactional_id";
+
+    private final String transactionalId;
+
+    public static class Builder extends AbstractRequest.Builder<InitPidRequest> {
+        private final String transactionalId;
+        public Builder(String transactionalId) {
+            super(ApiKeys.INIT_PRODUCER_ID);
+            if (transactionalId != null && transactionalId.isEmpty())
+                throw new IllegalArgumentException("Must set either a null or a non-empty transactional id.");
+            this.transactionalId = transactionalId;
+        }
+
+        @Override
+        public InitPidRequest build(short version) {
+            return new InitPidRequest(this.transactionalId, version);
+        }
+
+        @Override
+        public String toString() {
+            return "(type=InitPidRequest)";
+        }
+
+    }
+
+    public InitPidRequest(Struct struct, short version) {
+        super(version);
+        this.transactionalId = struct.getString(TRANSACTIONAL_ID_KEY_NAME);
+    }
+
+    private InitPidRequest(String transactionalId, short version) {
+        super(version);
+        this.transactionalId = transactionalId;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(Throwable e) {
+        return new InitPidResponse(Errors.forException(e));
+    }
+
+    public static InitPidRequest parse(ByteBuffer buffer, short version) {
+        return new InitPidRequest(ApiKeys.INIT_PRODUCER_ID.parseRequest(version, buffer), version);
+    }
+
+    public String transactionalId() {
+        return transactionalId;
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.requestSchema(version()));
+        struct.set(TRANSACTIONAL_ID_KEY_NAME, transactionalId);
+        return struct;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
new file mode 100644
index 0000000..ee92375
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/InitPidResponse.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.RecordBatch;
+
+import java.nio.ByteBuffer;
+
+public class InitPidResponse extends AbstractResponse {
+    /**
+     * Possible Error codes:
+     * OK
+     *
+     */
+    private static final String PRODUCER_ID_KEY_NAME = "pid";
+    private static final String EPOCH_KEY_NAME = "epoch";
+    private static final String ERROR_CODE_KEY_NAME = "error_code";
+    private final Errors error;
+    private final long producerId;
+    private final short epoch;
+
+    public InitPidResponse(Errors error, long producerId, short epoch) {
+        this.error = error;
+        this.producerId = producerId;
+        this.epoch = epoch;
+    }
+
+    public InitPidResponse(Struct struct) {
+        this.error = Errors.forCode(struct.getShort(ERROR_CODE_KEY_NAME));
+        this.producerId = struct.getLong(PRODUCER_ID_KEY_NAME);
+        this.epoch = struct.getShort(EPOCH_KEY_NAME);
+    }
+
+    public InitPidResponse(Errors errors) {
+        this(errors, RecordBatch.NO_PRODUCER_ID, (short) 0);
+    }
+
+    public long producerId() {
+        return producerId;
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public short epoch() {
+        return epoch;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.INIT_PRODUCER_ID.responseSchema(version));
+        struct.set(PRODUCER_ID_KEY_NAME, producerId);
+        struct.set(EPOCH_KEY_NAME, epoch);
+        struct.set(ERROR_CODE_KEY_NAME, error.code());
+        return struct;
+    }
+
+    public static InitPidResponse parse(ByteBuffer buffer, short version) {
+        return new InitPidResponse(ApiKeys.INIT_PRODUCER_ID.parseResponse(version, buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
index 50c90a8..6efe311 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ByteUtils.java
@@ -31,6 +31,16 @@ public final class ByteUtils {
     private ByteUtils() {}
 
     /**
+     * Read an unsigned integer from the current position in the buffer, incrementing the position by 4 bytes
+     *
+     * @param buffer The buffer to read from
+     * @return The integer read, as a long to avoid signedness
+     */
+    public static long readUnsignedInt(ByteBuffer buffer) {
+        return buffer.getInt() & 0xffffffffL;
+    }
+
+    /**
      * Read an unsigned integer from the given position without modifying the buffers position
      *
      * @param buffer the buffer to read from

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 9cc863b..9117e16 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -17,18 +17,23 @@
 package org.apache.kafka.clients.producer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.DefaultRecord;
 import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.junit.After;
@@ -89,7 +94,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
 
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10L * batchSize,
-                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             // append to the first batch
@@ -108,7 +113,6 @@ public class RecordAccumulatorTest {
         Deque<ProducerBatch> partitionBatches = accum.batches().get(tp1);
         assertEquals(2, partitionBatches.size());
         Iterator<ProducerBatch> partitionBatchesIterator = partitionBatches.iterator();
-        assertFalse(partitionBatchesIterator.next().isWritable());
         assertTrue(partitionBatchesIterator.next().isWritable());
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
 
@@ -129,7 +133,7 @@ public class RecordAccumulatorTest {
     public void testAppendLarge() throws Exception {
         int batchSize = 512;
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
         accum.append(tp1, 0L, key, new byte[2 * batchSize], null, maxBlockTimeMs);
         assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes);
     }
@@ -138,7 +142,7 @@ public class RecordAccumulatorTest {
     public void testLinger() throws Exception {
         long lingerMs = 10L;
         RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
         assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size());
         time.sleep(10);
@@ -157,7 +161,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testPartialDrain() throws Exception {
         RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 10L, 100L, metrics, time, new ApiVersions(), null);
         int appends = 1024 / msgSize + 1;
         List<TopicPartition> partitions = asList(tp1, tp2);
         for (TopicPartition tp : partitions) {
@@ -177,7 +181,7 @@ public class RecordAccumulatorTest {
         final int msgs = 10000;
         final int numParts = 2;
         final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 0L, 100L, metrics, time, new ApiVersions(), null);
         List<Thread> threads = new ArrayList<>();
         for (int i = 0; i < numThreads; i++) {
             threads.add(new Thread() {
@@ -222,7 +226,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
 
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         // Just short of going over the limit so we trigger linger time
         int appends = expectedNumAppends(batchSize);
 
@@ -257,7 +261,7 @@ public class RecordAccumulatorTest {
         long lingerMs = Long.MAX_VALUE / 4;
         long retryBackoffMs = Long.MAX_VALUE / 2;
         final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
 
         long now = time.milliseconds();
         accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
@@ -295,7 +299,7 @@ public class RecordAccumulatorTest {
     public void testFlush() throws Exception {
         long lingerMs = Long.MAX_VALUE;
         final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         for (int i = 0; i < 100; i++)
             accum.append(new TopicPartition(topic, i % 3), 0L, key, value, null, maxBlockTimeMs);
         RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds());
@@ -329,7 +333,7 @@ public class RecordAccumulatorTest {
     @Test
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, Long.MAX_VALUE, 100L, metrics, time, new ApiVersions(), null);
         accum.append(new TopicPartition(topic, 0), 0L, key, value, null, maxBlockTimeMs);
 
         accum.beginFlush();
@@ -349,7 +353,7 @@ public class RecordAccumulatorTest {
         long lingerMs = Long.MAX_VALUE;
         final AtomicInteger numExceptionReceivedInCallback = new AtomicInteger(0);
         final RecordAccumulator accum = new RecordAccumulator(4 * 1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 64 * 1024,
-                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, 100L, metrics, time, new ApiVersions(), null);
         class TestCallback implements Callback {
             @Override
             public void onCompletion(RecordMetadata metadata, Exception exception) {
@@ -378,7 +382,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
 
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
         int appends = expectedNumAppends(batchSize);
 
         // Test batches not in retry
@@ -449,7 +453,7 @@ public class RecordAccumulatorTest {
         int messagesPerBatch = expectedNumAppends(1024);
 
         final RecordAccumulator accum = new RecordAccumulator(1024 + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024,
-                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions());
+                CompressionType.NONE, lingerMs, retryBackoffMs, metrics, time, new ApiVersions(), null);
         final AtomicInteger expiryCallbackCount = new AtomicInteger();
         final AtomicReference<Exception> unexpectedException = new AtomicReference<>();
         Callback callback = new Callback() {
@@ -490,7 +494,7 @@ public class RecordAccumulatorTest {
         int batchSize = 1025;
 
         RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
-                CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions());
+                CompressionType.NONE, 10, 100L, metrics, time, new ApiVersions(), null);
         int appends = expectedNumAppends(batchSize);
         for (int i = 0; i < appends; i++) {
             accum.append(tp1, 0L, key, value, null, maxBlockTimeMs);
@@ -519,6 +523,18 @@ public class RecordAccumulatorTest {
         assertTrue("The batch should have been drained.", drained.get(node1.id()).size() > 0);
     }
 
+    @Test(expected = UnsupportedVersionException.class)
+    public void testIdempotenceWithOldMagic() throws InterruptedException {
+        // Simulate talking to an older broker, ie. one which supports a lower magic.
+        ApiVersions apiVersions = new ApiVersions();
+        int batchSize = 1025;
+        apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id,
+                (short) 0, (short) 2))));
+        RecordAccumulator accum = new RecordAccumulator(batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * batchSize,
+                CompressionType.NONE, 10, 100L, metrics, time, apiVersions, new TransactionState(time));
+        accum.append(tp1, 0L, key, value, null, 0);
+    }
+
     /**
      * Return the offset delta.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 0dea6b6..0d19aa0 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.TransactionState;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.Node;
@@ -32,11 +33,14 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.MutableRecordBatch;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.ProduceRequest;
+import org.apache.kafka.common.requests.InitPidRequest;
+import org.apache.kafka.common.requests.InitPidResponse;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.test.TestUtils;
@@ -46,6 +50,7 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -81,24 +86,7 @@ public class SenderTest {
 
     @Before
     public void setup() {
-        Map<String, String> metricTags = new LinkedHashMap<>();
-        metricTags.put("client-id", CLIENT_ID);
-        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
-        metrics = new Metrics(metricConfig, time);
-        accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions);
-        sender = new Sender(client,
-                            metadata,
-                            this.accumulator,
-                            true,
-                            MAX_REQUEST_SIZE,
-                            ACKS_ALL,
-                            MAX_RETRIES,
-                            metrics,
-                            time,
-                            REQUEST_TIMEOUT,
-                            apiVersions);
-
-        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+        setupWithTransactionState(null);
     }
 
     @After
@@ -244,16 +232,19 @@ public class SenderTest {
         Metrics m = new Metrics();
         try {
             Sender sender = new Sender(client,
-                                       metadata,
-                                       this.accumulator,
-                                       false,
-                                       MAX_REQUEST_SIZE,
-                                       ACKS_ALL,
-                                       maxRetries,
-                                       m,
-                                       time,
-                                       REQUEST_TIMEOUT,
-                                       apiVersions);
+                    metadata,
+                    this.accumulator,
+                    false,
+                    MAX_REQUEST_SIZE,
+                    ACKS_ALL,
+                    maxRetries,
+                    m,
+                    time,
+                    REQUEST_TIMEOUT,
+                    50,
+                    null,
+                    apiVersions
+            );
             // do a successful retry
             Future<RecordMetadata> future = accumulator.append(tp0, 0L, "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
             sender.run(time.milliseconds()); // connect
@@ -300,17 +291,19 @@ public class SenderTest {
         Metrics m = new Metrics();
         try {
             Sender sender = new Sender(client,
-                metadata,
-                this.accumulator,
-                true,
-                MAX_REQUEST_SIZE,
-                ACKS_ALL,
-                maxRetries,
-                m,
-                time,
-                REQUEST_TIMEOUT,
-                apiVersions);
-
+                    metadata,
+                    this.accumulator,
+                    true,
+                    MAX_REQUEST_SIZE,
+                    ACKS_ALL,
+                    maxRetries,
+                    m,
+                    time,
+                    REQUEST_TIMEOUT,
+                    50,
+                    null,
+                    apiVersions
+            );
             // Create a two broker cluster, with partition 0 on broker 0 and partition 1 on broker 1
             Cluster cluster1 = TestUtils.clusterWith(2, "test", 2);
             metadata.update(cluster1, Collections.<String>emptySet(), time.milliseconds());
@@ -381,6 +374,164 @@ public class SenderTest {
         assertTrue("Request should be completed", future.isDone());
     }
 
+    @Test
+    public void testInitPidRequest() throws Exception {
+        final long producerId = 343434L;
+        TransactionState transactionState = new TransactionState(new MockTime());
+        setupWithTransactionState(transactionState);
+        client.setNode(new Node(1, "localhost", 33343));
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                return body instanceof InitPidRequest;
+            }
+        }, new InitPidResponse(Errors.NONE, producerId, (short) 0));
+        sender.run(time.milliseconds());
+        assertTrue(transactionState.hasPid());
+        assertEquals(producerId, transactionState.pidAndEpoch().producerId);
+        assertEquals((short) 0, transactionState.pidAndEpoch().epoch);
+    }
+
+    @Test
+    public void testSequenceNumberIncrement() throws InterruptedException {
+        final long producerId = 343434L;
+        TransactionState transactionState = new TransactionState(new MockTime());
+        transactionState.setPidAndEpoch(producerId, (short) 0);
+        setupWithTransactionState(transactionState);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        Sender sender = new Sender(client,
+                metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                maxRetries,
+                m,
+                time,
+                REQUEST_TIMEOUT,
+                50,
+                transactionState,
+                apiVersions
+        );
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        client.prepareResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                if (body instanceof ProduceRequest) {
+                    ProduceRequest request = (ProduceRequest) body;
+                    MemoryRecords records = request.partitionRecordsOrFail().get(tp0);
+                    Iterator<MutableRecordBatch> batchIterator = records.batches().iterator();
+                    assertTrue(batchIterator.hasNext());
+                    RecordBatch batch = batchIterator.next();
+                    assertFalse(batchIterator.hasNext());
+                    assertEquals(0, batch.baseSequence());
+                    assertEquals(producerId, batch.producerId());
+                    assertEquals(0, batch.producerEpoch());
+                    return true;
+                }
+                return false;
+            }
+        }, produceResponse(tp0, 0, Errors.NONE, 0));
+
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+
+        sender.run(time.milliseconds());  // receive response
+        assertTrue(responseFuture.isDone());
+        assertEquals((long) transactionState.sequenceNumber(tp0), 1L);
+    }
+
+    @Test
+    public void testAbortRetryWhenPidChanges() throws InterruptedException {
+        final long producerId = 343434L;
+        TransactionState transactionState = new TransactionState(new MockTime());
+        transactionState.setPidAndEpoch(producerId, (short) 0);
+        setupWithTransactionState(transactionState);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        Sender sender = new Sender(client,
+                metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                maxRetries,
+                m,
+                time,
+                REQUEST_TIMEOUT,
+                50,
+                transactionState,
+                apiVersions
+        );
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+        String id = client.requests().peek().destination();
+        Node node = new Node(Integer.valueOf(id), "localhost", 0);
+        assertEquals(1, client.inFlightRequestCount());
+        assertTrue("Client ready status should be true", client.isReady(node, 0L));
+        client.disconnect(id);
+        assertEquals(0, client.inFlightRequestCount());
+        assertFalse("Client ready status should be false", client.isReady(node, 0L));
+
+        transactionState.setPidAndEpoch(producerId + 1, (short) 0);
+        sender.run(time.milliseconds()); // receive error
+        sender.run(time.milliseconds()); // reconnect
+        sender.run(time.milliseconds()); // nothing to do, since the pid has changed. We should check the metrics for errors.
+        assertEquals("Expected requests to be aborted after pid change", 0, client.inFlightRequestCount());
+
+        KafkaMetric recordErrors = m.metrics().get(m.metricName("record-error-rate", METRIC_GROUP, ""));
+        assertTrue("Expected non-zero value for record send errors", recordErrors.value() > 0);
+
+        assertTrue(responseFuture.isDone());
+        assertEquals((long) transactionState.sequenceNumber(tp0), 0L);
+    }
+
+    @Test
+    public void testResetWhenOutOfOrderSequenceReceived() throws InterruptedException {
+        final long producerId = 343434L;
+        TransactionState transactionState = new TransactionState(new MockTime());
+        transactionState.setPidAndEpoch(producerId, (short) 0);
+        setupWithTransactionState(transactionState);
+        client.setNode(new Node(1, "localhost", 33343));
+
+        int maxRetries = 10;
+        Metrics m = new Metrics();
+        Sender sender = new Sender(client,
+                metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                maxRetries,
+                m,
+                time,
+                REQUEST_TIMEOUT,
+                50,
+                transactionState,
+                apiVersions
+        );
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());  // connect.
+        sender.run(time.milliseconds());  // send.
+
+        assertEquals(1, client.inFlightRequestCount());
+
+        client.respond(produceResponse(tp0, 0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
+
+        sender.run(time.milliseconds());
+        assertTrue(responseFuture.isDone());
+        assertFalse("Expected transaction state to be reset upon receiving an OutOfOrderSequenceException", transactionState.hasPid());
+    }
+
     private void completedWithError(Future<RecordMetadata> future, Errors error) throws Exception {
         assertTrue("Request should be completed", future.isDone());
         try {
@@ -397,4 +548,25 @@ public class SenderTest {
         return new ProduceResponse(partResp, throttleTimeMs);
     }
 
+    private void setupWithTransactionState(TransactionState transactionState) {
+        Map<String, String> metricTags = new LinkedHashMap<>();
+        metricTags.put("client-id", CLIENT_ID);
+        MetricConfig metricConfig = new MetricConfig().tags(metricTags);
+        this.metrics = new Metrics(metricConfig, time);
+        this.accumulator = new RecordAccumulator(batchSize, 1024 * 1024, CompressionType.NONE, 0L, 0L, metrics, time, apiVersions, transactionState);
+        this.sender = new Sender(this.client,
+                this.metadata,
+                this.accumulator,
+                true,
+                MAX_REQUEST_SIZE,
+                ACKS_ALL,
+                MAX_RETRIES,
+                this.metrics,
+                this.time,
+                REQUEST_TIMEOUT,
+                50,
+                transactionState,
+                apiVersions);
+        this.metadata.update(this.cluster, Collections.<String>emptySet(), time.milliseconds());
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java
new file mode 100644
index 0000000..a8a1716
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionStateTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.producer.internals;
+
+
+import org.apache.kafka.clients.producer.TransactionState;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TransactionStateTest {
+
+    private TopicPartition topicPartition;
+
+    @Before
+    public void setUp() {
+        topicPartition = new TopicPartition("topic-0", 0);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void testInvalidSequenceIncrement() {
+        TransactionState transactionState = new TransactionState(new MockTime());
+        transactionState.incrementSequenceNumber(topicPartition, 3333);
+    }
+
+    @Test
+    public void testDefaultSequenceNumber() {
+        TransactionState transactionState = new TransactionState(new MockTime());
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 0);
+        transactionState.incrementSequenceNumber(topicPartition, 3);
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 3);
+    }
+
+
+    @Test
+    public void testProducerIdReset() {
+        TransactionState transactionState = new TransactionState(new MockTime());
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 0);
+        transactionState.incrementSequenceNumber(topicPartition, 3);
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 3);
+        transactionState.resetProducerId();
+        assertEquals((int) transactionState.sequenceNumber(topicPartition), 0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 2dd5ab0..a2c761f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -355,7 +355,7 @@ public class MemoryRecordsTest {
 
     private static class RetainNonNullKeysFilter implements MemoryRecords.RecordFilter {
         @Override
-        public boolean shouldRetain(Record record) {
+        public boolean shouldRetain(RecordBatch batch, Record record) {
             return record.hasKey();
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2024f90..8a7633e 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -136,6 +136,9 @@ public class RequestResponseTest {
         checkRequest(createDeleteTopicsRequest());
         checkErrorResponse(createDeleteTopicsRequest(), new UnknownServerException());
         checkResponse(createDeleteTopicsResponse(), 0);
+        checkRequest(createInitPidRequest());
+        checkErrorResponse(createInitPidRequest(), new UnknownServerException());
+        checkResponse(createInitPidResponse(), 0);
         checkOlderFetchVersions();
         checkResponse(createMetadataResponse(), 0);
         checkResponse(createMetadataResponse(), 1);
@@ -787,6 +790,14 @@ public class RequestResponseTest {
         return new DeleteTopicsResponse(errors);
     }
 
+    private InitPidRequest createInitPidRequest() {
+        return new InitPidRequest.Builder(null).build();
+    }
+
+    private InitPidResponse createInitPidResponse() {
+        return new InitPidResponse(Errors.NONE, 3332, (short) 3);
+    }
+
     private static class ByteBufferChannel implements GatheringByteChannel {
         private final ByteBuffer buf;
         private boolean closed = false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
index 0a082fb..ce23a33 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/ByteUtilsTest.java
@@ -65,6 +65,16 @@ public class ByteUtilsTest {
     }
 
     @Test
+    public void testReadUnsignedInt() {
+        ByteBuffer buffer = ByteBuffer.allocate(4);
+        long writeValue = 133444;
+        ByteUtils.writeUnsignedInt(buffer, writeValue);
+        buffer.flip();
+        long readValue = ByteUtils.readUnsignedInt(buffer);
+        assertEquals(writeValue, readValue);
+    }
+
+    @Test
     public void testWriteUnsignedIntLEToArray() {
         int value1 = 0x04030201;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index a2308b2..194cfcc 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -24,9 +24,9 @@ import kafka.cluster.Broker
 import kafka.common.{KafkaException, TopicAndPartition}
 import kafka.server.KafkaConfig
 import kafka.utils._
-import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients._
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ChannelBuilders, ListenerName, NetworkReceive, Selectable, Selector}
+import org.apache.kafka.common.network._
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
 import org.apache.kafka.common.requests
 import org.apache.kafka.common.requests.{UpdateMetadataRequest, _}
@@ -180,7 +180,6 @@ class RequestSendThread(val controllerId: Int,
     def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))
 
     val QueueItem(apiKey, requestBuilder, callback) = queue.take()
-    import NetworkClientBlockingOps._
     var clientResponse: ClientResponse = null
     try {
       lock synchronized {
@@ -196,7 +195,7 @@ class RequestSendThread(val controllerId: Int,
             else {
               val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
                 time.milliseconds(), true)
-              clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
+              clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
               isSendSuccessful = true
             }
           } catch {
@@ -233,10 +232,9 @@ class RequestSendThread(val controllerId: Int,
   }
 
   private def brokerReady(): Boolean = {
-    import NetworkClientBlockingOps._
     try {
-      if (!networkClient.isReady(brokerNode)(time)) {
-        if (!networkClient.blockingReady(brokerNode, socketTimeoutMs)(time))
+      if (!NetworkClientUtils.isReady(networkClient, brokerNode, time.milliseconds())) {
+        if (!NetworkClientUtils.awaitReady(networkClient, brokerNode, time, socketTimeoutMs))
           throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
 
         info("Controller %d connected to %s for sending state change requests".format(controllerId, brokerNode.toString))

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index 9d62924..2bc0c21 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -69,7 +69,7 @@ class GroupMetadataManager(val brokerId: Int,
   private val shuttingDown = new AtomicBoolean(false)
 
   /* number of partitions for the consumer metadata topic */
-  private val groupMetadataTopicPartitionCount = getOffsetsTopicPartitionCount
+  private val groupMetadataTopicPartitionCount = getGroupMetadataTopicPartitionCount
 
   /* single-thread scheduler to handle offset/group metadata cache loading and unloading */
   private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "group-metadata-manager-")
@@ -667,16 +667,11 @@ class GroupMetadataManager(val brokerId: Int,
   }
 
   /**
-   * Gets the partition count of the offsets topic from ZooKeeper.
+   * Gets the partition count of the group metadata topic from ZooKeeper.
    * If the topic does not exist, the configured partition count is returned.
    */
-  private def getOffsetsTopicPartitionCount = {
-    val topic = Topic.GroupMetadataTopicName
-    val topicData = zkUtils.getPartitionAssignmentForTopics(Seq(topic))
-    if (topicData(topic).nonEmpty)
-      topicData(topic).size
-    else
-      config.offsetsTopicNumPartitions
+  private def getGroupMetadataTopicPartitionCount: Int = {
+    zkUtils.getTopicPartitionCount(Topic.GroupMetadataTopicName).getOrElse(config.offsetsTopicNumPartitions)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/PidMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/PidMetadata.scala b/core/src/main/scala/kafka/coordinator/PidMetadata.scala
new file mode 100644
index 0000000..fa58add
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/PidMetadata.scala
@@ -0,0 +1,31 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.coordinator
+
+import kafka.utils.nonthreadsafe
+
+@nonthreadsafe
+private[coordinator] class PidMetadata(val pid: Long) {
+
+  /* current epoch number of the PID */
+  var epoch: Short = 0
+
+  override def equals(that: Any): Boolean = that match {
+    case other: PidMetadata => pid == other.pid && epoch == other.epoch
+    case _ => false
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala b/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala
new file mode 100644
index 0000000..43b05a4
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/ProducerIdManager.scala
@@ -0,0 +1,153 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.coordinator
+
+import kafka.common.KafkaException
+import kafka.utils.{Json, Logging, ZkUtils}
+
+/**
+  * Pid manager is part of the transaction coordinator that provides PIDs in a unique way such that the same PID will not be
+  * assigned twice across multiple transaction coordinators.
+  *
+  * Pids are managed via ZooKeeper, where the latest pid block is written on the corresponding ZK path by the manager who
+  * claims the block, where the written block_start_pid and block_end_pid are both inclusive.
+  */
+object ProducerIdManager extends Logging {
+  val CurrentVersion: Long = 1L
+  val PidBlockSize: Long = 1000L
+
+  def generatePidBlockJson(pidBlock: ProducerIdBlock): String = {
+    Json.encode(Map("version" -> CurrentVersion,
+      "broker" -> pidBlock.brokerId,
+      "block_start" -> pidBlock.blockStartPid.toString,
+      "block_end" -> pidBlock.blockEndPid.toString)
+    )
+  }
+
+  def parsePidBlockData(jsonData: String): ProducerIdBlock = {
+    try {
+      Json.parseFull(jsonData).flatMap { m =>
+        val pidBlockInfo = m.asInstanceOf[Map[String, Any]]
+        val brokerId = pidBlockInfo("broker").asInstanceOf[Int]
+        val blockStartPID = pidBlockInfo("block_start").asInstanceOf[String].toLong
+        val blockEndPID = pidBlockInfo("block_end").asInstanceOf[String].toLong
+        Some(ProducerIdBlock(brokerId, blockStartPID, blockEndPID))
+      }.getOrElse(throw new KafkaException(s"Failed to parse the pid block json $jsonData"))
+    } catch {
+      case e: java.lang.NumberFormatException =>
+        // this should never happen: the written data has exceeded long type limit
+        fatal(s"Read jason data $jsonData contains pids that have exceeded long type limit")
+        throw e
+    }
+  }
+}
+
+case class ProducerIdBlock(brokerId: Int, blockStartPid: Long, blockEndPid: Long) {
+  override def toString: String = {
+    val pidBlockInfo = new StringBuilder
+    pidBlockInfo.append("(brokerId:" + brokerId)
+    pidBlockInfo.append(",blockStartPID:" + blockStartPid)
+    pidBlockInfo.append(",blockEndPID:" + blockEndPid + ")")
+    pidBlockInfo.toString()
+  }
+}
+
+class ProducerIdManager(val brokerId: Int, val zkUtils: ZkUtils) extends Logging {
+
+  this.logIdent = "[ProducerId Manager " + brokerId + "]: "
+
+  private var currentPIDBlock: ProducerIdBlock = null
+  private var nextPID: Long = -1L
+
+  // grab the first block of PIDs
+  this synchronized {
+    getNewPidBlock()
+    nextPID = currentPIDBlock.blockStartPid
+  }
+
+  private def getNewPidBlock(): Unit = {
+    var zkWriteComplete = false
+    while (!zkWriteComplete) {
+      // refresh current pid block from zookeeper again
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+
+      // generate the new pid block
+      currentPIDBlock = dataOpt match {
+        case Some(data) =>
+          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+          debug(s"Read current pid block $currPIDBlock, Zk path version $zkVersion")
+
+          if (currPIDBlock.blockEndPid > Long.MaxValue - ProducerIdManager.PidBlockSize) {
+            // we have exhausted all pids (wow!), treat it as a fatal error
+            fatal(s"Exhausted all pids as the next block's end pid is will has exceeded long type limit (current block end pid is ${currPIDBlock.blockEndPid})")
+            throw new KafkaException("Have exhausted all pids.")
+          }
+
+          ProducerIdBlock(brokerId, currPIDBlock.blockEndPid + 1L, currPIDBlock.blockEndPid + ProducerIdManager.PidBlockSize)
+        case None =>
+          debug(s"There is no pid block yet (Zk path version $zkVersion), creating the first block")
+          ProducerIdBlock(brokerId, 0L, ProducerIdManager.PidBlockSize - 1)
+      }
+
+      val newPIDBlockData = ProducerIdManager.generatePidBlockJson(currentPIDBlock)
+
+      // try to write the new pid block into zookeeper
+      val (succeeded, version) = zkUtils.conditionalUpdatePersistentPath(ZkUtils.PidBlockPath, newPIDBlockData, zkVersion, Some(checkPidBlockZkData))
+      zkWriteComplete = succeeded
+
+      if (zkWriteComplete)
+        info(s"Acquired new pid block $currentPIDBlock by writing to Zk with path version $version")
+    }
+  }
+
+  private def checkPidBlockZkData(zkUtils: ZkUtils, path: String, expectedData: String): (Boolean, Int) = {
+    try {
+      val expectedPidBlock = ProducerIdManager.parsePidBlockData(expectedData)
+      val (dataOpt, zkVersion) = zkUtils.readDataAndVersionMaybeNull(ZkUtils.PidBlockPath)
+      dataOpt match {
+        case Some(data) =>
+          val currPIDBlock = ProducerIdManager.parsePidBlockData(data)
+          (currPIDBlock.equals(expectedPidBlock), zkVersion)
+        case None =>
+          (false, -1)
+      }
+    } catch {
+      case e: Exception =>
+        warn(s"Error while checking for pid block Zk data on path $path: expected data $expectedData", e)
+        
+        (false, -1)
+    }
+  }
+
+  def nextPid(): Long = {
+    this synchronized {
+      // grab a new block of PIDs if this block has been exhausted
+      if (nextPID > currentPIDBlock.blockEndPid) {
+        getNewPidBlock()
+        nextPID = currentPIDBlock.blockStartPid + 1
+      } else {
+        nextPID += 1
+      }
+
+      nextPID - 1
+    }
+  }
+
+  def shutdown() {
+    info(s"Shutdown complete: last PID assigned $nextPID")
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
new file mode 100644
index 0000000..41b4323
--- /dev/null
+++ b/core/src/main/scala/kafka/coordinator/TransactionCoordinator.scala
@@ -0,0 +1,92 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *    http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.coordinator
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import kafka.server.KafkaConfig
+import kafka.utils.{Logging, ZkUtils}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.utils.Time
+
+/**
+  * Transaction coordinator handles message transactions sent by producers and communicate with brokers
+  * to update ongoing transaction's status.
+  *
+  * Each Kafka server instantiates a transaction coordinator which is responsible for a set of
+  * producers. Producers with specific transactional ids are assigned to their corresponding coordinators;
+  * Producers with no specific transactional id may talk to a random broker as their coordinators.
+  */
+object TransactionCoordinator {
+
+  def apply(config: KafkaConfig, zkUtils: ZkUtils, time: Time): TransactionCoordinator = {
+    val pidManager = new ProducerIdManager(config.brokerId, zkUtils)
+    new TransactionCoordinator(config.brokerId, pidManager)
+  }
+}
+
+class TransactionCoordinator(val brokerId: Int,
+                             val pidManager: ProducerIdManager) extends Logging {
+
+  this.logIdent = "[Transaction Coordinator " + brokerId + "]: "
+
+  type InitPidCallback = InitPidResult => Unit
+
+  /* Active flag of the coordinator */
+  private val isActive = new AtomicBoolean(false)
+
+  def handleInitPid(transactionalId: String,
+                    responseCallback: InitPidCallback): Unit = {
+    if (transactionalId == null || transactionalId.isEmpty) {
+      // if the transactional id is not specified, then always blindly accept the request
+      // and return a new pid from the pid manager
+      val pid = pidManager.nextPid()
+      responseCallback(InitPidResult(pid, epoch = 0, Errors.NONE))
+    } else {
+      // check if it is the assigned coordinator for the transactional id
+      responseCallback(initPidError(Errors.NOT_COORDINATOR_FOR_GROUP))
+    }
+  }
+
+  /**
+    * Startup logic executed at the same time when the server starts up.
+    */
+  def startup() {
+    info("Starting up.")
+    isActive.set(true)
+    info("Startup complete.")
+  }
+
+  /**
+    * Shutdown logic executed at the same time when server shuts down.
+    * Ordering of actions should be reversed from the startup process.
+    */
+  def shutdown() {
+    info("Shutting down.")
+    isActive.set(false)
+    pidManager.shutdown()
+    info("Shutdown complete.")
+  }
+
+  private def initPidError(error: Errors): InitPidResult = {
+    InitPidResult(pid = RecordBatch.NO_PRODUCER_ID, epoch = RecordBatch.NO_PRODUCER_EPOCH, error)
+  }
+
+}
+
+case class InitPidResult(pid: Long, epoch: Short, error: Errors)

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 2a81f26..95a6896 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,30 +17,30 @@
 
 package kafka.log
 
+import java.io.{File, IOException}
+import java.text.NumberFormat
+import java.util.concurrent.atomic._
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap, TimeUnit}
+
+import com.yammer.metrics.core.Gauge
 import kafka.api.KAFKA_0_10_0_IV0
-import kafka.utils._
 import kafka.common._
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.{BrokerTopicStats, FetchDataInfo, LogOffsetMetadata}
-import java.io.{File, IOException}
-import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
-import java.util.concurrent.atomic._
-import java.text.NumberFormat
-
+import kafka.utils._
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException, UnsupportedForMessageFormatException}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.ListOffsetRequest
+import org.apache.kafka.common.utils.{Time, Utils}
 
-import scala.collection.Seq
 import scala.collection.JavaConverters._
-import com.yammer.metrics.core.Gauge
-import org.apache.kafka.common.utils.{Time, Utils}
-import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
-import org.apache.kafka.common.TopicPartition
+import scala.collection.{Seq, mutable}
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
-    NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+    NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false, Map.empty[Long, ProducerAppendInfo], false)
 }
 
 /**
@@ -56,6 +56,9 @@ object LogAppendInfo {
  * @param shallowCount The number of shallow messages
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically increasing
+ * @param producerAppendInfos A map from a Pid to a ProducerAppendInfo, which is used to validate each Record in a
+ *                            RecordBatch and keep track of metadata across Records in a RecordBatch.
+ * @param isDuplicate Indicates whether the message set is a duplicate of a message at the tail of the log.
  */
 case class LogAppendInfo(var firstOffset: Long,
                          var lastOffset: Long,
@@ -66,8 +69,9 @@ case class LogAppendInfo(var firstOffset: Long,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
                          validBytes: Int,
-                         offsetsMonotonic: Boolean)
-
+                         offsetsMonotonic: Boolean,
+                         producerAppendInfos: Map[Long, ProducerAppendInfo],
+                         isDuplicate: Boolean = false)
 
 /**
  * An append-only log for storing messages.
@@ -93,7 +97,8 @@ case class LogAppendInfo(var firstOffset: Long,
  * @param recoveryPoint The offset at which to begin recovery--i.e. the first offset which has not been flushed to disk
  * @param scheduler The thread pool scheduler used for background actions
  * @param time The time instance used for checking the clock
- *
+ * @param maxPidExpirationMs The maximum amount of time to wait before a PID is considered expired
+ * @param pidExpirationCheckIntervalMs How often to check for PIDs which need to be expired
  */
 @threadsafe
 class Log(@volatile var dir: File,
@@ -101,7 +106,10 @@ class Log(@volatile var dir: File,
           @volatile var logStartOffset: Long = 0L,
           @volatile var recoveryPoint: Long = 0L,
           scheduler: Scheduler,
-          time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
+          time: Time = Time.SYSTEM,
+          val maxPidExpirationMs: Int = 60 * 60 * 1000,
+          val pidExpirationCheckIntervalMs: Int = 10 * 60 * 1000,
+          val pidSnapshotCreationIntervalMs: Int = 60 * 1000) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.Log._
 
@@ -118,10 +126,16 @@ class Log(@volatile var dir: File,
       0
   }
 
+  val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
+
   @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
 
+  /* Construct and load PID map */
+  private val pidMap = new ProducerIdMapping(config, topicPartition, dir, maxPidExpirationMs)
+
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
+
   locally {
     val startMs = time.milliseconds
 
@@ -131,13 +145,12 @@ class Log(@volatile var dir: File,
       activeSegment.size.toInt)
 
     logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+    buildAndRecoverPidMap(logEndOffset)
 
     info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
       .format(name, segments.size(), logStartOffset, logEndOffset, time.milliseconds - startMs))
   }
 
-  val topicPartition: TopicPartition = Log.parseTopicPartitionName(dir)
-
   private val tags = Map("topic" -> topicPartition.topic, "partition" -> topicPartition.partition.toString)
 
   newGauge("NumLogSegments",
@@ -164,6 +177,19 @@ class Log(@volatile var dir: File,
     },
     tags)
 
+  scheduler.schedule(name = "PeriodicPidExpirationCheck", fun = () => {
+    lock synchronized {
+      pidMap.checkForExpiredPids(time.milliseconds)
+    }
+  }, period = pidExpirationCheckIntervalMs, unit = TimeUnit.MILLISECONDS)
+
+  scheduler.schedule(name = "PeriodicPidSnapshotTask", fun = () => {
+    lock synchronized {
+      pidMap.maybeTakeSnapshot()
+    }
+  }, period = pidSnapshotCreationIntervalMs, unit = TimeUnit.MILLISECONDS)
+
+
   /** The name of this log */
   def name  = dir.getName()
 
@@ -332,6 +358,47 @@ class Log(@volatile var dir: File,
   }
 
   /**
+    * Creates an instance of id map for this log and updates the mapping
+    * in the case it is missing some messages. Note that the id mapping
+    * starts from a snapshot that is taken strictly before the log end
+    * offset. Consequently, we need to process the tail of the log to update
+    * the mapping.
+    *
+    * @param lastOffset
+    *
+    * @return An instance of ProducerIdMapping
+    */
+  private def buildAndRecoverPidMap(lastOffset: Long) {
+    lock synchronized {
+      val currentTimeMs = time.milliseconds
+      pidMap.truncateAndReload(lastOffset, currentTimeMs)
+      logSegments(pidMap.mapEndOffset, lastOffset).foreach { segment =>
+        val startOffset = math.max(segment.baseOffset, pidMap.mapEndOffset)
+        val fetchDataInfo = segment.read(startOffset, Some(lastOffset), Int.MaxValue)
+        val records = fetchDataInfo.records
+        records.batches.asScala.foreach { batch =>
+          if (batch.hasProducerId) {
+            // TODO: Currently accessing any of the batch-level headers other than the offset
+            // or magic causes us to load the full entry into memory. It would be better if we
+            // only loaded the header
+            val numRecords = (batch.lastOffset - batch.baseOffset + 1).toInt
+            val pidEntry = ProducerIdEntry(batch.producerEpoch, batch.lastSequence, batch.lastOffset,
+              numRecords, batch.maxTimestamp)
+            pidMap.load(batch.producerId, pidEntry, currentTimeMs)
+          }
+        }
+      }
+      pidMap.cleanFrom(logStartOffset)
+    }
+  }
+
+  private[log] def activePids: Map[Long, ProducerIdEntry] = {
+    lock synchronized {
+      pidMap.activePids
+    }
+  }
+
+  /**
    * Check if we have the "clean shutdown" file
    */
   private def hasCleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile).exists()
@@ -364,10 +431,11 @@ class Log(@volatile var dir: File,
    * @return Information about the appended messages including the first and last offset.
    */
   def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
-    val appendInfo = analyzeAndValidateRecords(records)
 
-    // if we have any valid messages, append them to the log
-    if (appendInfo.shallowCount == 0)
+    val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets)
+
+    // return if we have no valid messages or if this is a duplicate of the last appended entry
+    if (appendInfo.shallowCount == 0 || appendInfo.isDuplicate)
       return appendInfo
 
     // trim any invalid bytes or partial messages before appending it to the on-disk log
@@ -433,7 +501,6 @@ class Log(@volatile var dir: File,
           maxTimestampInMessages = appendInfo.maxTimestamp,
           maxOffsetInMessages = appendInfo.lastOffset)
 
-
         // now append to the log
         segment.append(firstOffset = appendInfo.firstOffset,
           largestOffset = appendInfo.lastOffset,
@@ -441,6 +508,15 @@ class Log(@volatile var dir: File,
           shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
           records = validRecords)
 
+        // update the PID sequence mapping
+        for ((pid, producerAppendInfo) <- appendInfo.producerAppendInfos) {
+          trace(s"Updating pid with sequence: $pid -> ${producerAppendInfo.lastEntry}")
+
+          if (assignOffsets)
+            producerAppendInfo.assignLastOffsetAndTimestamp(appendInfo.lastOffset, appendInfo.maxTimestamp)
+          pidMap.update(producerAppendInfo)
+        }
+
         // increment the log end offset
         updateLogEndOffset(appendInfo.lastOffset + 1)
 
@@ -457,7 +533,7 @@ class Log(@volatile var dir: File,
     }
   }
 
-  /*
+  /**
    * Increment the log start offset if the provided offset is larger.
    */
   def maybeIncrementLogStartOffset(offset: Long) {
@@ -476,6 +552,7 @@ class Log(@volatile var dir: File,
    * <ol>
    * <li> each message matches its CRC
    * <li> each message size is valid
+   * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other.
    * </ol>
    *
    * Also compute the following quantities:
@@ -488,7 +565,7 @@ class Log(@volatile var dir: File,
    * <li> Whether any compression codec is used (if many are used, then the last one is given)
    * </ol>
    */
-  private def analyzeAndValidateRecords(records: MemoryRecords): LogAppendInfo = {
+  private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
     var shallowMessageCount = 0
     var validBytesCount = 0
     var firstOffset = -1L
@@ -497,8 +574,12 @@ class Log(@volatile var dir: File,
     var monotonic = true
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
+    var isDuplicate = false
+    val producerAppendInfos = mutable.Map[Long, ProducerAppendInfo]()
 
     for (batch <- records.batches.asScala) {
+      if (isFromClient && batch.magic >= RecordBatch.MAGIC_VALUE_V2 && shallowMessageCount > 0)
+        throw new InvalidRecordException("Client produce requests should not have more than one batch")
       // update the first offset if on the first message. For magic versions older than 2, we use the last offset
       // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
       // For magic version 2, we can get the first offset directly from the batch header.
@@ -508,6 +589,7 @@ class Log(@volatile var dir: File,
       // check that offsets are monotonically increasing
       if (lastOffset >= batch.lastOffset)
         monotonic = false
+
       // update the last offset seen
       lastOffset = batch.lastOffset
 
@@ -527,19 +609,43 @@ class Log(@volatile var dir: File,
         maxTimestamp = batch.maxTimestamp
         offsetOfMaxTimestamp = lastOffset
       }
+
       shallowMessageCount += 1
       validBytesCount += batchSize
 
       val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
       if (messageCodec != NoCompressionCodec)
         sourceCodec = messageCodec
+
+      val pid = batch.producerId
+      if (pid != RecordBatch.NO_PRODUCER_ID) {
+        producerAppendInfos.get(pid) match {
+          case Some(appendInfo) => appendInfo.append(batch)
+          case None =>
+            val lastEntry = pidMap.lastEntry(pid).getOrElse(ProducerIdEntry.Empty)
+            if (isFromClient && lastEntry.isDuplicate(batch)) {
+              // This request is a duplicate so return the information about the existing entry. Note that for requests
+              // coming from the client, there will only be one RecordBatch per request, so there will be only one iteration
+              // of the loop and the values below will not be updated more than once.
+              isDuplicate = true
+              firstOffset = lastEntry.firstOffset
+              lastOffset = lastEntry.lastOffset
+              maxTimestamp = lastEntry.timestamp
+              info(s"Detected a duplicate at (firstOffset, lastOffset): (${firstOffset}, ${lastOffset}). Ignoring the incoming record.")
+            } else {
+              val producerAppendInfo = new ProducerAppendInfo(pid, lastEntry)
+              producerAppendInfos.put(pid, producerAppendInfo)
+              producerAppendInfo.append(batch)
+            }
+        }
+      }
     }
 
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
 
     LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, sourceCodec,
-      targetCodec, shallowMessageCount, validBytesCount, monotonic)
+      targetCodec, shallowMessageCount, validBytesCount, monotonic, producerAppendInfos.toMap, isDuplicate)
   }
 
   /**
@@ -941,6 +1047,7 @@ class Log(@volatile var dir: File,
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
         this.logStartOffset = math.min(targetOffset, this.logStartOffset)
       }
+      buildAndRecoverPidMap(targetOffset)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 8ddeca9..830f906 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -26,7 +26,7 @@ import com.yammer.metrics.core.Gauge
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record}
+import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, RecordBatch}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
@@ -219,8 +219,7 @@ class LogCleaner(val config: CleanerConfig,
     override def doWork() {
       cleanOrSleep()
     }
-    
-    
+
     override def shutdown() = {
     	 initiateShutdown()
     	 backOffWaitLatch.countDown()
@@ -402,7 +401,7 @@ private[log] class Cleaner(val id: Int,
         val retainDeletes = old.lastModified > deleteHorizonMs
         info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
             .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
-        cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, stats)
+        cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, log.activePids, stats)
       }
 
       // trim excess index
@@ -449,9 +448,10 @@ private[log] class Cleaner(val id: Int,
                              map: OffsetMap,
                              retainDeletes: Boolean,
                              maxLogMessageSize: Int,
+                             activePids: Map[Long, ProducerIdEntry],
                              stats: CleanerStats) {
     val logCleanerFilter = new RecordFilter {
-      def shouldRetain(record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats)
+      def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean = shouldRetainMessage(source, map, retainDeletes, record, stats, activePids, recordBatch.producerId)
     }
 
     var position = 0
@@ -493,10 +493,17 @@ private[log] class Cleaner(val id: Int,
                                   map: kafka.log.OffsetMap,
                                   retainDeletes: Boolean,
                                   record: Record,
-                                  stats: CleanerStats): Boolean = {
+                                  stats: CleanerStats,
+                                  activePids: Map[Long, ProducerIdEntry],
+                                  pid: Long): Boolean = {
     if (record.isControlRecord)
       return true
 
+    // retain the entry if it is the last one produced by an active idempotent producer to ensure that
+    // the PID is not removed from the log before it has been expired
+    if (RecordBatch.NO_PRODUCER_ID < pid && activePids.get(pid).exists(_.lastOffset == record.offset))
+      return true
+
     val pastLatestOffset = record.offset > map.latestOffset
     if (pastLatestOffset)
       return true

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 55669c0..30bc26b 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -56,6 +56,7 @@ object Defaults {
   val MessageTimestampDifferenceMaxMs = kafka.server.Defaults.LogMessageTimestampDifferenceMaxMs
   val LeaderReplicationThrottledReplicas = Collections.emptyList[String]()
   val FollowerReplicationThrottledReplicas = Collections.emptyList[String]()
+  val MaxIdMapSnapshots = kafka.server.Defaults.MaxIdMapSnapshots
 }
 
 case class LogConfig(props: java.util.Map[_, _]) extends AbstractConfig(LogConfig.configDef, props, false) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a555420..ec164e2 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -25,9 +25,10 @@ import kafka.utils._
 import scala.collection._
 import scala.collection.JavaConverters._
 import kafka.common.{KafkaException, KafkaStorageException}
-import kafka.server.{BrokerState, OffsetCheckpoint, RecoveringFromUncleanShutdown}
+import kafka.server._
 import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
 
+import kafka.admin.AdminUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 
@@ -51,6 +52,7 @@ class LogManager(val logDirs: Array[File],
                  val flushRecoveryOffsetCheckpointMs: Long,
                  val flushStartOffsetCheckpointMs: Long,
                  val retentionCheckMs: Long,
+                 val maxPidExpirationMs: Int,
                  scheduler: Scheduler,
                  val brokerState: BrokerState,
                  time: Time) extends Logging {
@@ -166,7 +168,14 @@ class LogManager(val logDirs: Array[File],
           val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
           val logStartOffset = logStartOffsets.getOrElse(topicPartition, 0L)
 
-          val current = new Log(logDir, config, logStartOffset, logRecoveryPoint, scheduler, time)
+          val current = new Log(
+            dir = logDir,
+            config = config,
+            logStartOffset = logStartOffset,
+            recoveryPoint = logRecoveryPoint,
+            maxPidExpirationMs = maxPidExpirationMs,
+            scheduler = scheduler,
+            time = time)
           if (logDir.getName.endsWith(Log.DeleteDirSuffix)) {
             this.logsToBeDeleted.add(current)
           } else {
@@ -401,7 +410,15 @@ class LogManager(val logDirs: Array[File],
         val dataDir = nextLogDir()
         val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
         dir.mkdirs()
-        val log = new Log(dir, config, logStartOffset = 0L, recoveryPoint = 0L, scheduler, time)
+
+        val log = new Log(
+          dir = dir,
+          config = config,
+          logStartOffset = 0L,
+          recoveryPoint = 0L,
+          maxPidExpirationMs = maxPidExpirationMs,
+          scheduler = scheduler,
+          time = time)
         logs.put(topicPartition, log)
         info("Created log for partition [%s,%d] in %s with properties {%s}."
           .format(topicPartition.topic,
@@ -493,7 +510,7 @@ class LogManager(val logDirs: Array[File],
       // count the number of logs in each parent directory (including 0 for empty directories
       val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
       val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
-      var dirCounts = (zeros ++ logCounts).toBuffer
+      val dirCounts = (zeros ++ logCounts).toBuffer
     
       // choose the directory with the least logs in it
       val leastLoaded = dirCounts.sortBy(_._2).head
@@ -556,3 +573,42 @@ class LogManager(val logDirs: Array[File],
     }
   }
 }
+
+object LogManager {
+  def apply(config: KafkaConfig,
+            zkUtils: ZkUtils,
+            brokerState: BrokerState,
+            kafkaScheduler: KafkaScheduler,
+            time: Time): LogManager = {
+    val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
+    val defaultLogConfig = LogConfig(defaultProps)
+
+    val topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils).map { case (topic, configs) =>
+      topic -> LogConfig.fromProps(defaultProps, configs)
+    }
+
+    // read the log configurations from zookeeper
+    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
+      dedupeBufferSize = config.logCleanerDedupeBufferSize,
+      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
+      ioBufferSize = config.logCleanerIoBufferSize,
+      maxMessageSize = config.messageMaxBytes,
+      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
+      backOffMs = config.logCleanerBackoffMs,
+      enableCleaner = config.logCleanerEnable)
+
+    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
+      topicConfigs = topicConfigs,
+      defaultConfig = defaultLogConfig,
+      cleanerConfig = cleanerConfig,
+      ioThreads = config.numRecoveryThreadsPerDataDir,
+      flushCheckMs = config.logFlushSchedulerIntervalMs,
+      flushRecoveryOffsetCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
+      flushStartOffsetCheckpointMs = config.logFlushStartOffsetCheckpointIntervalMs,
+      retentionCheckMs = config.logCleanupIntervalMs,
+      maxPidExpirationMs = config.transactionIdExpirationMs,
+      scheduler = kafkaScheduler,
+      brokerState = brokerState,
+      time = time)
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/bdf4cba0/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 94e3608..c01a5de 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -62,6 +62,7 @@ private[kafka] object LogValidator extends Logging {
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
           messageTimestampDiffMaxMs)
     } else {
+
       validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
         messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs)
     }
@@ -214,8 +215,15 @@ private[kafka] object LogValidator extends Logging {
       }
 
       if (!inPlaceAssignment) {
+        val (pid, epoch, sequence) = {
+          // note that we only reassign offsets for requests coming straight from a producer. For records with MagicV2,
+          // there should be exactly one RecordBatch per request, so the following is all we need to do. For Records
+          // with older magic versions, this will always be NO_PRODUCER_ID, etc.
+          val first = records.batches.asScala.head
+          (first.producerId, first.producerEpoch, first.baseSequence)
+        }
         buildRecordsAndAssignOffsets(messageFormatVersion, offsetCounter, messageTimestampType,
-          CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords)
+          CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence)
       } else {
         // we can update the batch only and write the compressed payload as is
         val batch = records.batches.iterator.next()
@@ -238,10 +246,12 @@ private[kafka] object LogValidator extends Logging {
 
   private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType,
                                            compressionType: CompressionType, logAppendTime: Long,
-                                           validatedRecords: Seq[Record]): ValidationAndOffsetAssignResult = {
+                                           validatedRecords: Seq[Record],
+                                           producerId: Long, epoch: Short, baseSequence: Int): ValidationAndOffsetAssignResult = {
     val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava)
     val buffer = ByteBuffer.allocate(estimatedSize)
-    val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value, logAppendTime)
+    val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
+      logAppendTime, producerId, epoch, baseSequence)
 
     validatedRecords.foreach { record =>
       builder.appendWithOffset(offsetCounter.getAndIncrement(), record)