You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/06 21:51:31 UTC

[3/3] kafka git commit: KIP-101: Alter Replication Protocol to use Leader Epoch rather than High Watermark for Truncation

KIP-101: Alter Replication Protocol to use Leader Epoch rather than High Watermark for Truncation

This PR replaces https://github.com/apache/kafka/pull/2743 (just raising from Confluent repo)

This PR describes the addition of Partition Level Leader Epochs to messages in Kafka as a mechanism for fixing some known issues in the replication protocol. Full details can be found here:

[KIP-101 Reference](https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation)

*The key elements are*:
- Epochs are stamped on messages as they enter the leader.
- Epochs are tracked in both leader and follower in a new checkpoint file.
- A new API allows followers to retrieve the leader's latest offset for a particular epoch.
- The logic for truncating the log, when a replica becomes a follower, has been moved from Partition into the ReplicaFetcherThread
- When partitions are added to the ReplicaFetcherThread they are added in an initialising state. Initialising partitions request leader epochs and then truncate their logs appropriately.

This test provides a good overview of the workflow `EpochDrivenReplicationProtocolAcceptanceTest.shouldFollowLeaderEpochBasicWorkflow()`

The corrupted log use case is covered by the test
`EpochDrivenReplicationProtocolAcceptanceTest.offsetsShouldNotGoBackwards()`

Remaining work: There is a do list here: https://docs.google.com/document/d/1edmMo70MfHEZH9x38OQfTWsHr7UGTvg-NOxeFhOeRew/edit?usp=sharing

Author: Ben Stopford <be...@gmail.com>
Author: Jun Rao <ju...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #2808 from benstopford/kip-101-v2


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0baea2ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0baea2ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0baea2ac

Branch: refs/heads/trunk
Commit: 0baea2ac13532981f3fea11e5dfc6da5aafaeaa8
Parents: b611cfa
Author: Ben Stopford <be...@gmail.com>
Authored: Thu Apr 6 14:51:09 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Apr 6 14:51:09 2017 -0700

----------------------------------------------------------------------
 checkstyle/suppressions.xml                     |   2 +
 .../apache/kafka/common/protocol/ApiKeys.java   |   3 +-
 .../apache/kafka/common/protocol/Protocol.java  |  52 ++
 .../kafka/common/record/MemoryRecords.java      |  29 +-
 .../kafka/common/requests/AbstractRequest.java  |   3 +
 .../kafka/common/requests/AbstractResponse.java |   2 +
 .../kafka/common/requests/EpochEndOffset.java   |  81 +++
 .../requests/OffsetsForLeaderEpochRequest.java  | 175 +++++
 .../requests/OffsetsForLeaderEpochResponse.java | 132 ++++
 .../common/requests/RequestResponseTest.java    |  23 +
 .../org/apache/kafka/test/MockDeserializer.java |  25 +-
 config/log4j.properties                         |   2 +-
 core/src/main/scala/kafka/api/ApiVersion.scala  |  15 +-
 .../main/scala/kafka/cluster/Partition.scala    |  11 +-
 core/src/main/scala/kafka/cluster/Replica.scala |   6 +-
 .../kafka/consumer/ConsumerFetcherThread.scala  |  12 +-
 core/src/main/scala/kafka/log/Log.scala         |  62 +-
 .../scala/kafka/log/LogCleanerManager.scala     |   4 +-
 core/src/main/scala/kafka/log/LogManager.scala  |  21 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |  11 +-
 .../src/main/scala/kafka/log/LogValidator.scala |  32 +-
 .../kafka/server/AbstractFetcherThread.scala    |  95 ++-
 .../src/main/scala/kafka/server/KafkaApis.scala |  12 +
 .../scala/kafka/server/OffsetCheckpoint.scala   | 124 ----
 .../server/ReplicaFetcherBlockingSend.scala     | 105 +++
 .../kafka/server/ReplicaFetcherThread.scala     | 198 ++---
 .../scala/kafka/server/ReplicaManager.scala     |  41 +-
 .../server/checkpoints/CheckpointFile.scala     | 114 +++
 .../checkpoints/LeaderEpochCheckpointFile.scala |  67 ++
 .../checkpoints/OffsetCheckpointFile.scala      |  60 ++
 .../server/epoch/LeaderEpochFileCache.scala     | 224 ++++++
 .../kafka/api/AuthorizerIntegrationTest.scala   |  16 +-
 .../kafka/api/EndToEndClusterIdTest.scala       |  11 +-
 .../kafka/api/ProducerCompressionTest.scala     |   5 +-
 .../kafka/log/LogCleanerIntegrationTest.scala   |   7 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |  12 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   |  25 +
 .../src/test/scala/unit/kafka/log/LogTest.scala | 225 +++++-
 .../server/AbstractFetcherThreadTest.scala      |  15 +-
 .../unit/kafka/server/ISRExpirationTest.scala   |   5 +
 .../unit/kafka/server/LogRecoveryTest.scala     |   5 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala | 402 +++++++++++
 .../kafka/server/ReplicaManagerQuotasTest.scala |   2 +-
 .../kafka/server/ReplicationQuotasTest.scala    |   2 +-
 .../unit/kafka/server/SimpleFetchTest.scala     |   3 +-
 .../LeaderEpochCheckpointFileTest.scala         |  72 ++
 .../checkpoints/OffsetCheckpointFileTest.scala  |  89 +++
 ...rivenReplicationProtocolAcceptanceTest.scala | 410 +++++++++++
 .../server/epoch/LeaderEpochFileCacheTest.scala | 721 +++++++++++++++++++
 .../epoch/LeaderEpochIntegrationTest.scala      | 283 ++++++++
 .../epoch/OffsetsForLeaderEpochTest.scala       |  98 +++
 .../util/ReplicaFetcherMockBlockingSend.scala   |  80 ++
 .../test/scala/unit/kafka/utils/TestUtils.scala |   3 +-
 .../core/zookeeper_security_upgrade_test.py     |   1 -
 54 files changed, 3908 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7bc55c8..39d61e2 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -232,5 +232,7 @@
 
     <suppress checks="NPathComplexity"
               files="KafkaLog4jAppender.java"/>
