Posted to commits@kafka.apache.org by ju...@apache.org on 2017/04/06 21:51:31 UTC
[3/3] kafka git commit: KIP-101: Alter Replication Protocol to use Leader Epoch rather than High Watermark for Truncation
This PR replaces https://github.com/apache/kafka/pull/2743 (the same change, re-raised from the Confluent repo).
This PR adds Partition Level Leader Epochs to messages in Kafka as a mechanism for fixing some known issues in the replication protocol. Full details can be found here:
[KIP-101 Reference](https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation)
*The key elements are*:
- Epochs are stamped on messages as they enter the leader.
- Epochs are tracked on both the leader and followers in a new checkpoint file.
- A new API allows followers to retrieve the leader's latest offset for a particular epoch.
- The logic for truncating the log when a replica becomes a follower has been moved from Partition into the ReplicaFetcherThread.
- When partitions are added to the ReplicaFetcherThread they are added in an initialising state. Initialising partitions request leader epochs and then truncate their logs appropriately (a minimal sketch of this flow follows the list).
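A minimal sketch of that initialising flow, under assumed names (`FollowerLog` and `leaderEndOffsetForEpoch` are stand-ins for the real `Log` and the OffsetsForLeaderEpoch round trip, not the committed API):

```scala
// Toy stand-in for the real Log; just enough state for the sketch.
final class FollowerLog(var logEndOffset: Long) {
  def truncateTo(offset: Long): Unit =
    logEndOffset = math.min(logEndOffset, offset)
}

object EpochTruncationSketch {
  // On becoming a follower: ask the leader for its end offset for our latest
  // epoch (the OffsetsForLeaderEpoch round trip), then truncate to the smaller
  // of that and our own log end offset before fetching resumes.
  def initialise(log: FollowerLog,
                 latestEpoch: Int,
                 leaderEndOffsetForEpoch: Int => Long): Unit = {
    val leaderEndOffset = leaderEndOffsetForEpoch(latestEpoch)
    log.truncateTo(math.min(leaderEndOffset, log.logEndOffset))
  }
}
```

The point of the min is that a follower never truncates beyond what it holds locally, and never keeps messages past the leader's end offset for its own epoch.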
The test `EpochDrivenReplicationProtocolAcceptanceTest.shouldFollowLeaderEpochBasicWorkflow()` provides a good overview of the workflow.
The corrupted-log use case is covered by the test `EpochDrivenReplicationProtocolAcceptanceTest.offsetsShouldNotGoBackwards()`.
Remaining work: there is a to-do list here: https://docs.google.com/document/d/1edmMo70MfHEZH9x38OQfTWsHr7UGTvg-NOxeFhOeRew/edit?usp=sharing
Author: Ben Stopford <be...@gmail.com>
Author: Jun Rao <ju...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #2808 from benstopford/kip-101-v2
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0baea2ac
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0baea2ac
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0baea2ac
Branch: refs/heads/trunk
Commit: 0baea2ac13532981f3fea11e5dfc6da5aafaeaa8
Parents: b611cfa
Author: Ben Stopford <be...@gmail.com>
Authored: Thu Apr 6 14:51:09 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Thu Apr 6 14:51:09 2017 -0700
----------------------------------------------------------------------
checkstyle/suppressions.xml | 2 +
.../apache/kafka/common/protocol/ApiKeys.java | 3 +-
.../apache/kafka/common/protocol/Protocol.java | 52 ++
.../kafka/common/record/MemoryRecords.java | 29 +-
.../kafka/common/requests/AbstractRequest.java | 3 +
.../kafka/common/requests/AbstractResponse.java | 2 +
.../kafka/common/requests/EpochEndOffset.java | 81 +++
.../requests/OffsetsForLeaderEpochRequest.java | 175 +++++
.../requests/OffsetsForLeaderEpochResponse.java | 132 ++++
.../common/requests/RequestResponseTest.java | 23 +
.../org/apache/kafka/test/MockDeserializer.java | 25 +-
config/log4j.properties | 2 +-
core/src/main/scala/kafka/api/ApiVersion.scala | 15 +-
.../main/scala/kafka/cluster/Partition.scala | 11 +-
core/src/main/scala/kafka/cluster/Replica.scala | 6 +-
.../kafka/consumer/ConsumerFetcherThread.scala | 12 +-
core/src/main/scala/kafka/log/Log.scala | 62 +-
.../scala/kafka/log/LogCleanerManager.scala | 4 +-
core/src/main/scala/kafka/log/LogManager.scala | 21 +-
core/src/main/scala/kafka/log/LogSegment.scala | 11 +-
.../src/main/scala/kafka/log/LogValidator.scala | 32 +-
.../kafka/server/AbstractFetcherThread.scala | 95 ++-
.../src/main/scala/kafka/server/KafkaApis.scala | 12 +
.../scala/kafka/server/OffsetCheckpoint.scala | 124 ----
.../server/ReplicaFetcherBlockingSend.scala | 105 +++
.../kafka/server/ReplicaFetcherThread.scala | 198 ++---
.../scala/kafka/server/ReplicaManager.scala | 41 +-
.../server/checkpoints/CheckpointFile.scala | 114 +++
.../checkpoints/LeaderEpochCheckpointFile.scala | 67 ++
.../checkpoints/OffsetCheckpointFile.scala | 60 ++
.../server/epoch/LeaderEpochFileCache.scala | 224 ++++++
.../kafka/api/AuthorizerIntegrationTest.scala | 16 +-
.../kafka/api/EndToEndClusterIdTest.scala | 11 +-
.../kafka/api/ProducerCompressionTest.scala | 5 +-
.../kafka/log/LogCleanerIntegrationTest.scala | 7 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 12 +-
.../scala/unit/kafka/log/LogSegmentTest.scala | 25 +
.../src/test/scala/unit/kafka/log/LogTest.scala | 225 +++++-
.../server/AbstractFetcherThreadTest.scala | 15 +-
.../unit/kafka/server/ISRExpirationTest.scala | 5 +
.../unit/kafka/server/LogRecoveryTest.scala | 5 +-
.../kafka/server/ReplicaFetcherThreadTest.scala | 402 +++++++++++
.../kafka/server/ReplicaManagerQuotasTest.scala | 2 +-
.../kafka/server/ReplicationQuotasTest.scala | 2 +-
.../unit/kafka/server/SimpleFetchTest.scala | 3 +-
.../LeaderEpochCheckpointFileTest.scala | 72 ++
.../checkpoints/OffsetCheckpointFileTest.scala | 89 +++
...rivenReplicationProtocolAcceptanceTest.scala | 410 +++++++++++
.../server/epoch/LeaderEpochFileCacheTest.scala | 721 +++++++++++++++++++
.../epoch/LeaderEpochIntegrationTest.scala | 283 ++++++++
.../epoch/OffsetsForLeaderEpochTest.scala | 98 +++
.../util/ReplicaFetcherMockBlockingSend.scala | 80 ++
.../test/scala/unit/kafka/utils/TestUtils.scala | 3 +-
.../core/zookeeper_security_upgrade_test.py | 1 -
54 files changed, 3908 insertions(+), 327 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/checkstyle/suppressions.xml
----------------------------------------------------------------------
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 7bc55c8..39d61e2 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -232,5 +232,7 @@
<suppress checks="NPathComplexity"
files="KafkaLog4jAppender.java"/>
+ <suppress checks="JavaNCSS"
+ files="RequestResponseTest.java"/>
</suppressions>
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 4183638..b65defb 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -47,7 +47,8 @@ public enum ApiKeys {
CREATE_TOPICS(19, "CreateTopics"),
DELETE_TOPICS(20, "DeleteTopics"),
DELETE_RECORDS(21, "DeleteRecords"),
- INIT_PRODUCER_ID(22, "InitProducerId");
+ INIT_PRODUCER_ID(22, "InitProducerId"),
+ OFFSET_FOR_LEADER_EPOCH(23, "OffsetForLeaderEpoch");
private static final ApiKeys[] ID_TO_TYPE;
private static final int MIN_API_KEY = 0;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 37eb75c..cc228c5 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -1199,6 +1199,56 @@ public class Protocol {
public static final Schema[] INIT_PRODUCER_ID_RESPONSE = new Schema[] {INIT_PRODUCER_ID_RESPONSE_V0};
+ /* Offsets for Leader Epoch api */
+ public static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0 = new Schema(
+ new Field("partition_id",
+ INT32,
+ "The partition_id"),
+ new Field("leader_epoch",
+ INT32,
+ "The epoch")
+ );
+ public static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0 = new Schema(
+ new Field("topic",
+ STRING,
+ "The topic"),
+ new Field("partitions",
+ new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_PARTITION_V0),
+ "The partition")
+ );
+ public static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V0 = new Schema(
+ new Field("topics",
+ new ArrayOf(OFFSET_FOR_LEADER_EPOCH_REQUEST_TOPIC_V0),
+ "An array of topics to get epochs for"));
+
+
+ public static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V0 = new Schema(
+ new Field("error_code",
+ INT16,
+ "The error code"),
+ new Field("partition_id",
+ INT32,
+ "The partition id"),
+ new Field("end_offset",
+ INT64,
+ "The end offset")
+ );
+ public static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0 = new Schema(
+ new Field("topic",
+ STRING,
+ "The topic"),
+ new Field("partitions",
+ new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_PARTITION_V0),
+ "The partition")
+ );
+ public static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0 = new Schema(
+ new Field("topics",
+ new ArrayOf(OFFSET_FOR_LEADER_EPOCH_RESPONSE_TOPIC_V0),
+ "An array of topics for which we have leader offsets for some requested Partition Leader Epoch"));
+
+ public static final Schema[] OFFSET_FOR_LEADER_EPOCH_REQUEST = new Schema[] {OFFSET_FOR_LEADER_EPOCH_REQUEST_V0};
+ public static final Schema[] OFFSET_FOR_LEADER_EPOCH_RESPONSE = new Schema[] {OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0};
+
/* an array of all requests and responses with all schema versions; a null value in the inner array means that the
* particular version is not supported */
public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
@@ -1232,6 +1282,7 @@ public class Protocol {
REQUESTS[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_REQUEST;
REQUESTS[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_REQUEST;
REQUESTS[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_REQUEST;
+ REQUESTS[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_REQUEST;
RESPONSES[ApiKeys.PRODUCE.id] = PRODUCE_RESPONSE;
RESPONSES[ApiKeys.FETCH.id] = FETCH_RESPONSE;
@@ -1256,6 +1307,7 @@ public class Protocol {
RESPONSES[ApiKeys.DELETE_TOPICS.id] = DELETE_TOPICS_RESPONSE;
RESPONSES[ApiKeys.DELETE_RECORDS.id] = DELETE_RECORDS_RESPONSE;
RESPONSES[ApiKeys.INIT_PRODUCER_ID.id] = INIT_PRODUCER_ID_RESPONSE;
+ RESPONSES[ApiKeys.OFFSET_FOR_LEADER_EPOCH.id] = OFFSET_FOR_LEADER_EPOCH_RESPONSE;
/* set the minimum and maximum version of each api */
for (ApiKeys api : ApiKeys.values()) {
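As a usage sketch (not part of the patch itself), a caller could build a v0 request against these schemas with the `Builder` added in `OffsetsForLeaderEpochRequest` below; the topic name and epoch are illustrative:

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest

object LeaderEpochRequestSketch {
  // Asks the leader: "what is your end offset for epoch 5 of topic1-0?"
  val request: OffsetsForLeaderEpochRequest =
    new OffsetsForLeaderEpochRequest.Builder()
      .add(new TopicPartition("topic1", 0), 5)
      .build(0.toShort)
}
```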
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index f810e39..b3beed5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -303,7 +303,18 @@ public class MemoryRecords extends AbstractRecords {
long baseOffset,
long logAppendTime) {
return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
- RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE);
+ RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseOffset,
+ long logAppendTime,
+ int partitionLeaderEpoch) {
+ return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
+ RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE, partitionLeaderEpoch);
}
public static MemoryRecordsBuilder builder(ByteBuffer buffer,
@@ -315,8 +326,22 @@ public class MemoryRecords extends AbstractRecords {
long pid,
short epoch,
int baseSequence) {
+ return builder(buffer, magic, compressionType, timestampType, baseOffset, logAppendTime,
+ pid, epoch, baseSequence, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH);
+ }
+
+ public static MemoryRecordsBuilder builder(ByteBuffer buffer,
+ byte magic,
+ CompressionType compressionType,
+ TimestampType timestampType,
+ long baseOffset,
+ long logAppendTime,
+ long pid,
+ short epoch,
+ int baseSequence,
+ int partitionLeaderEpoch) {
return new MemoryRecordsBuilder(buffer, magic, compressionType, timestampType, baseOffset,
- logAppendTime, pid, epoch, baseSequence, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
+ logAppendTime, pid, epoch, baseSequence, false, partitionLeaderEpoch,
buffer.remaining());
}
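A hedged sketch of the new overload in use, stamping an illustrative epoch of 5 on a batch (epochs are only written for the v2 message format):

```scala
import java.nio.ByteBuffer
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, MemoryRecordsBuilder, RecordBatch, TimestampType}

object EpochStampSketch {
  val builder: MemoryRecordsBuilder = MemoryRecords.builder(
    ByteBuffer.allocate(1024),
    RecordBatch.MAGIC_VALUE_V2,   // epochs require the v2 format
    CompressionType.NONE,
    TimestampType.CREATE_TIME,
    0L,                           // baseOffset
    RecordBatch.NO_TIMESTAMP,     // logAppendTime (unused with CREATE_TIME)
    5)                            // partitionLeaderEpoch stamped on the batch
}
```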
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 1638556..7ce3518 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -174,6 +174,9 @@ public abstract class AbstractRequest extends AbstractRequestResponse {
case INIT_PRODUCER_ID:
request = new InitPidRequest(struct, version);
break;
+ case OFFSET_FOR_LEADER_EPOCH:
+ request = new OffsetsForLeaderEpochRequest(struct, version);
+ break;
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getRequest`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 314aa42..1ae30d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -95,6 +95,8 @@ public abstract class AbstractResponse extends AbstractRequestResponse {
return new DeleteRecordsResponse(struct);
case INIT_PRODUCER_ID:
return new InitPidResponse(struct);
+ case OFFSET_FOR_LEADER_EPOCH:
+ return new OffsetsForLeaderEpochResponse(struct);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `getResponse`, the " +
"code should be updated to do so.", apiKey));
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
new file mode 100644
index 0000000..2d49149
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/EpochEndOffset.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.Errors;
+
+import static org.apache.kafka.common.record.RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH;
+
+/**
+ * The offset, fetched from a leader, for a particular partition.
+ */
+
+public class EpochEndOffset {
+ public static final long UNDEFINED_EPOCH_OFFSET = UNKNOWN_PARTITION_LEADER_EPOCH;
+ public static final int UNDEFINED_EPOCH = -1;
+
+ private Errors error;
+ private long endOffset;
+
+ public EpochEndOffset(Errors error, long endOffset) {
+ this.error = error;
+ this.endOffset = endOffset;
+ }
+
+ public EpochEndOffset(long endOffset) {
+ this.error = Errors.NONE;
+ this.endOffset = endOffset;
+ }
+
+ public Errors error() {
+ return error;
+ }
+
+ public boolean hasError() {
+ return error != Errors.NONE;
+ }
+
+ public long endOffset() {
+ return endOffset;
+ }
+
+ @Override
+ public String toString() {
+ return "EpochEndOffset{" +
+ "error=" + error +
+ ", endOffset=" + endOffset +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ EpochEndOffset that = (EpochEndOffset) o;
+
+ if (error != that.error) return false;
+ return endOffset == that.endOffset;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = (int) error.code();
+ result = 31 * result + (int) (endOffset ^ (endOffset >>> 32));
+ return result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
new file mode 100644
index 0000000..fed8910
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OffsetsForLeaderEpochRequest extends AbstractRequest {
+ public static final String TOPICS = "topics";
+ public static final String TOPIC = "topic";
+ public static final String PARTITIONS = "partitions";
+ public static final String PARTITION_ID = "partition_id";
+ public static final String LEADER_EPOCH = "leader_epoch";
+
+ private Map<TopicPartition, Integer> epochsByPartition;
+
+ public Map<TopicPartition, Integer> epochsByTopicPartition() {
+ return epochsByPartition;
+ }
+
+ public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
+ private Map<TopicPartition, Integer> epochsByPartition = new HashMap<>();
+
+ public Builder() {
+ super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
+ }
+
+ public Builder(Map<TopicPartition, Integer> epochsByPartition) {
+ super(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
+ this.epochsByPartition = epochsByPartition;
+ }
+
+ public Builder add(TopicPartition topicPartition, Integer epoch) {
+ epochsByPartition.put(topicPartition, epoch);
+ return this;
+ }
+
+ @Override
+ public OffsetsForLeaderEpochRequest build(short version) {
+ return new OffsetsForLeaderEpochRequest(epochsByPartition, version);
+ }
+
+ public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, short version) {
+ return new OffsetsForLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.parseRequest(version, buffer), version);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder bld = new StringBuilder();
+ bld.append("(type=OffsetForLeaderEpochRequest, ").
+ append("epochsByTopic=").append(epochsByPartition).
+ append(")");
+ return bld.toString();
+ }
+ }
+
+ public OffsetsForLeaderEpochRequest(Map<TopicPartition, Integer> epochsByPartition, short version) {
+ super(version);
+ this.epochsByPartition = epochsByPartition;
+ }
+
+ public OffsetsForLeaderEpochRequest(Struct struct, short version) {
+ super(version);
+ epochsByPartition = new HashMap<>();
+ for (Object t : struct.getArray(TOPICS)) {
+ Struct topicAndEpochs = (Struct) t;
+ String topic = topicAndEpochs.getString(TOPIC);
+ for (Object e : topicAndEpochs.getArray(PARTITIONS)) {
+ Struct partitionAndEpoch = (Struct) e;
+ int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
+ int epoch = partitionAndEpoch.getInt(LEADER_EPOCH);
+ TopicPartition tp = new TopicPartition(topic, partitionId);
+ epochsByPartition.put(tp, epoch);
+ }
+ }
+ }
+
+ public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, short versionId) {
+ return new OffsetsForLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.parseRequest(versionId, buffer), versionId);
+ }
+
+ @Override
+ protected Struct toStruct() {
+ Struct struct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
+
+ //Group by topic
+ Map<String, List<PartitionLeaderEpoch>> topicsToPartitionEpochs = new HashMap<>();
+ for (TopicPartition tp : epochsByPartition.keySet()) {
+ List<PartitionLeaderEpoch> partitionEndOffsets = topicsToPartitionEpochs.get(tp.topic());
+ if (partitionEndOffsets == null)
+ partitionEndOffsets = new ArrayList<>();
+ partitionEndOffsets.add(new PartitionLeaderEpoch(tp.partition(), epochsByPartition.get(tp)));
+ topicsToPartitionEpochs.put(tp.topic(), partitionEndOffsets);
+ }
+
+ List<Struct> topics = new ArrayList<>();
+ for (Map.Entry<String, List<PartitionLeaderEpoch>> topicEpochs : topicsToPartitionEpochs.entrySet()) {
+ Struct partition = struct.instance(TOPICS);
+ String topic = topicEpochs.getKey();
+ partition.set(TOPIC, topic);
+ List<PartitionLeaderEpoch> partitionLeaderEpoches = topicEpochs.getValue();
+ List<Struct> partitions = new ArrayList<>(partitionLeaderEpoches.size());
+ for (PartitionLeaderEpoch partitionLeaderEpoch : partitionLeaderEpoches) {
+ Struct partitionRow = partition.instance(PARTITIONS);
+ partitionRow.set(PARTITION_ID, partitionLeaderEpoch.partitionId);
+ partitionRow.set(LEADER_EPOCH, partitionLeaderEpoch.epoch);
+ partitions.add(partitionRow);
+ }
+ partition.set(PARTITIONS, partitions.toArray());
+ topics.add(partition);
+ }
+ struct.set(TOPICS, topics.toArray());
+ return struct;
+ }
+
+ @Override
+ public AbstractResponse getErrorResponse(Throwable e) {
+ Errors error = Errors.forException(e);
+ Map<TopicPartition, EpochEndOffset> errorResponse = new HashMap<>();
+ for (TopicPartition tp : epochsByPartition.keySet()) {
+ errorResponse.put(tp, new EpochEndOffset(error, EpochEndOffset.UNDEFINED_EPOCH_OFFSET));
+ }
+ return new OffsetsForLeaderEpochResponse(errorResponse);
+ }
+
+ private class PartitionLeaderEpoch {
+ int partitionId;
+ int epoch;
+
+ public PartitionLeaderEpoch(int partitionId, int epoch) {
+ this.partitionId = partitionId;
+ this.epoch = epoch;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ PartitionLeaderEpoch other = (PartitionLeaderEpoch) o;
+
+ if (partitionId != other.partitionId) return false;
+ return epoch == other.epoch;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = partitionId;
+ result = 31 * result + epoch;
+ return result;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
new file mode 100644
index 0000000..03f3069
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class OffsetsForLeaderEpochResponse extends AbstractResponse {
+ public static final String TOPICS = "topics";
+ public static final String TOPIC = "topic";
+ public static final String PARTITIONS = "partitions";
+ public static final String ERROR_CODE = "error_code";
+ public static final String PARTITION_ID = "partition_id";
+ public static final String END_OFFSET = "end_offset";
+
+ private Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition;
+
+ public OffsetsForLeaderEpochResponse(Struct struct) {
+ epochEndOffsetsByPartition = new HashMap<>();
+ for (Object t : struct.getArray(TOPICS)) {
+ Struct topicAndEpochs = (Struct) t;
+ String topic = topicAndEpochs.getString(TOPIC);
+ for (Object e : topicAndEpochs.getArray(PARTITIONS)) {
+ Struct partitionAndEpoch = (Struct) e;
+ Errors error = Errors.forCode(partitionAndEpoch.getShort(ERROR_CODE));
+ int partitionId = partitionAndEpoch.getInt(PARTITION_ID);
+ TopicPartition tp = new TopicPartition(topic, partitionId);
+ long endOffset = partitionAndEpoch.getLong(END_OFFSET);
+ epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, endOffset));
+ }
+ }
+ }
+
+ public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> epochsByTopic) {
+ this.epochEndOffsetsByPartition = epochsByTopic;
+ }
+
+ public Map<TopicPartition, EpochEndOffset> responses() {
+ return epochEndOffsetsByPartition;
+ }
+
+ public static OffsetsForLeaderEpochResponse parse(ByteBuffer buffer, short versionId) {
+ return new OffsetsForLeaderEpochResponse(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(versionId).read(buffer));
+ }
+
+ @Override
+ protected Struct toStruct(short version) {
+ Struct struct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.responseSchema(version));
+
+ //Group by topic
+ Map<String, List<PartitionEndOffset>> topicsToPartitionEndOffsets = new HashMap<>();
+ for (TopicPartition tp : epochEndOffsetsByPartition.keySet()) {
+ List<PartitionEndOffset> partitionEndOffsets = topicsToPartitionEndOffsets.get(tp.topic());
+ if (partitionEndOffsets == null)
+ partitionEndOffsets = new ArrayList<>();
+ partitionEndOffsets.add(new PartitionEndOffset(tp.partition(), epochEndOffsetsByPartition.get(tp)));
+ topicsToPartitionEndOffsets.put(tp.topic(), partitionEndOffsets);
+ }
+
+ //Write struct
+ List<Struct> topics = new ArrayList<>(topicsToPartitionEndOffsets.size());
+ for (Map.Entry<String, List<PartitionEndOffset>> topicEpochs : topicsToPartitionEndOffsets.entrySet()) {
+ Struct partition = struct.instance(TOPICS);
+ String topic = topicEpochs.getKey();
+ partition.set(TOPIC, topic);
+ List<PartitionEndOffset> partitionEpochs = topicEpochs.getValue();
+ List<Struct> partitions = new ArrayList<>(partitionEpochs.size());
+ for (PartitionEndOffset peo : partitionEpochs) {
+ Struct partitionRow = partition.instance(PARTITIONS);
+ partitionRow.set(ERROR_CODE, peo.epochEndOffset.error().code());
+ partitionRow.set(PARTITION_ID, peo.partition);
+ partitionRow.set(END_OFFSET, peo.epochEndOffset.endOffset());
+ partitions.add(partitionRow);
+ }
+
+ partition.set(PARTITIONS, partitions.toArray());
+ topics.add(partition);
+ }
+ struct.set(TOPICS, topics.toArray());
+ return struct;
+ }
+
+ private class PartitionEndOffset {
+ private int partition;
+ private EpochEndOffset epochEndOffset;
+
+ PartitionEndOffset(int partition, EpochEndOffset epochEndOffset) {
+ this.partition = partition;
+ this.epochEndOffset = epochEndOffset;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ PartitionEndOffset that = (PartitionEndOffset) o;
+
+ if (partition != that.partition) return false;
+ return epochEndOffset != null ? epochEndOffset.equals(that.epochEndOffset) : that.epochEndOffset == null;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = partition;
+ result = 31 * result + (epochEndOffset != null ? epochEndOffset.hashCode() : 0);
+ return result;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 8a7633e..2995882 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -163,6 +163,9 @@ public class RequestResponseTest {
checkRequest(createListOffsetRequest(0));
checkErrorResponse(createListOffsetRequest(0), new UnknownServerException());
checkResponse(createListOffsetResponse(0), 0);
+ checkRequest(createLeaderEpochRequest());
+ checkResponse(createLeaderEpochResponse(), 0);
+ checkErrorResponse(createLeaderEpochRequest(), new UnknownServerException());
}
@Test
@@ -798,6 +801,26 @@ public class RequestResponseTest {
return new InitPidResponse(Errors.NONE, 3332, (short) 3);
}
+ private OffsetsForLeaderEpochRequest createLeaderEpochRequest() {
+ Map<TopicPartition, Integer> epochs = new HashMap<>();
+
+ epochs.put(new TopicPartition("topic1", 0), 1);
+ epochs.put(new TopicPartition("topic1", 1), 1);
+ epochs.put(new TopicPartition("topic2", 2), 3);
+
+ return new OffsetsForLeaderEpochRequest.Builder(epochs).build();
+ }
+
+ private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
+ Map<TopicPartition, EpochEndOffset> epochs = new HashMap<>();
+
+ epochs.put(new TopicPartition("topic1", 0), new EpochEndOffset(Errors.NONE, 0));
+ epochs.put(new TopicPartition("topic1", 1), new EpochEndOffset(Errors.NONE, 1));
+ epochs.put(new TopicPartition("topic2", 2), new EpochEndOffset(Errors.NONE, 2));
+
+ return new OffsetsForLeaderEpochResponse(epochs);
+ }
+
private static class ByteBufferChannel implements GatheringByteChannel {
private final ByteBuffer buf;
private boolean closed = false;
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
index 9704206..99551f7 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockDeserializer.java
@@ -25,14 +25,21 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class MockDeserializer implements ClusterResourceListener, Deserializer<byte[]> {
- public static final AtomicInteger INIT_COUNT = new AtomicInteger(0);
- public static final AtomicInteger CLOSE_COUNT = new AtomicInteger(0);
- public static final AtomicReference<ClusterResource> CLUSTER_META = new AtomicReference<>();
- public static final ClusterResource NO_CLUSTER_ID = new ClusterResource("no_cluster_id");
- public static final AtomicReference<ClusterResource> CLUSTER_ID_BEFORE_DESERIALIZE = new AtomicReference<>(NO_CLUSTER_ID);
+ public static AtomicInteger initCount = new AtomicInteger(0);
+ public static AtomicInteger closeCount = new AtomicInteger(0);
+ public static AtomicReference<ClusterResource> clusterMeta = new AtomicReference<>();
+ public static ClusterResource noClusterId = new ClusterResource("no_cluster_id");
+ public static AtomicReference<ClusterResource> clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
+
+ public static void resetStaticVariables() {
+ initCount = new AtomicInteger(0);
+ closeCount = new AtomicInteger(0);
+ clusterMeta = new AtomicReference<>();
+ clusterIdBeforeDeserialize = new AtomicReference<>(noClusterId);
+ }
public MockDeserializer() {
- INIT_COUNT.incrementAndGet();
+ initCount.incrementAndGet();
}
@Override
@@ -43,17 +50,17 @@ public class MockDeserializer implements ClusterResourceListener, Deserializer<byte[]> {
public byte[] deserialize(String topic, byte[] data) {
// This will ensure that we get the cluster metadata when deserialize is called for the first time
// as subsequent compareAndSet operations will fail.
- CLUSTER_ID_BEFORE_DESERIALIZE.compareAndSet(NO_CLUSTER_ID, CLUSTER_META.get());
+ clusterIdBeforeDeserialize.compareAndSet(noClusterId, clusterMeta.get());
return data;
}
@Override
public void close() {
- CLOSE_COUNT.incrementAndGet();
+ closeCount.incrementAndGet();
}
@Override
public void onUpdate(ClusterResource clusterResource) {
- CLUSTER_META.set(clusterResource);
+ clusterMeta.set(clusterResource);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/config/log4j.properties
----------------------------------------------------------------------
diff --git a/config/log4j.properties b/config/log4j.properties
index bf816e7..bdee182 100644
--- a/config/log4j.properties
+++ b/config/log4j.properties
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=INFO, stdout
+log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 6101d2a..051c5d6 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -67,7 +67,10 @@ object ApiVersion {
"0.11.0-IV0" -> KAFKA_0_11_0_IV0,
// introduced DeleteRecordsRequest v0 and FetchRequest v4 in KIP-107
"0.11.0-IV1" -> KAFKA_0_11_0_IV1,
- "0.11.0" -> KAFKA_0_11_0_IV1
+ "0.11.0" -> KAFKA_0_11_0_IV1,
+ // Introduce leader epoch fetches to the replica fetcher via KIP-101
+ "0.11.0-IV2" -> KAFKA_0_11_0_IV2,
+ "0.11.0" -> KAFKA_0_11_0_IV2
)
private val versionPattern = "\\.".r
@@ -159,7 +162,13 @@ case object KAFKA_0_11_0_IV0 extends ApiVersion {
}
case object KAFKA_0_11_0_IV1 extends ApiVersion {
- val version: String = "0.11.0-IV0"
+ val version: String = "0.11.0-IV1"
val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
- val id: Int = 10
+ val id: Int = 11
+}
+
+case object KAFKA_0_11_0_IV2 extends ApiVersion {
+ val version: String = "0.11.0-IV2"
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+ val id: Int = 12
}
\ No newline at end of file
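Since the new fetch path is gated on the inter-broker protocol version, a rolling upgrade would enable it with the standard `inter.broker.protocol.version` broker setting once all brokers are upgraded, e.g.:

```properties
# Illustration: enable the epoch-based fetcher after all brokers are upgraded.
inter.broker.protocol.version=0.11.0-IV2
```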
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ddb2411..4dd96c3 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -163,12 +163,21 @@ class Partition(val topic: String,
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = partitionStateInfo.controllerEpoch
// add replicas that are new
- allReplicas.foreach(replica => getOrCreateReplica(replica))
val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
// remove assigned replicas that have been removed by the controller
(assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
inSyncReplicas = newInSyncReplicas
+
+ info(s"$topicPartition starts at Leader Epoch ${partitionStateInfo.leaderEpoch} from offset ${getReplica().get.logEndOffset.messageOffset}. Previous Leader Epoch was: $leaderEpoch")
+
+ //We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
leaderEpoch = partitionStateInfo.leaderEpoch
+ allReplicas.map(id => getOrCreateReplica(id))
+ .filter(_.isLocal)
+ .foreach { replica =>
+ replica.epochs.get.cacheLatestEpoch(leaderEpoch)
+ }
+
zkVersion = partitionStateInfo.zkVersion
val isNewLeader =
if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index 3995f9e..a604b87 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -22,8 +22,8 @@ import kafka.utils.Logging
import kafka.server.{LogOffsetMetadata, LogReadResult}
import kafka.common.KafkaException
import org.apache.kafka.common.errors.OffsetOutOfRangeException
-
-
+import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
+import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
import org.apache.kafka.common.utils.Time
class Replica(val brokerId: Int,
@@ -58,6 +58,8 @@ class Replica(val brokerId: Int,
def lastCaughtUpTimeMs = _lastCaughtUpTimeMs
+ val epochs = log.map(_.leaderEpochCache)
+
/*
* If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
* set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
index c4b7ce6..8d712f4 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
@@ -28,6 +28,7 @@ import ConsumerFetcherThread._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.EpochEndOffset
class ConsumerFetcherThread(name: String,
val config: ConsumerConfig,
@@ -38,7 +39,8 @@ class ConsumerFetcherThread(name: String,
clientId = config.clientId,
sourceBroker = sourceBroker,
fetchBackOffMs = config.refreshLeaderBackoffMs,
- isInterruptible = true) {
+ isInterruptible = true,
+ includeLogTruncation = false) {
type REQ = FetchRequest
type PD = PartitionData
@@ -100,7 +102,7 @@ class ConsumerFetcherThread(name: String,
protected def buildFetchRequest(partitionMap: collection.Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = {
partitionMap.foreach { case ((topicPartition, partitionFetchState)) =>
- if (partitionFetchState.isActive)
+ if (partitionFetchState.isReadyForFetch)
fetchRequestBuilder.addFetch(topicPartition.topic, topicPartition.partition, partitionFetchState.fetchOffset, fetchSize)
}
@@ -111,6 +113,12 @@ class ConsumerFetcherThread(name: String,
simpleConsumer.fetch(fetchRequest.underlying).data.map { case (TopicAndPartition(t, p), value) =>
new TopicPartition(t, p) -> new PartitionData(value)
}
+
+ override def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int] = { Map() }
+
+ override def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = { Map() }
+
+ override def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long] = { Map() }
}
object ConsumerFetcherThread {
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a052a9e..999c6aa 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -37,6 +37,12 @@ import org.apache.kafka.common.utils.{Time, Utils}
import scala.collection.JavaConverters._
import scala.collection.{Seq, mutable}
+import com.yammer.metrics.core.Gauge
+import org.apache.kafka.common.utils.{Time, Utils}
+import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
+import kafka.server.checkpoints.{LeaderEpochCheckpointFile, LeaderEpochFile}
+import kafka.server.epoch.{LeaderEpochCache, LeaderEpochFileCache}
+import org.apache.kafka.common.TopicPartition
object LogAppendInfo {
val UnknownLogAppendInfo = LogAppendInfo(-1, -1, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP,
@@ -136,15 +142,24 @@ class Log(@volatile var dir: File,
/* the actual segments of the log */
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
+ val leaderEpochCache: LeaderEpochCache = initializeLeaderEpochCache()
+
locally {
val startMs = time.milliseconds
loadSegments()
+
/* Calculate the offset of the next message */
nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset,
activeSegment.size.toInt)
+ leaderEpochCache.clearLatest(nextOffsetMetadata.messageOffset)
+
logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+
+ // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
+ leaderEpochCache.clearEarliest(logStartOffset)
+
buildAndRecoverPidMap(logEndOffset)
info("Completed load of log %s with %d log segments, log start offset %d and log end offset %d in %d ms"
@@ -193,10 +208,15 @@ class Log(@volatile var dir: File,
/** The name of this log */
def name = dir.getName()
- /* Load the log segments from the log files on disk */
- private def loadSegments() {
+ private def initializeLeaderEpochCache(): LeaderEpochCache = {
// create the log directory if it doesn't exist
dir.mkdirs()
+ new LeaderEpochFileCache(topicPartition, () => logEndOffsetMetadata,
+ new LeaderEpochCheckpointFile(LeaderEpochFile.newFile(dir)))
+ }
+
+ /* Load the log segments from the log files on disk */
+ private def loadSegments() {
var swapFiles = Set[File]()
// first do a pass through the files in the log directory and remove any temporary files
@@ -341,7 +361,7 @@ class Log(@volatile var dir: File,
info("Recovering unflushed segment %d in log %s.".format(curr.baseOffset, name))
val truncatedBytes =
try {
- curr.recover(config.maxMessageSize)
+ curr.recover(config.maxMessageSize, Some(leaderEpochCache))
} catch {
case _: InvalidOffsetException =>
val startOffset = curr.baseOffset
@@ -352,7 +372,7 @@ class Log(@volatile var dir: File,
if(truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
warn("Corruption found in segment %d of log %s, truncating to offset %d.".format(curr.baseOffset, name, curr.nextOffset))
- unflushed.foreach(deleteSegment)
+ unflushed.foreach(deleteSegment(_))
}
}
}
@@ -427,11 +447,11 @@ class Log(@volatile var dir: File,
*
* @param records The log records to append
* @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
+ * @param leaderEpochCache Optional cache of Leader Epoch Offsets.
* @throws KafkaStorageException If the append fails due to an I/O error.
* @return Information about the appended messages including the first and last offset.
*/
- def append(records: MemoryRecords, assignOffsets: Boolean = true): LogAppendInfo = {
-
+ def append(records: MemoryRecords, assignOffsets: Boolean = true, leaderEpochCache: LeaderEpochCache = leaderEpochCache): LogAppendInfo = {
val appendInfo = analyzeAndValidateRecords(records, isFromClient = assignOffsets)
// return if we have no valid messages or if this is a duplicate of the last appended entry
@@ -451,6 +471,7 @@ class Log(@volatile var dir: File,
appendInfo.firstOffset = offset.value
val now = time.milliseconds
val validateAndOffsetAssignResult = try {
+ leaderEpochCache.maybeAssignLatestCachedEpochToLeo()
LogValidator.validateMessagesAndAssignOffsets(validRecords,
offset,
now,
@@ -459,7 +480,8 @@ class Log(@volatile var dir: File,
config.compact,
config.messageFormatVersion.messageFormatVersion,
config.messageTimestampType,
- config.messageTimestampDifferenceMaxMs)
+ config.messageTimestampDifferenceMaxMs,
+ leaderEpochCache.latestUsedEpoch())
} catch {
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
@@ -485,6 +507,12 @@ class Log(@volatile var dir: File,
}
}
} else {
+ //Update the epoch cache with the epoch stamped by the leader
+ records.batches().asScala.map { batch =>
+ if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+ leaderEpochCache.assign(batch.partitionLeaderEpoch, batch.baseOffset())
+ }
+
// we are taking the offsets we are given
if (!appendInfo.offsetsMonotonic || appendInfo.firstOffset < nextOffsetMetadata.messageOffset)
throw new IllegalArgumentException("Out of order offsets found in " + records.records.asScala.map(_.offset))
@@ -808,9 +836,12 @@ class Log(@volatile var dir: File,
// we must always have at least one segment, so if we are going to delete all the segments, create a new one first
if (segments.size == numToDelete)
roll()
- // remove the segments for lookups
- deletable.foreach(deleteSegment)
- logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+ lock synchronized {
+ // remove the segments for lookups
+ deletable.foreach(deleteSegment(_))
+ logStartOffset = math.max(logStartOffset, segments.firstEntry().getValue.baseOffset)
+ leaderEpochCache.clearEarliest(logStartOffset)
+ }
}
numToDelete
}
@@ -1017,6 +1048,7 @@ class Log(@volatile var dir: File,
lock synchronized {
logSegments.foreach(_.delete())
segments.clear()
+ leaderEpochCache.clear()
Utils.delete(dir)
}
}
@@ -1027,23 +1059,24 @@ class Log(@volatile var dir: File,
* @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
*/
private[log] def truncateTo(targetOffset: Long) {
- info("Truncating log %s to offset %d.".format(name, targetOffset))
if(targetOffset < 0)
throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
- if(targetOffset > logEndOffset) {
+ if(targetOffset >= logEndOffset) {
info("Truncating %s to %d has no effect as the largest offset in the log is %d.".format(name, targetOffset, logEndOffset-1))
return
}
+ info("Truncating log %s to offset %d.".format(name, targetOffset))
lock synchronized {
if(segments.firstEntry.getValue.baseOffset > targetOffset) {
truncateFullyAndStartAt(targetOffset)
} else {
val deletable = logSegments.filter(segment => segment.baseOffset > targetOffset)
- deletable.foreach(deleteSegment)
+ deletable.foreach(deleteSegment(_))
activeSegment.truncateTo(targetOffset)
updateLogEndOffset(targetOffset)
this.recoveryPoint = math.min(targetOffset, this.recoveryPoint)
this.logStartOffset = math.min(targetOffset, this.logStartOffset)
+ leaderEpochCache.clearLatest(targetOffset)
}
buildAndRecoverPidMap(targetOffset)
}
@@ -1058,7 +1091,7 @@ class Log(@volatile var dir: File,
debug("Truncate and start log '" + name + "' to " + newOffset)
lock synchronized {
val segmentsToDelete = logSegments.toList
- segmentsToDelete.foreach(deleteSegment)
+ segmentsToDelete.foreach(deleteSegment(_))
addSegment(new LogSegment(dir,
newOffset,
indexIntervalBytes = config.indexInterval,
@@ -1069,6 +1102,7 @@ class Log(@volatile var dir: File,
initFileSize = initFileSize,
preallocate = config.preallocate))
updateLogEndOffset(newOffset)
+ leaderEpochCache.clear()
this.recoveryPoint = math.min(newOffset, this.recoveryPoint)
this.logStartOffset = newOffset
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 01c4df4..2b4d956 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.locks.ReentrantLock
import com.yammer.metrics.core.Gauge
import kafka.common.LogCleaningAbortedException
import kafka.metrics.KafkaMetricsGroup
-import kafka.server.OffsetCheckpoint
+import kafka.server.checkpoints.{OffsetCheckpoint, OffsetCheckpointFile}
import kafka.utils.CoreUtils._
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.TopicPartition
@@ -55,7 +55,7 @@ private[log] class LogCleanerManager(val logDirs: Array[File], val logs: Pool[To
private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
/* the offset checkpoints holding the last cleaned point for each log */
- private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, offsetCheckpointFile)))).toMap
+ private val checkpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, offsetCheckpointFile)))).toMap
/* the set of logs currently being cleaned */
private val inProgress = mutable.HashMap[TopicPartition, LogCleaningState]()
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ec164e2..469c46b 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -18,20 +18,19 @@
package kafka.log
import java.io._
-import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
-
-import kafka.utils._
-
-import scala.collection._
-import scala.collection.JavaConverters._
-import kafka.common.{KafkaException, KafkaStorageException}
-import kafka.server._
-import java.util.concurrent.{ExecutionException, ExecutorService, Executors, Future}
+import java.util.concurrent._
import kafka.admin.AdminUtils
+import kafka.common.{KafkaException, KafkaStorageException}
+import kafka.server.checkpoints.OffsetCheckpointFile
+import kafka.server.{BrokerState, RecoveringFromUncleanShutdown, _}
+import kafka.utils._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Time
+import scala.collection.JavaConverters._
+import scala.collection._
+
/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
* All read and write operations are delegated to the individual log instances.
@@ -67,8 +66,8 @@ class LogManager(val logDirs: Array[File],
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
- private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
- private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, LogStartOffsetCheckpointFile)))).toMap
+ private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, RecoveryPointCheckpointFile)))).toMap
+ private val logStartOffsetCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpointFile(new File(dir, LogStartOffsetCheckpointFile)))).toMap
loadLogs()
// public, so we can access this from kafka.admin.DeleteTopicTest
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 4e77625..b77be34 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -21,6 +21,7 @@ import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
+import kafka.server.epoch.LeaderEpochCache
import kafka.server.{FetchDataInfo, LogOffsetMetadata}
import kafka.utils._
import org.apache.kafka.common.errors.CorruptRecordException
@@ -213,10 +214,11 @@ class LogSegment(val log: FileRecords,
*
* @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
* is corrupt.
+ * @param leaderEpochCache Optionally a cache for updating the leader epoch during recovery.
* @return The number of bytes truncated from the log
*/
@nonthreadsafe
- def recover(maxMessageSize: Int): Int = {
+ def recover(maxMessageSize: Int, leaderEpochCache: Option[LeaderEpochCache] = None): Int = {
index.truncate()
index.resize(index.maxIndexSize)
timeIndex.truncate()
@@ -242,6 +244,13 @@ class LogSegment(val log: FileRecords,
lastIndexEntry = validBytes
}
validBytes += batch.sizeInBytes()
+
+ if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
+ leaderEpochCache.foreach { cache =>
+ if (batch.partitionLeaderEpoch > cache.latestUsedEpoch()) // this is to avoid unnecessary warning in cache.assign()
+ cache.assign(batch.partitionLeaderEpoch, batch.baseOffset())
+ }
+ }
}
} catch {
case e: CorruptRecordException =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index c01a5de..fa520ad 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -51,20 +51,21 @@ private[kafka] object LogValidator extends Logging {
compactedTopic: Boolean = false,
messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
messageTimestampType: TimestampType,
- messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+ messageTimestampDiffMaxMs: Long,
+ partitionLeaderEpoch: Int = RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH): ValidationAndOffsetAssignResult = {
if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
// check the magic value
if (!records.hasMatchingMagic(messageFormatVersion))
convertAndAssignOffsetsNonCompressed(records, offsetCounter, compactedTopic, now, messageTimestampType,
- messageTimestampDiffMaxMs, messageFormatVersion)
+ messageTimestampDiffMaxMs, messageFormatVersion, partitionLeaderEpoch)
else
// Do in-place validation, offset assignment and maybe set timestamp
assignOffsetsNonCompressed(records, offsetCounter, now, compactedTopic, messageTimestampType,
- messageTimestampDiffMaxMs)
+ messageTimestampDiffMaxMs, partitionLeaderEpoch)
} else {
validateMessagesAndAssignOffsetsCompressed(records, offsetCounter, now, sourceCodec, targetCodec, compactedTopic,
- messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs)
+ messageFormatVersion, messageTimestampType, messageTimestampDiffMaxMs, partitionLeaderEpoch)
}
}
@@ -74,7 +75,8 @@ private[kafka] object LogValidator extends Logging {
now: Long,
timestampType: TimestampType,
messageTimestampDiffMaxMs: Long,
- toMagicValue: Byte): ValidationAndOffsetAssignResult = {
+ toMagicValue: Byte,
+ partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
val sizeInBytesAfterConversion = AbstractRecords.estimateSizeInBytes(toMagicValue, offsetCounter.value,
CompressionType.NONE, records.records)
@@ -85,7 +87,7 @@ private[kafka] object LogValidator extends Logging {
val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
val builder = MemoryRecords.builder(newBuffer, toMagicValue, CompressionType.NONE, timestampType,
- offsetCounter.value, now, pid, epoch, sequence)
+ offsetCounter.value, now, pid, epoch, sequence, partitionLeaderEpoch)
for (batch <- records.batches.asScala) {
ensureNonTransactional(batch)
@@ -112,7 +114,8 @@ private[kafka] object LogValidator extends Logging {
currentTimestamp: Long,
compactedTopic: Boolean,
timestampType: TimestampType,
- timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+ timestampDiffMaxMs: Long,
+ partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
var maxTimestamp = RecordBatch.NO_TIMESTAMP
var offsetOfMaxTimestamp = -1L
val initialOffset = offsetCounter.value
@@ -138,6 +141,9 @@ private[kafka] object LogValidator extends Logging {
batch.setLastOffset(offsetCounter.value - 1)
+ if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
+ batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+
// TODO: in the compressed path, we ensure that the batch max timestamp is correct.
// We should either do the same or (better) let those two paths converge.
if (batch.magic > RecordBatch.MAGIC_VALUE_V0 && timestampType == TimestampType.LOG_APPEND_TIME)
@@ -171,7 +177,8 @@ private[kafka] object LogValidator extends Logging {
compactedTopic: Boolean = false,
messageFormatVersion: Byte = RecordBatch.CURRENT_MAGIC_VALUE,
messageTimestampType: TimestampType,
- messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
+ messageTimestampDiffMaxMs: Long,
+ partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
// No in place assignment situation 1 and 2
var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > RecordBatch.MAGIC_VALUE_V0
@@ -223,7 +230,7 @@ private[kafka] object LogValidator extends Logging {
(first.producerId, first.producerEpoch, first.baseSequence)
}
buildRecordsAndAssignOffsets(messageFormatVersion, offsetCounter, messageTimestampType,
- CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence)
+ CompressionType.forId(targetCodec.codec), currentTimestamp, validatedRecords, pid, epoch, sequence, partitionLeaderEpoch)
} else {
// we can update the batch only and write the compressed payload as is
val batch = records.batches.iterator.next()
@@ -237,6 +244,9 @@ private[kafka] object LogValidator extends Logging {
if (messageFormatVersion >= RecordBatch.MAGIC_VALUE_V1)
batch.setMaxTimestamp(messageTimestampType, maxTimestamp)
+ if (messageFormatVersion >= RecordBatch.MAGIC_VALUE_V2)
+ batch.setPartitionLeaderEpoch(partitionLeaderEpoch)
+
ValidationAndOffsetAssignResult(validatedRecords = records,
maxTimestamp = maxTimestamp,
shallowOffsetOfMaxTimestamp = lastOffset,
@@ -247,11 +257,11 @@ private[kafka] object LogValidator extends Logging {
private def buildRecordsAndAssignOffsets(magic: Byte, offsetCounter: LongRef, timestampType: TimestampType,
compressionType: CompressionType, logAppendTime: Long,
validatedRecords: Seq[Record],
- producerId: Long, epoch: Short, baseSequence: Int): ValidationAndOffsetAssignResult = {
+ producerId: Long, epoch: Short, baseSequence: Int, partitionLeaderEpoch: Int): ValidationAndOffsetAssignResult = {
val estimatedSize = AbstractRecords.estimateSizeInBytes(magic, offsetCounter.value, compressionType, validatedRecords.asJava)
val buffer = ByteBuffer.allocate(estimatedSize)
val builder = MemoryRecords.builder(buffer, magic, compressionType, timestampType, offsetCounter.value,
- logAppendTime, producerId, epoch, baseSequence)
+ logAppendTime, producerId, epoch, baseSequence, partitionLeaderEpoch)
validatedRecords.foreach { record =>
builder.appendWithOffset(offsetCounter.getAndIncrement(), record)
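
The new parameter is threaded from the leader down through every validation path, compressed or not. A hedged sketch of a call, as made from within kafka.log.Log on append (the records, nextOffset and leaderEpoch values here are assumed to be in scope):

    import kafka.message.NoCompressionCodec
    import kafka.utils.LongRef
    import org.apache.kafka.common.record.TimestampType

    val result = LogValidator.validateMessagesAndAssignOffsets(
      records,
      offsetCounter = new LongRef(nextOffset),
      now = System.currentTimeMillis(),
      sourceCodec = NoCompressionCodec,
      targetCodec = NoCompressionCodec,
      messageTimestampType = TimestampType.CREATE_TIME,
      messageTimestampDiffMaxMs = Long.MaxValue,
      partitionLeaderEpoch = leaderEpoch) // stamped onto every V2 batch
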
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 14e56bd..734c006 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -38,6 +38,7 @@ import com.yammer.metrics.core.Gauge
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.{FatalExitError, PartitionStates}
import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.EpochEndOffset
/**
* Abstract class for fetching data from multiple partitions from the same broker.
@@ -46,13 +47,15 @@ abstract class AbstractFetcherThread(name: String,
clientId: String,
val sourceBroker: BrokerEndPoint,
fetchBackOffMs: Int = 0,
- isInterruptible: Boolean = true)
+ isInterruptible: Boolean = true,
+ includeLogTruncation: Boolean
+ )
extends ShutdownableThread(name, isInterruptible) {
type REQ <: FetchRequest
type PD <: PartitionData
- private val partitionStates = new PartitionStates[PartitionFetchState]
+ private[server] val partitionStates = new PartitionStates[PartitionFetchState]
private val partitionMapLock = new ReentrantLock
private val partitionMapCond = partitionMapLock.newCondition()
@@ -71,6 +74,12 @@ abstract class AbstractFetcherThread(name: String,
// deal with partitions with errors, potentially due to leadership changes
protected def handlePartitionsWithErrors(partitions: Iterable[TopicPartition])
+ protected def buildLeaderEpochRequest(allPartitions: Seq[(TopicPartition, PartitionFetchState)]): Map[TopicPartition, Int]
+
+ protected def fetchEpochsFromLeader(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset]
+
+ protected def maybeTruncate(fetchedEpochs: Map[TopicPartition, EpochEndOffset]): Map[TopicPartition, Long]
+
protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): REQ
protected def fetch(fetchRequest: REQ): Seq[(TopicPartition, PD)]
@@ -87,12 +96,12 @@ abstract class AbstractFetcherThread(name: String,
fetcherLagStats.unregister()
}
- override def doWork() {
+ private def states() = partitionStates.partitionStates.asScala.map { state => state.topicPartition -> state.value }
+ override def doWork() {
+ maybeTruncate()
val fetchRequest = inLock(partitionMapLock) {
- val fetchRequest = buildFetchRequest(partitionStates.partitionStates.asScala.map { state =>
- state.topicPartition -> state.value
- })
+ val fetchRequest = buildFetchRequest(states)
if (fetchRequest.isEmpty) {
trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
@@ -103,6 +112,30 @@ abstract class AbstractFetcherThread(name: String,
processFetchRequest(fetchRequest)
}
+ /**
+ * - Build a leader epoch request based on partitions that are in the truncating phase
+ * - Issue an OffsetsForLeaderEpochRequest, retrieving the leader's latest offset for each
+ * partition's epoch. This is the offset to which the follower should truncate to
+ * ensure accurate log replication.
+ * - Finally truncate the logs for partitions in the truncating phase and mark them
+ * truncation complete. Do this within a lock to ensure no leadership changes can
+ * occur during truncation.
+ */
+ def maybeTruncate(): Unit = {
+ val epochRequests = inLock(partitionMapLock) { buildLeaderEpochRequest(states) }
+
+ if (!epochRequests.isEmpty) {
+ val fetchedEpochs = fetchEpochsFromLeader(epochRequests)
+ // Ensure we hold a lock during truncation.
+ inLock(partitionMapLock) {
+ // Check no leadership changes happened while we were unlocked, fetching epochs
+ val leaderEpochs = fetchedEpochs.filter { case (tp, _) => partitionStates.contains(tp) }
+ val truncationPoints = maybeTruncate(leaderEpochs)
+ markTruncationComplete(truncationPoints)
+ }
+ }
+ }
+
private def processFetchRequest(fetchRequest: REQ) {
val partitionsWithError = mutable.Set[TopicPartition]()
@@ -208,25 +241,41 @@ abstract class AbstractFetcherThread(name: String,
!partitionStates.contains(tp)
}.map { case (tp, offset) =>
val fetchState =
- if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(tp))
- else new PartitionFetchState(offset)
+ if (PartitionTopicInfo.isOffsetInvalid(offset))
+ new PartitionFetchState(handleOffsetOutOfRange(tp), includeLogTruncation)
+ else
+ new PartitionFetchState(offset, includeLogTruncation)
tp -> fetchState
}
- val existingPartitionToState = partitionStates.partitionStates.asScala.map { state =>
- state.topicPartition -> state.value
- }.toMap
+ val existingPartitionToState = states().toMap
partitionStates.set((existingPartitionToState ++ newPartitionToState).asJava)
partitionMapCond.signalAll()
} finally partitionMapLock.unlock()
}
+ /**
+ * Loop through all partitions, marking those in the supplied map as truncation complete and applying the corresponding truncation offset
+ * @param partitions the partitions to mark truncation complete
+ */
+ private def markTruncationComplete(partitions: Map[TopicPartition, Long]) {
+ val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStates.asScala
+ .map { state =>
+ val maybeTruncationComplete = partitions.get(state.topicPartition()) match {
+ case Some(offset) => new PartitionFetchState(offset, state.value.delay, truncatingLog = false)
+ case None => state.value()
+ }
+ (state.topicPartition(), maybeTruncationComplete)
+ }.toMap
+ partitionStates.set(newStates.asJava)
+ }
+
def delayPartitions(partitions: Iterable[TopicPartition], delay: Long) {
partitionMapLock.lockInterruptibly()
try {
for (partition <- partitions) {
Option(partitionStates.stateValue(partition)).foreach (currentPartitionFetchState =>
- if (currentPartitionFetchState.isActive)
- partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay)))
+ if (!currentPartitionFetchState.isDelayed)
+ partitionStates.updateAndMoveToEnd(partition, new PartitionFetchState(currentPartitionFetchState.fetchOffset, new DelayedItem(delay), currentPartitionFetchState.truncatingLog))
)
}
partitionMapCond.signalAll()
@@ -348,13 +397,25 @@ case class ClientIdTopicPartition(clientId: String, topic: String, partitionId:
}
/**
- * case class to keep partition offset and its state(active, inactive)
+ * case class to keep partition offset and its state (truncatingLog, delayed)
+ * This represents a partition as being in one of three states:
+ * (1) Truncating its log, for example having recently become a follower
+ * (2) Delayed, for example due to an error, where we subsequently back off a bit
+ * (3) ReadyForFetch, this is the active state where the thread is actively fetching data.
*/
-case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem) {
+case class PartitionFetchState(fetchOffset: Long, delay: DelayedItem, truncatingLog: Boolean = false) {
+
+ def this(offset: Long, truncatingLog: Boolean) = this(offset, new DelayedItem(0), truncatingLog)
+
+ def this(offset: Long, delay: DelayedItem) = this(offset, delay, false)
def this(fetchOffset: Long) = this(fetchOffset, new DelayedItem(0))
- def isActive: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0
+ def isReadyForFetch: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0 && !truncatingLog
+
+ def isTruncatingLog: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) == 0 && truncatingLog
+
+ def isDelayed: Boolean = delay.getDelay(TimeUnit.MILLISECONDS) > 0
- override def toString = "%d-%b".format(fetchOffset, isActive)
+ override def toString = "offset:%d-isReadyForFetch:%b-isTruncatingLog:%b".format(fetchOffset, isReadyForFetch, truncatingLog)
}
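
The three predicates above are mutually exclusive at any instant. A small illustrative sequence using only the constructors defined in this patch:

    import kafka.utils.DelayedItem

    val truncating = new PartitionFetchState(0L, true)
    truncating.isTruncatingLog  // true: epochs must be fetched and the log truncated before fetching resumes
    val ready = new PartitionFetchState(100L, false)
    ready.isReadyForFetch       // true: this partition is included in the next buildFetchRequest()
    val delayed = PartitionFetchState(100L, new DelayedItem(5000))
    delayed.isDelayed           // true until the 5 second back-off elapses
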
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 600b84d..1e8900b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -103,6 +103,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.DELETE_TOPICS => handleDeleteTopicsRequest(request)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitPidRequest(request)
+ case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case requestId => throw new KafkaException("Unknown api code " + requestId)
}
} catch {
@@ -1322,6 +1323,17 @@ class KafkaApis(val requestChannel: RequestChannel,
txnCoordinator.handleInitPid(initPidRequest.transactionalId, sendResponseCallback)
}
+ def handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit = {
+ val offsetForEpoch = request.body[OffsetsForLeaderEpochRequest]
+ val requestInfo = offsetForEpoch.epochsByTopicPartition()
+ authorizeClusterAction(request)
+
+ val responseBody = new OffsetsForLeaderEpochResponse(
+ replicaManager.getResponseFor(requestInfo)
+ )
+ requestChannel.sendResponse(new RequestChannel.Response(request, responseBody))
+ }
+
def authorizeClusterAction(request: RequestChannel.Request): Unit = {
if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
throw new ClusterAuthorizationException(s"Request $request is not authorized.")
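
The semantics behind getResponseFor follow KIP-101: for a requested epoch, the leader answers with the start offset of the first epoch larger than the requested one, or with its log end offset if the requested epoch is its latest. A hedged sketch of that rule (an illustration only, not the patch's LeaderEpochCache code):

    // epochs: (epoch, startOffset) pairs in ascending epoch order, as tracked in the new checkpoint file
    def endOffsetFor(requestedEpoch: Int, epochs: Seq[(Int, Long)], logEndOffset: Long): Long =
      epochs.find { case (epoch, _) => epoch > requestedEpoch } match {
        case Some((_, startOffset)) => startOffset // end of the requested epoch = start of the next
        case None => logEndOffset                  // requested epoch is still current: answer with the LEO
      }

    // e.g. with epochs Seq((3, 0L), (5, 100L)) and logEndOffset 150:
    // endOffsetFor(3, ...) == 100 and endOffsetFor(5, ...) == 150; the follower
    // truncates any part of its log beyond the returned offset.
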
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala b/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
deleted file mode 100644
index de2626c..0000000
--- a/core/src/main/scala/kafka/server/OffsetCheckpoint.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.nio.file.{FileSystems, Paths}
-import java.util.regex.Pattern
-
-import org.apache.kafka.common.utils.Utils
-
-import scala.collection._
-import kafka.utils.{Exit, Logging}
-import kafka.common._
-import java.io._
-import java.nio.charset.StandardCharsets
-
-import org.apache.kafka.common.TopicPartition
-
-object OffsetCheckpoint {
- private val WhiteSpacesPattern = Pattern.compile("\\s+")
- private val CurrentVersion = 0
-}
-
-/**
- * This class saves out a map of topic/partition=>offsets to a file
- */
-class OffsetCheckpoint(val file: File) extends Logging {
- import OffsetCheckpoint._
- private val path = file.toPath.toAbsolutePath
- private val tempPath = Paths.get(path.toString + ".tmp")
- private val lock = new Object()
- file.createNewFile() // in case the file doesn't exist
-
- def write(offsets: Map[TopicPartition, Long]) {
- lock synchronized {
- // write to temp file and then swap with the existing file
- val fileOutputStream = new FileOutputStream(tempPath.toFile)
- val writer = new BufferedWriter(new OutputStreamWriter(fileOutputStream, StandardCharsets.UTF_8))
- try {
- writer.write(CurrentVersion.toString)
- writer.newLine()
-
- writer.write(offsets.size.toString)
- writer.newLine()
-
- offsets.foreach { case (topicPart, offset) =>
- writer.write(s"${topicPart.topic} ${topicPart.partition} $offset")
- writer.newLine()
- }
-
- writer.flush()
- fileOutputStream.getFD().sync()
- } catch {
- case e: FileNotFoundException =>
- if (FileSystems.getDefault.isReadOnly) {
- fatal("Halting writes to offset checkpoint file because the underlying file system is inaccessible : ", e)
- Exit.halt(1)
- }
- throw e
- } finally {
- writer.close()
- }
-
- Utils.atomicMoveWithFallback(tempPath, path)
- }
- }
-
- def read(): Map[TopicPartition, Long] = {
-
- def malformedLineException(line: String) =
- new IOException(s"Malformed line in offset checkpoint file: $line'")
-
- lock synchronized {
- val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))
- var line: String = null
- try {
- line = reader.readLine()
- if (line == null)
- return Map.empty
- val version = line.toInt
- version match {
- case CurrentVersion =>
- line = reader.readLine()
- if (line == null)
- return Map.empty
- val expectedSize = line.toInt
- val offsets = mutable.Map[TopicPartition, Long]()
- line = reader.readLine()
- while (line != null) {
- WhiteSpacesPattern.split(line) match {
- case Array(topic, partition, offset) =>
- offsets += new TopicPartition(topic, partition.toInt) -> offset.toLong
- line = reader.readLine()
- case _ => throw malformedLineException(line)
- }
- }
- if (offsets.size != expectedSize)
- throw new IOException(s"Expected $expectedSize entries but found only ${offsets.size}")
- offsets
- case _ =>
- throw new IOException("Unrecognized version of the highwatermark checkpoint file: " + version)
- }
- } catch {
- case _: NumberFormatException => throw malformedLineException(line)
- } finally {
- reader.close()
- }
- }
- }
-
-}
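
This file is deleted in favour of the OffsetCheckpointFile referenced in the LogManager change above; the on-disk format is unchanged: a version line, an entry count, then one "topic partition offset" triple per line. For example, a checkpoint covering two partitions of topic1 at offsets 42 and 7 reads:

    0
    2
    topic1 0 42
    topic1 1 7
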
http://git-wip-us.apache.org/repos/asf/kafka/blob/0baea2ac/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
new file mode 100644
index 0000000..8ba3f60
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.net.SocketTimeoutException
+
+import kafka.cluster.BrokerEndPoint
+import org.apache.kafka.clients._
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.security.JaasContext
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.clients.{ApiVersions, ClientResponse, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.requests.AbstractRequest.Builder
+
+import scala.collection.JavaConverters._
+
+trait BlockingSend {
+
+ def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse
+
+ def close()
+}
+
+class ReplicaFetcherBlockingSend(sourceBroker: BrokerEndPoint,
+ brokerConfig: KafkaConfig,
+ metrics: Metrics,
+ time: Time,
+ fetcherId: Int,
+ clientId: String) extends BlockingSend {
+
+ private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
+ private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs
+
+ private val networkClient = {
+ val channelBuilder = ChannelBuilders.clientChannelBuilder(
+ brokerConfig.interBrokerSecurityProtocol,
+ JaasContext.Type.SERVER,
+ brokerConfig,
+ brokerConfig.interBrokerListenerName,
+ brokerConfig.saslMechanismInterBrokerProtocol,
+ brokerConfig.saslInterBrokerHandshakeRequestEnable
+ )
+ val selector = new Selector(
+ NetworkReceive.UNLIMITED,
+ brokerConfig.connectionsMaxIdleMs,
+ metrics,
+ time,
+ "replica-fetcher",
+ Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava,
+ false,
+ channelBuilder
+ )
+ new NetworkClient(
+ selector,
+ new ManualMetadataUpdater(),
+ clientId,
+ 1,
+ 0,
+ Selectable.USE_DEFAULT_BUFFER_SIZE,
+ brokerConfig.replicaSocketReceiveBufferBytes,
+ brokerConfig.requestTimeoutMs,
+ time,
+ false,
+ new ApiVersions
+ )
+ }
+
+ override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
+ try {
+ if (!NetworkClientUtils.awaitReady(networkClient, sourceNode, time, socketTimeout))
+ throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms")
+ else {
+ val clientRequest = networkClient.newClientRequest(sourceBroker.id.toString, requestBuilder,
+ time.milliseconds(), true)
+ NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
+ }
+ }
+ catch {
+ case e: Throwable =>
+ networkClient.close(sourceBroker.id.toString)
+ throw e
+ }
+ }
+
+ def close(): Unit = {
+ networkClient.close()
+ }
+}
\ No newline at end of file
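
A hedged usage sketch tying the pieces together (sourceBroker, brokerConfig and the epochsByPartition map are assumed in scope; the Builder constructor and response accessor are taken from this commit's new request classes and may differ in detail):

    import org.apache.kafka.common.metrics.Metrics
    import org.apache.kafka.common.requests.{OffsetsForLeaderEpochRequest, OffsetsForLeaderEpochResponse}
    import org.apache.kafka.common.utils.Time

    val sender: BlockingSend = new ReplicaFetcherBlockingSend(
      sourceBroker, brokerConfig, new Metrics(), Time.SYSTEM, 0, "replica-fetcher-example")
    try {
      val builder = new OffsetsForLeaderEpochRequest.Builder(epochsByPartition) // TopicPartition -> Integer
      // Blocks until the leader is reachable, sends, and waits for the response;
      // on any failure the connection is closed so the next call reconnects cleanly.
      val response = sender.sendRequest(builder).responseBody.asInstanceOf[OffsetsForLeaderEpochResponse]
    } finally sender.close()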