+    <suppress checks="JavaNCSS"
+              files="RequestResponseTest.java"/>
 
 </suppressions>

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 4183638..b65defb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -47,7 +47,8 @@ public enum ApiKeys {
     CREATE_TOPICS(19, "CreateTopics"),
     DELETE_TOPICS(20, "DeleteTopics"),
     DELETE_RECORDS(21, "DeleteRecords"),
-    INIT_PRODUCER_ID(22, "InitProducerId");
+    INIT_PRODUCER_ID(22, "InitProducerId"),
+    OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch");
 
     private static final ApiKeys[] ID_TO_TYPE;
     private static final int MIN_API_KEY = 0;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 37eb75c..cc228c5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1199,6 +1199,56 @@ public class Protocol {
 
     public static final Schema[] INIT_PRODUCER_ID_RESPONSE = new Schema[] {INIT_PRODUCER_ID_RESPONSE_V0};
 
+    /* Offsets for Leader Epoch api */
+    public static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0 = new Schema(
+            new Field("partition_id",
+                    INT32,
+                    "The partition_id"),
+            new Field("leader_epoch",
+                    INT32,
+                    "The epoch")
+    );
+    public static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0 = new Schema(
+            new Field("topic",
+                    STRING,
+                    "The topic"),
+            new Field("partitions",
+                    new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0),
+                    "The partition")
+            );
+    public static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new Schema(
+            new Field("topics",
+                    new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0),
+                    "An array of topics to get epochs for"));
+
+
+    public static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V0 = new Schema(
+            new Field("error_code",
+                    INT16,
+                    "The error code"),
+            new Field("partition_id",
+                    INT32,
+                    "The partition id"),
+            new Field("end_offset",
+                    INT64,
+                    "The end offset")
+    );
+    public static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0 = new Schema(
+            new Field("topic",
+                     STRING,
+                    "The topic"),
+            new Field("partitions",
+                    new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V0),
+                    "The partition")
+            );
+    public static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0 = new Schema(
+            new Field("topics",
+                    new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0),
+                    "An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));
+
+    public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = new Schema[] {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
+    public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = new Schema[] {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
+
     /* an array of all requests and responses with all schema versions; a null value in the inner array means that the
      * particular version is not supported */
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1232,6 +1282,7 @@ public class Protocol {
         REQUESTS[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_REQUEST;
         REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
         REQUESTS[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_REQUEST;
+        REQUESTS[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_REQUEST;
 
         RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
         RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1256,6 +1307,7 @@ public class Protocol {
         RESPONSES[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_RESPONSE;
         RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
         RESPONSES[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_RESPONSE;
+        RESPONSES[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_RESPONSE;
 
         /* set the minimum and maximum version of each api */
         for (ApiKeys api : ApiKeys.values()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index f810e39..b3beed5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -303,7 +303,18 @@ public class MemoryRecords extends AbstractRecords {
                                                long baseOffset,
                                                long logAppendTime) {
         return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
-                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE);
+                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               byte magic,
+                                               CompressionType compressionType,
+                                               TimestampType timestampType,
+                                               long baseOffset,
+                                               long logAppendTime,
+                                               int partitionLeaderEpoch) {
+        return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
+                RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch);
     }
 
     public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -315,8 +326,22 @@ public class MemoryRecords extends AbstractRecords {
                                                long pid,
                                                short epoch,
                                                int baseSequence) {
+        return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
+                pid, epoch, baseSequence, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+                                               byte magic,
+                                               CompressionType compressionType,
+                                               TimestampType timestampType,
+                                               long baseOffset,
+                                               long logAppendTime,
+                                               long pid,
+                                               short epoch,
+                                               int baseSequence,
+                                               int partitionLeaderEpoch) {
         return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
-                logAppendTime, pid, epoch, baseSequence, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
+                logAppendTime, pid, epoch, baseSequence, false, partitionLeaderEpoch,
                 buffer.remaining());
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 1638556..7ce3518 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -174,6 +174,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
             case INIT_PRODUCER_ID:
                 request = new InitPidRequest(struct, version);
                 break;
+            case OFFSET_FOR_LEADER_EPOCH:
+                request = new OffsetsForLeaderEpochRequest(struct, version);
+                break;
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 314aa42..1ae30d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -95,6 +95,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
                 return new DeleteRecordsResponse(struct);
             case INIT_PRODUCER_ID:
                 return new InitPidResponse(struct);
+            case OFFSET_FOR_LEADER_EPOCH:
+                return new OffsetsForLeaderEpochResponse(struct);
             default:
                 throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
                         "code should be updated to do so.", apiKey));

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
new file mode 100644
index 0000000..2d49149
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.Errors;
+
+import static org.apache.kafka.common.record.RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH;
+
+/**
+ * The offset, fetched from a leader, for a particular partition.
+ */
+
+public class EpochEndOffset {
+    public static final long UNDEFINED_EPOCH_OFFSET = UNKNOWN_PARTITION_LEADER_EPOCH;
+    public static final int UNDEFINED_EPOCH = -1;
+
+    private Errors error;
+    private long endOffset;
+
+    public EpochEndOffset(Errors error, long endOffset) {
+        this.error = error;
+        this.endOffset = endOffset;
+    }
+
+    public EpochEndOffset(long endOffset) {
+        this.error = Errors.NONE;
+        this.endOffset = endOffset;
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    public boolean hasError() {
+        return error != Errors.NONE;
+    }
+
+    public long endOffset() {
+        return endOffset;
+    }
+
+    @Override
+    public String toString() {
+        return "EpochEndOffset{" +
+                "error=" + error +
+                ", endOffset=" + endOffset +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+
+        EpochEndOffset that = (EpochEndOffset) o;
+
+        if (error != that.error) return false;
+        return endOffset == that.endOffset;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = (int) error.code();
+        result = 31 * result + (int) (endOffset ^ (endOffset >>> 32));
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
new file mode 100644
index 0000000..fed8910
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OffsetsForLeaderEpochRequest extends AbstractRequest {
+    public static final String TOPICS = "topics";
+    public static final String TOPIC = "topic";
+    public static final String PARTITIONS = "partitions";
+    public static final String PARTITION_ID = "partition_id";
+    public static final String LEADER_EPOCH = "leader_epoch";
+
+    private Map<TopicPartition, Integer> epochsByPartition;
+
+    public Map<TopicPartition, Integer> epochsByTopicPartition() {
+        return epochsByPartition;
+    }
+
+    public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
+        private Map<TopicPartition, Integer> epochsByPartition = new HashMap();
+
+        public Builder() {
+            super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
+        }
+
+        public Builder(Map<TopicPartition, Integer> epochsByPartition) {
+            super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
+            this.epochsByPartition = epochsByPartition;
+        }
+
+        public Builder add(TopicPartition topicPartition, Integer epoch) {
+            epochsByPartition.put(topicPartition, epoch);
+            return this;
+        }
+
+        @Override
+        public OffsetsForLeaderEpochRequest build(short version) {
+            return new OffsetsForLeaderEpochRequest(epochsByPartition, version);
+        }
+
+        public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, short version) {
+            return new OffsetsForLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.parseRequest(version, buffer), version);
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder bld = new StringBuilder();
+            bld.append("(type=OffsetForLeaderEpochRequest, ").
+                    append("epochsByTopic=").append(epochsByPartition).
+                    append(")");
+            return bld.toString();
+        }
+    }
+
+    public OffsetsForLeaderEpochRequest(Map<TopicPartition, Integer> epochsByPartition, short version) {
+        super(version);
+        this.epochsByPartition = epochsByPartition;
+    }
+
+    public OffsetsForLeaderEpochRequest(Struct struct, short version) {
+        super(version);
+        epochsByPartition = new HashMap<>();
+        for (Object t : struct.getArray(TOPICS)) {
+            Struct topicAndEpochs = (Struct) t;
+            String topic = topicAndEpochs.getString(TOPIC);
+            for (Object e : topicAndEpochs.getArray(PARTITIONS)) {
+                Struct partitionAndEpoch = (Struct) e;
+                int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
+                int epoch = partitionAndEpoch.getInt(LEADER_EPOCH);
+                TopicPartition tp = new TopicPartition(topic, partitionId);
+                epochsByPartition.put(tp, epoch);
+            }
+        }
+    }
+
+    public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, short versionId) {
+        return new OffsetsForLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.parseRequest(versionId, buffer), versionId);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
+
+        //Group by topic
+        Map<String, List<PartitionLeaderEpoch>> topicsToPartitionEpochs = new HashMap<>();
+        for (TopicPartition tp : epochsByPartition.keySet()) {
+            List<PartitionLeaderEpoch> partitionEndOffsets = topicsToPartitionEpochs.get(tp.topic());
+            if (partitionEndOffsets == null)
+                partitionEndOffsets = new ArrayList<>();
+            partitionEndOffsets.add(new PartitionLeaderEpoch(tp.partition(), epochsByPartition.get(tp)));
+            topicsToPartitionEpochs.put(tp.topic(), partitionEndOffsets);
+        }
+
+        List<Struct> topics = new ArrayList<>();
+        for (Map.Entry<String, List<PartitionLeaderEpoch>> topicEpochs : topicsToPartitionEpochs.entrySet()) {
+            Struct partition = struct.instance(TOPICS);
+            String topic = topicEpochs.getKey();
+            partition.set(TOPIC, topic);
+            List<PartitionLeaderEpoch> partitionLeaderEpoches = topicEpochs.getValue();
+            List<Struct> partitions = new ArrayList<>(partitionLeaderEpoches.size());
+            for (PartitionLeaderEpoch partitionLeaderEpoch : partitionLeaderEpoches) {
+                Struct partitionRow = partition.instance(PARTITIONS);
+                partitionRow.set(PARTITION_ID, partitionLeaderEpoch.partitionId);
+                partitionRow.set(LEADER_EPOCH, partitionLeaderEpoch.epoch);
+                partitions.add(partitionRow);
+            }
+            partition.set(PARTITIONS, partitions.toArray());
+            topics.add(partition);
+        }
+        struct.set(TOPICS, topics.toArray());
+        return struct;
+    }
+
+    @Override
+    public AbstractResponse getErrorResponse(Throwable e) {
+        Errors error = Errors.forException(e);
+        Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap();
+        for (TopicPartition tp : epochsByPartition.keySet()) {
+            errorResponse.put(tp, new EpochEndOffset(error, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
+        }
+        return new OffsetsForLeaderEpochResponse(errorResponse);
+    }
+
+    private class PartitionLeaderEpoch {
+        int partitionId;
+        int epoch;
+
+        public PartitionLeaderEpoch(int partitionId, int epoch) {
+            this.partitionId = partitionId;
+            this.epoch = epoch;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            PartitionLeaderEpoch other = (PartitionLeaderEpoch) o;
+
+            if (partitionId != other.partitionId) return false;
+            return epoch == other.epoch;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = partitionId;
+            result = 31 * result + epoch;
+            return result;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
new file mode 100644
index 0000000..03f3069
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OffsetsForLeaderEpochResponse extends AbstractResponse {
+    public static final String TOPICS = "topics";
+    public static final String TOPIC = "topic";
+    public static final String PARTITIONS = "partitions";
+    public static final String ERROR_CODE = "error_code";
+    public static final String PARTITION_ID = "partition_id";
+    public static final String END_OFFSET = "end_offset";
+
+    private Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
+
+    public OffsetsForLeaderEpochResponse(Struct struct) {
+        epochEndOffsetsByPartition = new HashMap<>();
+        for (Object t : struct.getArray(TOPICS)) {
+            Struct topicAndEpochs = (Struct) t;
+            String topic = topicAndEpochs.getString(TOPIC);
+            for (Object e : topicAndEpochs.getArray(PARTITIONS)) {
+                Struct partitionAndEpoch = (Struct) e;
+                Errors error = Errors.forCode(partitionAndEpoch.getShort(ERROR_CODE));
+                int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
+                TopicPartition tp = new TopicPartition(topic, partitionId);
+                long endOffset = partitionAndEpoch.getLong(END_OFFSET);
+                epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, endOffset));
+            }
+        }
+    }
+
+    public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> epochsByTopic) {
+        this.epochEndOffsetsByPartition = epochsByTopic;
+    }
+
+    public Map<TopicPartition, EpochEndOffset> responses() {
+        return epochEndOffsetsByPartition;
+    }
+
+    public static OffsetsForLeaderEpochResponse parse(ByteBuffer buffer, short versionId) {
+        return new OffsetsForLeaderEpochResponse(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(versionId).read(buffer));
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(version));
+
+        //Group by topic
+        Map<String, List<PartitionEndOffset>> topicsToPartitionEndOffsets = new HashMap<>();
+        for (TopicPartition tp : epochEndOffsetsByPartition.keySet()) {
+            List<PartitionEndOffset> partitionEndOffsets = topicsToPartitionEndOffsets.get(tp.topic());
+            if (partitionEndOffsets == null)
+                partitionEndOffsets = new ArrayList<>();
+            partitionEndOffsets.add(new PartitionEndOffset(tp.partition(), epochEndOffsetsByPartition.get(tp)));
+            topicsToPartitionEndOffsets.put(tp.topic(), partitionEndOffsets);
+        }
+
+        //Write struct
+        List<Struct> topics = new ArrayList<>(topicsToPartitionEndOffsets.size());
+        for (Map.Entry<String, List<PartitionEndOffset>> topicEpochs : topicsToPartitionEndOffsets.entrySet()) {
+            Struct partition = struct.instance(TOPICS);
+            String topic = topicEpochs.getKey();
+            partition.set(TOPIC, topic);
+            List<PartitionEndOffset> paritionEpochs = topicEpochs.getValue();
+            List<Struct> paritions = new ArrayList<>(paritionEpochs.size());
+            for (PartitionEndOffset peo : paritionEpochs) {
+                Struct partitionRow = partition.instance(PARTITIONS);
+                partitionRow.set(ERROR_CODE, peo.epochEndOffset.error().code());
+                partitionRow.set(PARTITION_ID, peo.partition);
+                partitionRow.set(END_OFFSET, peo.epochEndOffset.endOffset());
+                paritions.add(partitionRow);
+            }
+
+            partition.set(PARTITIONS, paritions.toArray());
+            topics.add(partition);
+        }
+        struct.set(TOPICS, topics.toArray());
+        return struct;
+    }
+
+    private class PartitionEndOffset {
+        private int partition;
+        private EpochEndOffset epochEndOffset;
+
+        PartitionEndOffset(int partition, EpochEndOffset epochEndOffset) {
+            this.partition = partition;
+            this.epochEndOffset = epochEndOffset;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            PartitionEndOffset that = (PartitionEndOffset) o;
+
+            if (partition != that.partition) return false;
+            return epochEndOffset != null ? epochEndOffset.equals(that.epochEndOffset) : that.epochEndOffset == null;
+        }
+
+        @Override
+        public int hashCode() {
+            int result = partition;
+            result = 31 * result + (epochEndOffset != null ? epochEndOffset.hashCode() : 0);
+            return result;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8a7633e..2995882 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -163,6 +163,9 @@ public class RequestResponseTest {
         checkRequest(createListOffsetRequest(0));
         checkErrorResponse(createListOffsetRequest(0), new UnknownServerException());
         checkResponse(createListOffsetResponse(0), 0);
+        checkRequest(createLeaderEpochRequest());
+        checkResponse(createLeaderEpochResponse(), 0);
+        checkErrorResponse(createLeaderEpochRequest(), new UnknownServerException());
     }
 
     @Test
@@ -798,6 +801,26 @@ public class RequestResponseTest {
         return new InitPidResponse(Errors.NONE, 3332, (short) 3);
     }
 
+    private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
+        Map<TopicPartition, Integer> epochs = new HashMap<>();
+
+        epochs.put(new TopicPartition("topic1", 0), 1);
+        epochs.put(new TopicPartition("topic1", 1), 1);
+        epochs.put(new TopicPartition("topic2", 2), 3);
+
+        return new OffsetsForLeaderEpochRequest.Builder(epochs).build();
+    }
+
+    private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
+        Map<TopicPartition, EpochEndOffset> epochs = new HashMap<>();
+
+        epochs.put(new TopicPartition("topic1", 0), new EpochEndOffset(Errors.NONE, 0));
+        epochs.put(new TopicPartition("topic1", 1), new EpochEndOffset(Errors.NONE, 1));
+        epochs.put(new TopicPartition("topic2", 2), new EpochEndOffset(Errors.NONE, 2));
+
+        return new OffsetsForLeaderEpochResponse(epochs);
+    }
+
     private static class ByteBufferChannel implements GatheringByteChannel {
         private final ByteBuffer buf;
         private boolean closed = false;

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
index 9704206..99551f7 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
@@ -25,14 +25,21 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class MockDeserializer implements ClusterResourceListener, Deserializer<byte[]> {
-    public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
-    public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
-    public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
-    public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
-    public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_DESERIALIZE = new AtomicReference<>(NO_CLUSTER_ID);
+    public static AtomicInteger initCount = new AtomicInteger(0);
+    public static AtomicInteger closeCount = new AtomicInteger(0);
+    public static AtomicReference<ClusterResource> clusterMeta = new AtomicReference<>();
+    public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
+    public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
+
+    public static void resetStaticVariables() {
+        initCount = new AtomicInteger(0);
+        closeCount = new AtomicInteger(0);
+        clusterMeta = new AtomicReference<>();
+        clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
+    }
 
     public MockDeserializer() {
-        INIT_COUNT.incrementAndGet();
+        initCount.incrementAndGet();
     }
 
     @Override
@@ -43,17 +50,17 @@ public class MockDeserializer implements ClusterResourceListener, Deserializer<b
     public byte[] deserialize(String topic, byte[] data) {
         // This will ensure that we get the cluster metadata when deserialize is called for the first time
         // as subsequent compareAndSet operations will fail.
-        CLUSTER_ID_BEFORE_DESERIALIZE.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());
+        clusterIdBeforeDeserialize.compareAndSet(noClusterId, clusterMeta.get());
         return data;
     }
 
     @Override
     public void close() {
-        CLOSE_COUNT.incrementAndGet();
+        closeCount.incrementAndGet();
     }
 
     @Override
     public void onUpdate(ClusterResource clusterResource) {
-        CLUSTER_META.set(clusterResource);
+        clusterMeta.set(clusterResource);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index bf816e7..bdee182 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=INFO, stdout 
+log4j.rootLogger=INFO, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 6101d2a..051c5d6 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -67,7 +67,10 @@ object ApiVersion {
     "0.11.0-IV0" -> KAFKA_0_11_0_IV0,
     // introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
     "0.11.0-IV1" -> KAFKA_0_11_0_IV1,
-    "0.11.0" -> KAFKA_0_11_0_IV1
+    "0.11.0" -> KAFKA_0_11_0_IV1,
+    // Introduce leader epoch fetches to the replica fetcher via KIP-101
+    "0.11.0-IV2" -> KAFKA_0_11_0_IV2,
+    "0.11.0" -> KAFKA_0_11_0_IV2
   )
 
   private val versionPattern = "\\.".r
@@ -159,7 +162,13 @@ case object KAFKA_0_11_0_IV0 extends ApiVersion {
 }
 
 case object KAFKA_0_11_0_IV1 extends ApiVersion {
-  val version: String = "0.11.0-IV0"
+  val version: String = "0.11.0-IV1"
   val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
-  val id: Int = 10
+  val id: Int = 11
+}
+
+case object KAFKA_0_11_0_IV2 extends ApiVersion {
+  val version: String = "0.11.0-IV2"
+  val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+  val id: Int = 12
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ddb2411..4dd96c3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -163,12 +163,21 @@ class Partition(val topic: String,
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = partitionStateInfo.controllerEpoch
       // add replicas that are new
-      allReplicas.foreach(replica => getOrCreateReplica(replica))
       val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
       // remove assigned replicas that have been removed by the controller
       (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
       inSyncReplicas = newInSyncReplicas
+
+      info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.leaderEpoch} from offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
+
+      //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
       leaderEpoch = partitionStateInfo.leaderEpoch
+      allReplicas.map(id => getOrCreateReplica(id))
+        .filter(_.isLocal)
+        .foreach { replica =>
+          replica.epochs.get.cacheLatestEpoch(leaderEpoch)
+        }
+
       zkVersion = partitionStateInfo.zkVersion
       val isNewLeader =
         if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 3995f9e..a604b87 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -22,8 +22,8 @@ import kafka.utils.Logging
 import kafka.server.{LogOffsetMetadata, LogReadResult}
 import kafka.common.KafkaException
 import org.apache.kafka.common.errors.OffsetOutOfRangeException
-
-
+import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
+import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
 import org.apache.kafka.common.utils.Time
 
 class Replica(val brokerId: Int,
@@ -58,6 +58,8 @@ class Replica(val brokerId: Int,
 
   def lastCaughtUpTimeMs = _lastCaughtUpTimeMs
 
+  val epochs = log.map(_.leaderEpochCache)
+
   /*
    * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
    * set `lastCaughtUpTimeMs` to the time when the current fetch request was received.

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index c4b7ce6..8d712f4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -28,6 +28,7 @@ import ConsumerFetcherThread._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.EpochEndOffset
 
 class ConsumerFetcherThread(name: String,
                             val config: ConsumerConfig,
@@ -38,7 +39,8 @@ class ConsumerFetcherThread(name: String,
                                       clientId = config.clientId,
                                       sourceBroker = sourceBroker,
                                       fetchBackOffMs = config.refreshLeaderBackoffMs,
-                                      isInterruptible = true) {
+                                      isInterruptible = true,
+                                      includeLogTruncation = false) {
 
   type REQ = FetchRequest
   type PD = PartitionData
@@ -100,7 +102,7 @@ class ConsumerFetcherThread(name: String,
 
   protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
     partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
-      if (partitionFetchState.isActive)
+      if (partitionFetchState.isReadyForFetch)
         fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.fetchOffset, fetchSize)
     }
 
@@ -111,6 +113,12 @@ class ConsumerFetcherThread(name: String,
     simpleConsumer.fetch(fetchRequest.underlying).data.map { case (TopicAndPartition(t, p), value) =>
       new TopicPartition(t, p) -> new PartitionData(value)
     }
+
+  override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = { Map() }
+
+  override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }
+
+  override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() }
 }
 
 object ConsumerFetcherThread {

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a052a9e..999c6aa 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -37,6 +37,12 @@ import org.apache.kafka.common.utils.{Time, Utils}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Seq, mutable}
+import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.utils.{Time, Utils}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
+import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
+import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import org.apache.kafka.common.TopicPartition
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
@@ -136,15 +142,24 @@ class Log(@volatile var dir: File,
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
 
+  val leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
+
   locally {
     val startMs = time.milliseconds
 
     loadSegments()
+
     /* Calculate the offset of the next message */
     nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
       activeSegment.size.toInt)
 
+    leaderEpochCache.clearLatest(nextOffsetMetadata.messageOffset)
+
     logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+
+    // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
+    leaderEpochCache.clearEarliest(logStartOffset)
+
     buildAndRecoverPidMap(logEndOffset)
 
     info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
@@ -193,10 +208,15 @@ class Log(@volatile var dir: File,
   /** The name of this log */
   def name  = dir.getName()
 
-  /* Load the log segments from the log files on disk */
-  private def loadSegments() {
+  private def initializeLeaderEpochCache(): LeaderEpochCache = {
     // create the log directory if it doesn't exist
     dir.mkdirs()
+    new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
+      new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir)))
+  }
+
+  /* Load the log segments from the log files on disk */
+  private def loadSegments() {
     var swapFiles = Set[File]()
 
     // first do a pass through the files in the log directory and remove any temporary files
@@ -341,7 +361,7 @@ class Log(@volatile var dir: File,
       info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
       val truncatedBytes =
         try {
-          curr.recover(config.maxMessageSize)
+          curr.recover(config.maxMessageSize, Some(leaderEpochCache))
         } catch {
           case _: InvalidOffsetException =>
             val startOffset = curr.baseOffset
@@ -352,7 +372,7 @@ class Log(@volatile var dir: File,
       if(truncatedBytes > 0) {
         // we had an invalid message, delete all remaining log
         warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
-        unflushed.foreach(deleteSegment)
+        unflushed.foreach(deleteSegment(_))
       }
     }
   }
@@ -427,11 +447,11 @@ class Log(@volatile var dir: File,
    *
    * @param records The log records to append
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
+   * @param leaderEpochCache Optional cache of Leader Epoch Offsets.
    * @throws KafkaStorageException If the append fails due to an I/O error.
    * @return Information about the appended messages including the first and last offset.
    */
-  def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
-
+  def append(records: MemoryRecords, assignOffsets: Boolean = true, leaderEpochCache: LeaderEpochCache = leaderEpochCache): LogAppendInfo = {
     val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets)
 
     // return if we have no valid messages or if this is a duplicate of the last appended entry
@@ -451,6 +471,7 @@ class Log(@volatile var dir: File,
           appendInfo.firstOffset = offset.value
           val now = time.milliseconds
           val validateAndOffsetAssignResult = try {
+            leaderEpochCache.maybeAssignLatestCachedEpochToLeo()
             LogValidator.validateMessagesAndAssignOffsets(validRecords,
                                                           offset,
                                                           now,
@@ -459,7 +480,8 @@ class Log(@volatile var dir: File,
                                                           config.compact,
                                                           config.messageFormatVersion.messageFormatVersion,
                                                           config.messageTimestampType,
-                                                          config.messageTimestampDifferenceMaxMs)
+                                                          config.messageTimestampDifferenceMaxMs,
+                                                          leaderEpochCache.latestUsedEpoch())
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
@@ -485,6 +507,12 @@ class Log(@volatile var dir: File,
             }
           }
         } else {
+          //Update the epoch cache with the epoch stamped by the leader
+          records.batches().asScala.map { batch =>
+            if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+              leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset())
+          }
+
           // we are taking the offsets we are given
           if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
             throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
@@ -808,9 +836,12 @@ class Log(@volatile var dir: File,
       // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
       if (segments.size == numToDelete)
         roll()
-      // remove the segments for lookups
-      deletable.foreach(deleteSegment)
-      logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+      lock synchronized {
+        // remove the segments for lookups
+        deletable.foreach(deleteSegment(_))
+        logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+        leaderEpochCache.clearEarliest(logStartOffset)
+      }
     }
     numToDelete
   }
@@ -1017,6 +1048,7 @@ class Log(@volatile var dir: File,
     lock synchronized {
       logSegments.foreach(_.delete())
       segments.clear()
+      leaderEpochCache.clear()
       Utils.delete(dir)
     }
   }
@@ -1027,23 +1059,24 @@ class Log(@volatile var dir: File,
    * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
    */
   private[log] def truncateTo(targetOffset: Long) {
-    info("Truncating log %s to offset %d.".format(name, targetOffset))
     if(targetOffset < 0)
       throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
-    if(targetOffset > logEndOffset) {
+    if(targetOffset >= logEndOffset) {
       info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1))
       return
     }
+    info("Truncating log %s to offset %d.".format(name, targetOffset))
     lock synchronized {
       if(segments.firstEntry.getValue.baseOffset > targetOffset) {
         truncateFullyAndStartAt(targetOffset)
       } else {
         val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
-        deletable.foreach(deleteSegment)
+        deletable.foreach(deleteSegment(_))
         activeSegment.truncateTo(targetOffset)
         updateLogEndOffset(targetOffset)
         this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
         this.logStartOffset = math.min(targetOffset, this.logStartOffset)
+        leaderEpochCache.clearLatest(targetOffset)
       }
       buildAndRecoverPidMap(targetOffset)
     }
@@ -1058,7 +1091,7 @@ class Log(@volatile var dir: File,
     debug("Truncate and start log '" + name + "' to " + newOffset)
     lock synchronized {
       val segmentsToDelete = logSegments.toList
-      segmentsToDelete.foreach(deleteSegment)
+      segmentsToDelete.foreach(deleteSegment(_))
       addSegment(new LogSegment(dir,
                                 newOffset,
                                 indexIntervalBytes = config.indexInterval,
@@ -1069,6 +1102,7 @@ class Log(@volatile var dir: File,
                                 initFileSize = initFileSize,
                                 preallocate = config.preallocate))
       updateLogEndOffset(newOffset)
+      leaderEpochCache.clear()
       this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
       this.logStartOffset = newOffset
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 01c4df4..2b4d956 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
 import com.yammer.metrics.core.Gauge
 import kafka.common.LogCleaningAbortedException
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.OffsetCheckpoint
+import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
 import kafka.utils.CoreUtils._
 import kafka.utils.{Logging, Pool}
 import org.apache.kafka.common.TopicPartition
@@ -55,7 +55,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
   private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
   
   /* the offset checkpoints holding the last cleaned point for each log */
-  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
+  private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile)))).toMap
 
   /* the set of logs currently being cleaned */
   private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]()

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ec164e2..469c46b 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -18,20 +18,19 @@
 package kafka.log
 
 import java.io._
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-import kafka.utils._
-
-import scala.collection._
-import scala.collection.JavaConverters._
-import kafka.common.{KafkaException, KafkaStorageException}
-import kafka.server._
-import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
+import java.util.concurrent._
 
 import kafka.admin.AdminUtils
+import kafka.common.{KafkaException, KafkaStorageException}
+import kafka.server.checkpoints.OffsetCheckpointFile
+import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
+import kafka.utils._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Time
 
+import scala.collection.JavaConverters._
+import scala.collection._
+
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
  * All read and write operations are delegated to the individual log instances.
@@ -67,8 +66,8 @@ class LogManager(val logDirs: Array[File],
 
   createAndValidateLogDirs(logDirs)
   private val dirLocks = lockLogDirs(logDirs)
-  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
-  private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, LogStartOffsetCheckpointFile)))).toMap
+  private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile)))).toMap
+  private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile)))).toMap
   loadLogs()
 
   // public, so we can access this from kafka.admin.DeleteTopicTest

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4e77625..b77be34 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
 
 import kafka.common._
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
+import kafka.server.epoch.LeaderEpochCache
 import kafka.server.{FetchDataInfo, LogOffsetMetadata}
 import kafka.utils._
 import org.apache.kafka.common.errors.CorruptRecordException
@@ -213,10 +214,11 @@ class LogSegment(val log: FileRecords,
    *
    * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
    * is corrupt.
+    * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
    * @return The number of bytes truncated from the log
    */
   @nonthreadsafe
-  def recover(maxMessageSize: Int): Int = {
+  def recover(maxMessageSize: Int, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
     index.truncate()
     index.resize(index.maxIndexSize)
     timeIndex.truncate()
@@ -242,6 +244,13 @@ class LogSegment(val log: FileRecords,
           lastIndexEntry = validBytes
         }
         validBytes += batch.sizeInBytes()
+
+        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+          leaderEpochCache.foreach { cache =>
+            if (batch.partitionLeaderEpoch > cache.latestUsedEpoch()) // this is to avoid unnecessary warning in cache.assign()
+              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset())
+          }
+        }
       }
     } catch {
       case e: CorruptRecordException =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index c01a5de..fa520ad 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -51,20 +51,21 @@ private[kafka] object LogValidator extends Logging {
                                                       compactedTopic: Boolean = false,
                                                       messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
                                                       messageTimestampType: TimestampType,
-                                                      messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+                                                      messageTimestampDiffMaxMs: Long,
+                                                      partitionLeaderEpoch: Int = RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
       if (!records.hasMatchingMagic(messageFormatVersion))
         convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, messageTimestampType,
-          messageTimestampDiffMaxMs, messageFormatVersion)
+          messageTimestampDiffMaxMs, messageFormatVersion, partitionLeaderEpoch)
       else
         // Do in-place validation, offset assignment and maybe set timestamp
         assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
-          messageTimestampDiffMaxMs)
+          messageTimestampDiffMaxMs, partitionLeaderEpoch)
     } else {
 
       validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
-        messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs)
+        messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs, partitionLeaderEpoch)
     }
   }
 
@@ -74,7 +75,8 @@ private[kafka] object LogValidator extends Logging {
                                                    now: Long,
                                                    timestampType: TimestampType,
                                                    messageTimestampDiffMaxMs: Long,
-                                                   toMagicValue: Byte): ValidationAndOffsetAssignResult = {
+                                                   toMagicValue: Byte,
+                                                   partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
     val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
       CompressionType.NONE, records.records)
 
@@ -85,7 +87,7 @@ private[kafka] object LogValidator extends Logging {
 
     val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
     val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
-      offsetCounter.value, now, pid, epoch, sequence)
+      offsetCounter.value, now, pid, epoch, sequence, partitionLeaderEpoch)
 
     for (batch <- records.batches.asScala) {
       ensureNonTransactional(batch)
@@ -112,7 +114,8 @@ private[kafka] object LogValidator extends Logging {
                                          currentTimestamp: Long,
                                          compactedTopic: Boolean,
                                          timestampType: TimestampType,
-                                         timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+                                         timestampDiffMaxMs: Long,
+                                         partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
     var maxTimestamp = RecordBatch.NO_TIMESTAMP
     var offsetOfMaxTimestamp = -1L
     val initialOffset = offsetCounter.value
@@ -138,6 +141,9 @@ private[kafka] object LogValidator extends Logging {
 
       batch.setLastOffset(offsetCounter.value - 1)
 
+      if(batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+        batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+
       // TODO: in the compressed path, we ensure that the batch max timestamp is correct.
       //       We should either do the same or (better) let those two paths converge.
       if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.LOG_APPEND_TIME)
@@ -171,7 +177,8 @@ private[kafka] object LogValidator extends Logging {
                                                  compactedTopic: Boolean = false,
                                                  messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
                                                  messageTimestampType: TimestampType,
-                                                 messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+                                                 messageTimestampDiffMaxMs: Long,
+                                                 partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
 
       // No in place assignment situation 1 and 2
       var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > RecordBatch.MAGIC_VALUE_V0
@@ -223,7 +230,7 @@ private[kafka] object LogValidator extends Logging {
           (first.producerId, first.producerEpoch, first.baseSequence)
         }
         buildRecordsAndAssignOffsets(messageFormatVersion, offsetCounter, messageTimestampType,
-          CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence)
+          CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence, partitionLeaderEpoch)
       } else {
         // we can update the batch only and write the compressed payload as is
         val batch = records.batches.iterator.next()
@@ -237,6 +244,9 @@ private[kafka] object LogValidator extends Logging {
         if (messageFormatVersion >= RecordBatch.MAGIC_VALUE_V1)
           batch.setMaxTimestamp(messageTimestampType, maxTimestamp)
 
+        if(messageFormatVersion >= RecordBatch.MAGIC_VALUE_V2)
+          batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+
         ValidationAndOffsetAssignResult(validatedRecords = records,
           maxTimestamp = maxTimestamp,
           shallowOffsetOfMaxTimestamp = lastOffset,
@@ -247,11 +257,11 @@ private[kafka] object LogValidator extends Logging {
   private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType,
                                            compressionType: CompressionType, logAppendTime: Long,
                                            validatedRecords: Seq[Record],
-                                           producerId: Long, epoch: Short, baseSequence: Int): ValidationAndOffsetAssignResult = {
+                                           producerId: Long, epoch: Short, baseSequence: Int, partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
     val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava)
     val buffer = ByteBuffer.allocate(estimatedSize)
     val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
-      logAppendTime, producerId, epoch, baseSequence)
+      logAppendTime, producerId, epoch, baseSequence, partitionLeaderEpoch)
 
     validatedRecords.foreach { record =>
       builder.appendWithOffset(offsetCounter.getAndIncrement(), record)

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 14e56bd..734c006 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -38,6 +38,7 @@ import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
 import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.EpochEndOffset
 
 /**
  *  Abstract class for fetching data from multiple partitions from the same broker.
@@ -46,13 +47,15 @@ abstract class AbstractFetcherThread(name: String,
                                      clientId: String,
                                      val sourceBroker: BrokerEndPoint,
                                      fetchBackOffMs: Int = 0,
-                                     isInterruptible: Boolean = true)
+                                     isInterruptible: Boolean = true,
+                                     includeLogTruncation: Boolean
+                                    )
   extends ShutdownableThread(name, isInterruptible) {
 
   type REQ <: FetchRequest
   type PD <: PartitionData
 
-  private val partitionStates = new PartitionStates[PartitionFetchState]
+  private[server] val partitionStates = new PartitionStates[PartitionFetchState]
   private val partitionMapLock = new ReentrantLock
   private val partitionMapCond = partitionMapLock.newCondition()
 
@@ -71,6 +74,12 @@ abstract class AbstractFetcherThread(name: String,
   // deal with partitions with errors, potentially due to leadership changes
   protected def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
 
+  protected def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int]
+
+  protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset]
+
+  protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long]
+
   protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): REQ
 
   protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)]
@@ -87,12 +96,12 @@ abstract class AbstractFetcherThread(name: String,
     fetcherLagStats.unregister()
   }
 
-  override def doWork() {
+  private def states() = partitionStates.partitionStates.asScala.map { state => state.topicPartition -> state.value }
 
+  override def doWork() {
+    maybeTruncate()
     val fetchRequest = inLock(partitionMapLock) {
-      val fetchRequest = buildFetchRequest(partitionStates.partitionStates.asScala.map { state =>
-        state.topicPartition -> state.value
-      })
+      val fetchRequest = buildFetchRequest(states)
       if (fetchRequest.isEmpty) {
         trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
         partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
@@ -103,6 +112,30 @@ abstract class AbstractFetcherThread(name: String,
       processFetchRequest(fetchRequest)
   }
 
+  /**
+    * - Build a leader epoch fetch based on partitions that are in the Truncating phase
+    * - Issue LeaderEpochRequeust, retrieving the latest offset for each partition's
+    *   leader epoch. This is the offset the follower should truncate to ensure
+    *   accurate log replication.
+    * - Finally truncate the logs for partitions in the truncating phase and mark them
+    *   truncation complete. Do this within a lock to ensure no leadership changes can
+    *   occur during truncation.
+    */
+  def maybeTruncate(): Unit = {
+    val epochRequests = inLock(partitionMapLock) { buildLeaderEpochRequest(states) }
+
+    if (!epochRequests.isEmpty) {
+      val fetchedEpochs = fetchEpochsFromLeader(epochRequests)
+      //Ensure we hold a lock during truncation.
+      inLock(partitionMapLock) {
+        //Check no leadership changes happened whilst we were unlocked, fetching epochs
+        val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }
+        val truncationPoints = maybeTruncate(leaderEpochs)
+        markTruncationComplete(truncationPoints)
+      }
+    }
+  }
+
   private def processFetchRequest(fetchRequest: REQ) {
     val partitionsWithError = mutable.Set[TopicPartition]()
 
@@ -208,25 +241,41 @@ abstract class AbstractFetcherThread(name: String,
         !partitionStates.contains(tp)
       }.map { case (tp, offset) =>
         val fetchState =
-          if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(tp))
-          else new PartitionFetchState(offset)
+          if (PartitionTopicInfo.isOffsetInvalid(offset))
+            new PartitionFetchState(handleOffsetOutOfRange(tp), includeLogTruncation)
+          else
+            new PartitionFetchState(offset, includeLogTruncation)
         tp -> fetchState
       }
-      val existingPartitionToState = partitionStates.partitionStates.asScala.map { state =>
-        state.topicPartition -> state.value
-      }.toMap
+      val existingPartitionToState = states().toMap
       partitionStates.set((existingPartitionToState ++ newPartitionToState).asJava)
       partitionMapCond.signalAll()
     } finally partitionMapLock.unlock()
   }
 
+  /**
+    * Loop through all partitions, marking them as truncation complete and applying the correct offset
+    * @param partitions the partitions to mark truncation complete
+    */
+  private def markTruncationComplete(partitions: Map[TopicPartition, Long]) {
+    val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala
+      .map { state =>
+        val maybeTruncationComplete = partitions.get(state.topicPartition()) match {
+          case Some(offset) => new PartitionFetchState(offset, state.value.delay, truncatingLog = false)
+          case None => state.value()
+        }
+        (state.topicPartition(), maybeTruncationComplete)
+      }.toMap
+    partitionStates.set(newStates.asJava)
+  }
+
   def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) {
     partitionMapLock.lockInterruptibly()
     try {
       for (partition <- partitions) {
         Option(partitionStates.stateValue(partition)).foreach (currentPartitionFetchState =>
-          if (currentPartitionFetchState.isActive)
-            partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay)))
+          if (!currentPartitionFetchState.isDelayed)
+            partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay), currentPartitionFetchState.truncatingLog))
         )
       }
       partitionMapCond.signalAll()
@@ -348,13 +397,25 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId:
 }
 
 /**
-  * case class to keep partition offset and its state(active, inactive)
+  * case class to keep partition offset and its state(truncatingLog, delayed)
+  * This represents a partition as being either:
+  * (1) Truncating its log, for example having recently become a follower
+  * (2) Delayed, for example due to an error, where we subsequently back off a bit
+  * (3) ReadyForFetch, the is the active state where the thread is actively fetching data.
   */
-case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem) {
+case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem, truncatingLog: Boolean = false) {
+
+  def this(offset: Long, truncatingLog: Boolean) = this(offset, new DelayedItem(0), truncatingLog)
+
+  def this(offset: Long, delay: DelayedItem) = this(offset, new DelayedItem(0), false)
 
   def this(fetchOffset: Long) = this(fetchOffset, new DelayedItem(0))
 
-  def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
+  def isReadyForFetch: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0 && !truncatingLog
+
+  def isTruncatingLog: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0 && truncatingLog
+
+  def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
 
-  override def toString = "%d-%b".format(fetchOffset, isActive)
+  override def toString = "offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, isReadyForFetch, truncatingLog)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 600b84d..1e8900b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -103,6 +103,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
         case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
         case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
+        case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
         case requestId => throw new KafkaException("Unknown api code " + requestId)
       }
     } catch {
@@ -1322,6 +1323,17 @@ class KafkaApis(val requestChannel: RequestChannel,
     txnCoordinator.handleInitPid(initPidRequest.transactionalId, sendResponseCallback)
   }
 
+  def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
+    val offsetForEpoch = request.body[OffsetsForLeaderEpochRequest]
+    val requestInfo = offsetForEpoch.epochsByTopicPartition()
+    authorizeClusterAction(request)
+
+    val responseBody = new OffsetsForLeaderEpochResponse(
+      replicaManager.getResponseFor(requestInfo)
+    )
+    requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not authorized.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
deleted file mode 100644
index de2626c..0000000
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.nio.file.{FileSystems, Paths}
-import java.util.regex.Pattern
-
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection._
-import kafka.utils.{Exit, Logging}
-import kafka.common._
-import java.io._
-import java.nio.charset.StandardCharsets
-
-import org.apache.kafka.common.TopicPartition
-
-object OffsetCheckpoint {
-  private val WhiteSpacesPattern = Pattern.compile("\\s+")
-  private val CurrentVersion = 0
-}
-
-/**
- * This class saves out a map of topic/partition=>offsets to a file
- */
-class OffsetCheckpoint(val file: File) extends Logging {
-  import OffsetCheckpoint._
-  private val path = file.toPath.toAbsolutePath
-  private val tempPath = Paths.get(path.toString + ".tmp")
-  private val lock = new Object()
-  file.createNewFile() // in case the file doesn't exist
-
-  def write(offsets: Map[TopicPartition, Long]) {
-    lock synchronized {
-      // write to temp file and then swap with the existing file
-      val fileOutputStream = new FileOutputStream(tempPath.toFile)
-      val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
-      try {
-        writer.write(CurrentVersion.toString)
-        writer.newLine()
-
-        writer.write(offsets.size.toString)
-        writer.newLine()
-
-        offsets.foreach { case (topicPart, offset) =>
-          writer.write(s"${topicPart.topic} ${topicPart.partition} $offset")
-          writer.newLine()
-        }
-
-        writer.flush()
-        fileOutputStream.getFD().sync()
-      } catch {
-        case e: FileNotFoundException =>
-          if (FileSystems.getDefault.isReadOnly) {
-            fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e)
-            Exit.halt(1)
-          }
-          throw e
-      } finally {
-        writer.close()
-      }
-
-      Utils.atomicMoveWithFallback(tempPath, path)
-    }
-  }
-
-  def read(): Map[TopicPartition, Long] = {
-
-    def malformedLineException(line: String) =
-      new IOException(s"Malformed line in offset checkpoint file: $line'")
-
-    lock synchronized {
-      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
-      var line: String = null
-      try {
-        line = reader.readLine()
-        if (line == null)
-          return Map.empty
-        val version = line.toInt
-        version match {
-          case CurrentVersion =>
-            line = reader.readLine()
-            if (line == null)
-              return Map.empty
-            val expectedSize = line.toInt
-            val offsets = mutable.Map[TopicPartition, Long]()
-            line = reader.readLine()
-            while (line != null) {
-              WhiteSpacesPattern.split(line) match {
-                case Array(topic, partition, offset) =>
-                  offsets += new TopicPartition(topic, partition.toInt) -> offset.toLong
-                  line = reader.readLine()
-                case _ => throw malformedLineException(line)
-              }
-            }
-            if (offsets.size != expectedSize)
-              throw new IOException(s"Expected $expectedSize entries but found only ${offsets.size}")
-            offsets
-          case _ =>
-            throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
-        }
-      } catch {
-        case _: NumberFormatException => throw malformedLineException(line)
-      } finally {
-        reader.close()
-      }
-    }
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
new file mode 100644
index 0000000..8ba3f60
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -0,0 +1,105 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+package kafka.server
+
+import java.net.SocketTimeoutException
+
+import kafka.cluster.BrokerEndPoint
+import org.apache.kafka.clients._
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.requests.AbstractRequest.Builder
+
+import scala.collection.JavaConverters._
+
+trait BlockingSend {
+
+  def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse
+
+  def close()
+}
+
+class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
+                                 brokerConfig: KafkaConfig,
+                                 metrics: Metrics,
+                                 time: Time,
+                                 fetcherId: Int,
+                                 clientId: String) extends BlockingSend {
+
+  private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
+  private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
+
+  private val networkClient = {
+    val channelBuilder = ChannelBuilders.clientChannelBuilder(
+      brokerConfig.interBrokerSecurityProtocol,
+      JaasContext.Type.SERVER,
+      brokerConfig,
+      brokerConfig.interBrokerListenerName,
+      brokerConfig.saslMechanismInterBrokerProtocol,
+      brokerConfig.saslInterBrokerHandshakeRequestEnable
+    )
+    val selector = new Selector(
+      NetworkReceive.UNLIMITED,
+      brokerConfig.connectionsMaxIdleMs,
+      metrics,
+      time,
+      "replica-fetcher",
+      Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
+      false,
+      channelBuilder
+    )
+    new NetworkClient(
+      selector,
+      new ManualMetadataUpdater(),
+      clientId,
+      1,
+      0,
+      Selectable.USE_DEFAULT_BUFFER_SIZE,
+      brokerConfig.replicaSocketReceiveBufferBytes,
+      brokerConfig.requestTimeoutMs,
+      time,
+      false,
+      new ApiVersions
+    )
+  }
+
+  override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse =  {
+    try {
+      if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
+        throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
+      else {
+        val clientRequest = networkClient.newClientRequest(sourceBroker.id.toString, requestBuilder,
+          time.milliseconds(), true)
+        NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
+      }
+    }
+    catch {
+      case e: Throwable =>
+        networkClient.close(sourceBroker.id.toString)
+        throw e
+    }
+  }
+
+  def close(): Unit = {
+    networkClient.close()
+  }
+}
\ No newline at end of file