Posted to commits@kafka.apache.org by ij...@apache.org on 2019/05/25 04:33:34 UTC

[kafka] branch 2.3 updated: KAFKA-8422; Client should send OffsetForLeaderEpoch only if broker supports latest version (#6806)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 60fcd43  KAFKA-8422; Client should send OffsetForLeaderEpoch only if broker supports latest version (#6806)
60fcd43 is described below

commit 60fcd43d4e260113d023de74904c74d7fda1a6a4
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Fri May 24 21:29:51 2019 -0700

    KAFKA-8422; Client should send OffsetForLeaderEpoch only if broker supports latest version (#6806)
    
    In the olden days, OffsetForLeaderEpoch was exclusively an inter-broker protocol and
    required Cluster-level permission. With KIP-320, clients can use this API as well, so
    we lowered the required permission to Topic Describe. The only way the client can be
    sure that the new permissions are in use is to require version 3 of the protocol,
    which was bumped for 2.3. If the broker does not support this version, we skip the
    validation and revert to the old behavior.
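    
    Concretely, the consumer-side builder now pins its allowed version range so that
    building a request for an older negotiated version fails fast instead of sending
    a request the broker would authorize at the Cluster level. A minimal sketch,
    using only names from the diff below:
    
        // forConsumer pins oldestAllowedVersion to 3, the first version served
        // under Topic Describe permission.
        OffsetsForLeaderEpochRequest.Builder builder =
                OffsetsForLeaderEpochRequest.Builder.forConsumer(Collections.emptyMap());
        builder.build((short) 3);  // ok; replicaId() == CONSUMER_REPLICA_ID (-1)
        builder.build((short) 2);  // throws UnsupportedVersionException (below v3)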
    
    Additionally, this patch fixes a problem with the newly added replicaId field when
    it is parsed from older versions, which did not have it. If the field was not present,
    we used the consumer's sentinel value, but this would limit the range of visible
    offsets to the high watermark. To get around this problem, this patch adds a
    separate "debug" sentinel similar to APIs like Fetch and ListOffsets.
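    
    For example, when a pre-v3 struct is parsed, the replica_id field is absent and
    the request constructor falls back to the debug sentinel (a condensed view of the
    OffsetsForLeaderEpochRequest change in the diff below):
    
        // v0-v2 schemas have no replica_id; fall back to DEBUGGING_REPLICA_ID (-2)
        // so that followers on old versions are not capped at the high watermark
        // the way consumers (CONSUMER_REPLICA_ID, -1) are.
        replicaId = struct.getOrElse(REPLICA_ID, DEBUGGING_REPLICA_ID);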
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 .../kafka/clients/consumer/KafkaConsumer.java      |   6 +-
 .../kafka/clients/consumer/internals/Fetcher.java  |  39 +++++++-
 .../internals/OffsetsForLeaderEpochClient.java     |   3 +-
 .../consumer/internals/SubscriptionState.java      |   2 +-
 .../kafka/common/requests/ApiVersionsResponse.java |   4 +
 .../requests/OffsetsForLeaderEpochRequest.java     |  35 +++++--
 .../message/OffsetForLeaderEpochRequest.json       |   7 +-
 .../org/apache/kafka/clients/ApiVersionsTest.java  |   2 +-
 .../apache/kafka/clients/NetworkClientTest.java    |   7 +-
 .../apache/kafka/clients/NodeApiVersionsTest.java  |  10 +-
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   4 +-
 .../clients/consumer/internals/FetcherTest.java    |  56 ++++++++++-
 .../producer/internals/RecordAccumulatorTest.java  |   4 +-
 .../clients/producer/internals/SenderTest.java     |   4 +-
 .../apache/kafka/common/message/MessageTest.java   | 103 +++++++++++++++------
 .../requests/OffsetsForLeaderEpochRequestTest.java |  60 ++++++++++++
 .../kafka/common/requests/RequestResponseTest.java |  20 ++--
 .../kafka/api/AuthorizerIntegrationTest.scala      |   2 +-
 .../server/OffsetsForLeaderEpochRequestTest.scala  |   7 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   4 +-
 .../server/epoch/LeaderEpochIntegrationTest.scala  |   5 +-
 21 files changed, 302 insertions(+), 82 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index c33a52e..7f63e8b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -744,6 +744,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             Sensor throttleTimeSensor = Fetcher.throttleTimeSensor(metrics, metricsRegistry);
             int heartbeatIntervalMs = config.getInt(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG);
 
+            ApiVersions apiVersions = new ApiVersions();
             NetworkClient netClient = new NetworkClient(
                     new Selector(config.getLong(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, metricGrpPrefix, channelBuilder, logContext),
                     this.metadata,
@@ -757,7 +758,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     ClientDnsLookup.forConfig(config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                     time,
                     true,
-                    new ApiVersions(),
+                    apiVersions,
                     throttleTimeSensor,
                     logContext);
             this.client = new ConsumerNetworkClient(
@@ -813,7 +814,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     this.time,
                     this.retryBackoffMs,
                     this.requestTimeoutMs,
-                    isolationLevel);
+                    isolationLevel,
+                    apiVersions);
 
             config.logUnused();
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 7b633d6..aa2936a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -16,10 +16,12 @@
  */
 package org.apache.kafka.clients.consumer.internals;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MetadataCache;
+import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.StaleMetadataException;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -49,6 +51,7 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Meter;
 import org.apache.kafka.common.metrics.stats.Min;
 import org.apache.kafka.common.metrics.stats.Value;
+import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.record.BufferSupplier;
 import org.apache.kafka.common.record.ControlRecordType;
@@ -57,6 +60,7 @@ import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.common.requests.ApiVersionsResponse;
 import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.IsolationLevel;
@@ -64,6 +68,7 @@ import org.apache.kafka.common.requests.ListOffsetRequest;
 import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.MetadataRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.utils.CloseableIterator;
 import org.apache.kafka.common.utils.LogContext;
@@ -144,6 +149,7 @@ public class Fetcher<K, V> implements Closeable {
     private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
     private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
     private final Set<Integer> nodesWithPendingFetchRequests;
+    private final ApiVersions apiVersions;
 
     private PartitionRecords nextInLineRecords = null;
 
@@ -165,7 +171,8 @@ public class Fetcher<K, V> implements Closeable {
                    Time time,
                    long retryBackoffMs,
                    long requestTimeoutMs,
-                   IsolationLevel isolationLevel) {
+                   IsolationLevel isolationLevel,
+                   ApiVersions apiVersions) {
         this.log = logContext.logger(Fetcher.class);
         this.logContext = logContext;
         this.time = time;
@@ -186,6 +193,7 @@ public class Fetcher<K, V> implements Closeable {
         this.retryBackoffMs = retryBackoffMs;
         this.requestTimeoutMs = requestTimeoutMs;
         this.isolationLevel = isolationLevel;
+        this.apiVersions = apiVersions;
         this.sessionHandlers = new HashMap<>();
         this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(client, logContext);
         this.nodesWithPendingFetchRequests = new HashSet<>();
@@ -711,6 +719,19 @@ public class Fetcher<K, V> implements Closeable {
         }
     }
 
+    private boolean hasUsableOffsetForLeaderEpochVersion(Node node) {
+        NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
+
+        if (nodeApiVersions == null)
+            return false;
+
+        ApiVersionsResponse.ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
+        if (apiVersion == null)
+            return false;
+
+        return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion);
+    }
+
     /**
      * For each partition which needs validation, make an asynchronous request to get the end-offsets for the partition
      * with the epoch less than or equal to the epoch the partition last saw.
@@ -727,12 +748,22 @@ public class Fetcher<K, V> implements Closeable {
                 return;
             }
 
-            subscriptions.setNextAllowedRetry(dataMap.keySet(), time.milliseconds() + requestTimeoutMs);
-
             final Map<TopicPartition, Metadata.LeaderAndEpoch> cachedLeaderAndEpochs = partitionsToValidate.entrySet()
                     .stream()
                     .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().currentLeader));
 
+            if (!hasUsableOffsetForLeaderEpochVersion(node)) {
+                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not " +
+                                "support the required protocol version (introduced in Kafka 2.3)",
+                        cachedLeaderAndEpochs.keySet());
+                for (TopicPartition partition : cachedLeaderAndEpochs.keySet()) {
+                    subscriptions.completeValidation(partition);
+                }
+                return;
+            }
+
+            subscriptions.setNextAllowedRetry(dataMap.keySet(), time.milliseconds() + requestTimeoutMs);
+
             RequestFuture<OffsetsForLeaderEpochClient.OffsetForEpochResult> future = offsetsForLeaderEpochClient.sendAsyncRequest(node, partitionsToValidate);
             future.addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() {
                 @Override
@@ -772,7 +803,7 @@ public class Fetcher<K, V> implements Closeable {
                                 }
                             } else {
                                 // Offset is fine, clear the validation state
-                                subscriptions.validate(respTopicPartition);
+                                subscriptions.completeValidation(respTopicPartition);
                             }
                         }
                     });
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index d7b02a7..9556d3c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -19,7 +19,6 @@ package org.apache.kafka.clients.consumer.internals;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
-import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.EpochEndOffset;
@@ -53,7 +52,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
             fetchEpoch -> partitionData.put(topicPartition,
                 new OffsetsForLeaderEpochRequest.PartitionData(fetchPosition.currentLeader.epoch, fetchEpoch))));
 
-        return OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(), partitionData);
+        return OffsetsForLeaderEpochRequest.Builder.forConsumer(partitionData);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 87d1a35..69bc6e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -382,7 +382,7 @@ public class SubscriptionState {
         return assignedState(tp).awaitingValidation();
     }
 
-    public void validate(TopicPartition tp) {
+    public void completeValidation(TopicPartition tp) {
         assignedState(tp).validate();
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index a37b6da..b3846d1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -84,6 +84,10 @@ public class ApiVersionsResponse extends AbstractResponse {
             this(apiKey.id, apiKey.oldestVersion(), apiKey.latestVersion());
         }
 
+        public ApiVersion(ApiKeys apiKey, short minVersion, short maxVersion) {
+            this(apiKey.id, minVersion, maxVersion);
+        }
+
         public ApiVersion(short apiKey, short minVersion, short maxVersion) {
             this.apiKey = apiKey;
             this.minVersion = minVersion;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index d3df6cf..d5c78da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Field;
@@ -44,6 +45,11 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
      */
     public static final int CONSUMER_REPLICA_ID = -1;
 
+    /**
+     * Sentinel replica_id which indicates either a debug consumer or a replica which is using
+     * an old version of the protocol.
+     */
+    public static final int DEBUGGING_REPLICA_ID = -2;
 
     private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
             "An array of topics to get epochs for");
@@ -101,23 +107,29 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
         private final Map<TopicPartition, PartitionData> epochsByPartition;
         private final int replicaId;
 
-        Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
-            super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
+        Builder(short oldestAllowedVersion, short latestAllowedVersion, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
+            super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, oldestAllowedVersion, latestAllowedVersion);
             this.epochsByPartition = epochsByPartition;
             this.replicaId = replicaId;
         }
 
-        public static Builder forConsumer(short version, Map<TopicPartition, PartitionData> epochsByPartition) {
-            return new Builder(version, epochsByPartition, CONSUMER_REPLICA_ID);
+        public static Builder forConsumer(Map<TopicPartition, PartitionData> epochsByPartition) {
+            // Old versions of this API require CLUSTER permission which is not typically granted
+            // to clients. Beginning with version 3, the broker requires only TOPIC Describe
+            // permission for the topic of each requested partition. In order to ensure client
+            // compatibility, we only send this request when we can guarantee the relaxed permissions.
+            return new Builder((short) 3, ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(),
+                    epochsByPartition, CONSUMER_REPLICA_ID);
         }
 
         public static Builder forFollower(short version, Map<TopicPartition, PartitionData> epochsByPartition, int replicaId) {
-            return new Builder(version, epochsByPartition, replicaId);
-
+            return new Builder(version, version, epochsByPartition, replicaId);
         }
 
         @Override
         public OffsetsForLeaderEpochRequest build(short version) {
+            if (version < oldestAllowedVersion() || version > latestAllowedVersion())
+                throw new UnsupportedVersionException("Cannot build " + this + " with version " + version);
             return new OffsetsForLeaderEpochRequest(epochsByPartition, replicaId, version);
         }
 
@@ -143,7 +155,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
 
     public OffsetsForLeaderEpochRequest(Struct struct, short version) {
         super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
-        replicaId = struct.getOrElse(REPLICA_ID, CONSUMER_REPLICA_ID);
+        replicaId = struct.getOrElse(REPLICA_ID, DEBUGGING_REPLICA_ID);
         epochsByPartition = new HashMap<>();
         for (Object topicAndEpochsObj : struct.get(TOPICS)) {
             Struct topicAndEpochs = (Struct) topicAndEpochsObj;
@@ -222,4 +234,13 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
             return bld.toString();
         }
     }
+
+    /**
+     * Check whether a broker allows Topic-level permissions in order to use the
+     * OffsetForLeaderEpoch API. Old versions require Cluster permission.
+     */
+    public static boolean supportsTopicPermission(short latestUsableVersion) {
+        return latestUsableVersion >= 3;
+    }
+
 }
diff --git a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
index 84585ca..53bef12 100644
--- a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
+++ b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
@@ -18,12 +18,13 @@
   "type": "request",
   "name": "OffsetForLeaderEpochRequest",
   // Version 1 is the same as version 0.
-  //
   // Version 2 adds the current leader epoch to support fencing.
-  // Version 3 adds ReplicaId
+  // Version 3 adds ReplicaId (the default is -2, which conventionally represents a
+  //    "debug" consumer that is allowed to see offsets beyond the high watermark).
+  //    Followers will use this replicaId when using an older version of the protocol.
   "validVersions": "0-3",
   "fields": [
-    { "name": "ReplicaId", "type": "int32", "versions": "3+",
+    { "name": "ReplicaId", "type": "int32", "versions": "3+", "default": -2, "ignorable": true,
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
     { "name": "Topics", "type": "[]OffsetForLeaderTopic", "versions": "0+",
       "about": "Each topic to get offsets for.", "fields": [
diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
index 654a606..a66a86f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
@@ -36,7 +36,7 @@ public class ApiVersionsTest {
         assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
 
         apiVersions.update("1", NodeApiVersions.create(Collections.singleton(
-                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE, (short) 0, (short) 2))));
         assertEquals(RecordBatch.MAGIC_VALUE_V1, apiVersions.maxUsableProduceMagic());
 
         apiVersions.remove("1");
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index db5c7df..b5bba71 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -272,12 +272,11 @@ public class NetworkClientTest {
 
     // Creates an expected ApiVersionsResponse where the max protocol version for the
     // specified key is set to the specified version.
-    private ApiVersionsResponse createExpectedApiVersionsResponse(Node node, ApiKeys key,
-        short apiVersionsMaxProtocolVersion) {
+    private ApiVersionsResponse createExpectedApiVersionsResponse(ApiKeys key, short maxVersion) {
         List<ApiVersionsResponse.ApiVersion> versionList = new ArrayList<>();
         for (ApiKeys apiKey : ApiKeys.values()) {
             if (apiKey == key) {
-                versionList.add(new ApiVersionsResponse.ApiVersion(apiKey.id, (short) 0, apiVersionsMaxProtocolVersion));
+                versionList.add(new ApiVersionsResponse.ApiVersion(apiKey, (short) 0, maxVersion));
             } else {
                 versionList.add(new ApiVersionsResponse.ApiVersion(apiKey));
             }
@@ -289,7 +288,7 @@ public class NetworkClientTest {
     public void testThrottlingNotEnabledForConnectionToOlderBroker() {
         // Instrument the test so that the max protocol version for PRODUCE returned from the node is 5 and thus
         // client-side throttling is not enabled. Also, return a response with a 100ms throttle delay.
-        setExpectedApiVersionsResponse(createExpectedApiVersionsResponse(node, ApiKeys.PRODUCE, (short) 5));
+        setExpectedApiVersionsResponse(createExpectedApiVersionsResponse(ApiKeys.PRODUCE, (short) 5));
         while (!client.ready(node, time.milliseconds()))
             client.poll(1, time.milliseconds());
         selector.clear();
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 88d0c2e..58fb363 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -58,7 +58,7 @@ public class NodeApiVersionsTest {
         List<ApiVersion> versionList = new ArrayList<>();
         for (ApiKeys apiKey : ApiKeys.values()) {
             if (apiKey == ApiKeys.DELETE_TOPICS) {
-                versionList.add(new ApiVersion(apiKey.id, (short) 10000, (short) 10001));
+                versionList.add(new ApiVersion(apiKey, (short) 10000, (short) 10001));
             } else {
                 versionList.add(new ApiVersion(apiKey));
             }
@@ -93,7 +93,7 @@ public class NodeApiVersionsTest {
     @Test
     public void testLatestUsableVersion() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
-                new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 3)));
+                new ApiVersion(ApiKeys.PRODUCE, (short) 1, (short) 3)));
         assertEquals(3, apiVersions.latestUsableVersion(ApiKeys.PRODUCE));
         assertEquals(1, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1));
         assertEquals(1, apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 1, (short) 1));
@@ -108,14 +108,14 @@ public class NodeApiVersionsTest {
     @Test(expected = UnsupportedVersionException.class)
     public void testLatestUsableVersionOutOfRangeLow() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
-                new ApiVersion(ApiKeys.PRODUCE.id, (short) 1, (short) 2)));
+                new ApiVersion(ApiKeys.PRODUCE, (short) 1, (short) 2)));
         apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 3, (short) 4);
     }
 
     @Test(expected = UnsupportedVersionException.class)
     public void testLatestUsableVersionOutOfRangeHigh() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
-                new ApiVersion(ApiKeys.PRODUCE.id, (short) 2, (short) 3)));
+                new ApiVersion(ApiKeys.PRODUCE, (short) 2, (short) 3)));
         apiVersions.latestUsableVersion(ApiKeys.PRODUCE, (short) 0, (short) 1);
     }
 
@@ -129,7 +129,7 @@ public class NodeApiVersionsTest {
     @Test(expected = UnsupportedVersionException.class)
     public void testLatestUsableVersionOutOfRange() {
         NodeApiVersions apiVersions = NodeApiVersions.create(Collections.singleton(
-                new ApiVersion(ApiKeys.PRODUCE.id, (short) 300, (short) 300)));
+                new ApiVersion(ApiKeys.PRODUCE, (short) 300, (short) 300)));
         apiVersions.latestUsableVersion(ApiKeys.PRODUCE);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 42cccd4..8e6bd01 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.MockClient;
@@ -1924,7 +1925,8 @@ public class KafkaConsumerTest {
                 time,
                 retryBackoffMs,
                 requestTimeoutMs,
-                IsolationLevel.READ_UNCOMMITTED);
+                IsolationLevel.READ_UNCOMMITTED,
+                new ApiVersions());
 
         return new KafkaConsumer<>(
                 loggerFactory,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0e2662a..69036ca 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -157,6 +157,7 @@ public class FetcherTest {
     private FetcherMetricsRegistry metricsRegistry;
     private MockClient client;
     private Metrics metrics;
+    private ApiVersions apiVersions = new ApiVersions();
     private ConsumerNetworkClient consumerClient;
     private Fetcher<?, ?> fetcher;
 
@@ -778,7 +779,7 @@ public class FetcherTest {
             buildFetcher();
 
             client.setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(
-                new ApiVersionsResponse.ApiVersion(ApiKeys.FETCH.id, (short) 2, (short) 2))));
+                new ApiVersionsResponse.ApiVersion(ApiKeys.FETCH, (short) 2, (short) 2))));
             makeFetchRequestWithIncompleteRecord();
             try {
                 fetcher.fetchedRecords();
@@ -2836,7 +2837,8 @@ public class FetcherTest {
                 time,
                 retryBackoffMs,
                 requestTimeoutMs,
-                IsolationLevel.READ_UNCOMMITTED) {
+                IsolationLevel.READ_UNCOMMITTED,
+                apiVersions) {
             @Override
             protected FetchSessionHandler sessionHandler(int id) {
                 final FetchSessionHandler handler = super.sessionHandler(id);
@@ -3207,6 +3209,43 @@ public class FetcherTest {
     }
 
     @Test
+    public void testOffsetValidationSkippedForOldBroker() {
+        // Old brokers may require CLUSTER permission to use the OffsetForLeaderEpoch API,
+        // so we should skip offset validation and not send the request.
+
+        buildFetcher();
+        assignFromUser(singleton(tp0));
+
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put(tp0.topic(), 4);
+
+        final int epochOne = 1;
+        final int epochTwo = 2;
+
+        // Start with metadata, epoch=1
+        metadata.update(TestUtils.metadataUpdateWith("dummy", 1,
+                Collections.emptyMap(), partitionCounts, tp -> epochOne), 0L);
+
+        // Offset validation requires OffsetForLeaderEpoch request v3 or higher
+        Node node = metadata.fetch().nodes().get(0);
+        apiVersions.update(node.idString(), NodeApiVersions.create(singleton(
+                new ApiVersionsResponse.ApiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH, (short) 0, (short) 2))));
+
+        // Seek with a position and leader+epoch
+        Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(
+                metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
+        subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
+
+        // Update metadata to epoch=2, enter validation
+        metadata.update(TestUtils.metadataUpdateWith("dummy", 1,
+                Collections.emptyMap(), partitionCounts, tp -> epochTwo), 0L);
+        fetcher.validateOffsetsIfNeeded();
+
+        // Offset validation is skipped
+        assertFalse(subscriptions.awaitingValidation(tp0));
+    }
+
+    @Test
     public void testOffsetValidationFencing() {
         buildFetcher();
         assignFromUser(singleton(tp0));
@@ -3221,6 +3260,10 @@ public class FetcherTest {
         // Start with metadata, epoch=1
         metadata.update(TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> epochOne), 0L);
 
+        // Offset validation requires OffsetForLeaderEpoch request v3 or higher
+        Node node = metadata.fetch().nodes().get(0);
+        apiVersions.update(node.idString(), NodeApiVersions.create());
+
         // Seek with a position and leader+epoch
         Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(epochOne));
         subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.of(epochOne), leaderAndEpoch));
@@ -3231,7 +3274,7 @@ public class FetcherTest {
         assertTrue(subscriptions.awaitingValidation(tp0));
 
         // Update the position to epoch=3, as we would from a fetch
-        subscriptions.validate(tp0);
+        subscriptions.completeValidation(tp0);
         SubscriptionState.FetchPosition nextPosition = new SubscriptionState.FetchPosition(
                 10,
                 Optional.of(epochTwo),
@@ -3287,6 +3330,10 @@ public class FetcherTest {
         MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 1, Collections.emptyMap(), partitionCounts, tp -> 2);
         metadata.update(metadataResponse, 0L);
 
+        // Offset validation requires OffsetForLeaderEpoch request v3 or higher
+        Node node = metadata.fetch().nodes().get(0);
+        apiVersions.update(node.idString(), NodeApiVersions.create());
+
         // Seek
         Metadata.LeaderAndEpoch leaderAndEpoch = new Metadata.LeaderAndEpoch(metadata.leaderAndEpoch(tp0).leader, Optional.of(1));
         subscriptions.seek(tp0, new SubscriptionState.FetchPosition(0, Optional.of(1), leaderAndEpoch));
@@ -3547,7 +3594,8 @@ public class FetcherTest {
                 time,
                 retryBackoffMs,
                 requestTimeoutMs,
-                isolationLevel);
+                isolationLevel,
+                apiVersions);
     }
 
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 5061447..a58f3b5 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -188,7 +188,7 @@ public class RecordAccumulatorTest {
 
         ApiVersions apiVersions = new ApiVersions();
         apiVersions.update(node1.idString(), NodeApiVersions.create(Collections.singleton(
-                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE, (short) 0, (short) 2))));
 
         RecordAccumulator accum = createTestRecordAccumulator(
                 batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD, 10 * 1024, compressionType, 0);
@@ -705,7 +705,7 @@ public class RecordAccumulatorTest {
         long totalSize = 10 * batchSize;
         String metricGrpName = "producer-metrics";
 
-        apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id,
+        apiVersions.update("foobar", NodeApiVersions.create(Arrays.asList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE,
                 (short) 0, (short) 2))));
         RecordAccumulator accum = new RecordAccumulator(logContext, batchSize + DefaultRecordBatch.RECORD_BATCH_OVERHEAD,
             CompressionType.NONE, lingerMs, retryBackoffMs, deliveryTimeoutMs, metrics, metricGrpName, time, apiVersions, new TransactionManager(),
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 04197d8..5ae76c3 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -181,7 +181,7 @@ public class SenderTest {
 
         // now the partition leader supports only v2
         apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
-                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE, (short) 0, (short) 2))));
 
         client.prepareResponse(new MockClient.RequestMatcher() {
             @Override
@@ -221,7 +221,7 @@ public class SenderTest {
 
         // now the partition leader supports only v2
         apiVersions.update("0", NodeApiVersions.create(Collections.singleton(
-                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE.id, (short) 0, (short) 2))));
+                new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE, (short) 0, (short) 2))));
 
         Future<RecordMetadata> future2 = accumulator.append(tp1, 0L, "key".getBytes(), "value".getBytes(),
                 null, null, MAX_BLOCK_TIMEOUT).future;
diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
index 547dbc4..7c9d335 100644
--- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java
@@ -37,10 +37,10 @@ import org.junit.rules.Timeout;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.function.Supplier;
 
+import static java.util.Collections.singletonList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -67,10 +67,10 @@ public final class MessageTest {
                 setTransactionalId("blah").
                 setProducerId(0xbadcafebadcafeL).
                 setProducerEpoch((short) 30000).
-                setTopics(new AddPartitionsToTxnTopicCollection(Collections.singletonList(
+                setTopics(new AddPartitionsToTxnTopicCollection(singletonList(
                         new AddPartitionsToTxnTopic().
                                 setName("Topic").
-                                setPartitions(Collections.singletonList(1))).iterator())));
+                                setPartitions(singletonList(1))).iterator())));
     }
 
     @Test
@@ -97,16 +97,16 @@ public final class MessageTest {
                 Arrays.asList(new MetadataRequestData.MetadataRequestTopic().setName("foo"),
                         new MetadataRequestData.MetadataRequestTopic().setName("bar")
                 )));
-        testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+        testAllMessageRoundTripsFromVersion((short) 1, new MetadataRequestData().
                 setTopics(null).
                 setAllowAutoTopicCreation(true).
                 setIncludeClusterAuthorizedOperations(false).
-                setIncludeTopicAuthorizedOperations(false), (short) 1);
-        testAllMessageRoundTripsFromVersion(new MetadataRequestData().
+                setIncludeTopicAuthorizedOperations(false));
+        testAllMessageRoundTripsFromVersion((short) 4, new MetadataRequestData().
                 setTopics(null).
                 setAllowAutoTopicCreation(false).
                 setIncludeClusterAuthorizedOperations(false).
-                setIncludeTopicAuthorizedOperations(false), (short) 4);
+                setIncludeTopicAuthorizedOperations(false));
     }
 
     @Test
@@ -117,7 +117,7 @@ public final class MessageTest {
                 .setGenerationId(15);
         testAllMessageRoundTrips(newRequest.get());
         testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
-        testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 3);
+        testAllMessageRoundTripsFromVersion((short) 3, newRequest.get().setGroupInstanceId("instanceId"));
     }
 
     @Test
@@ -129,9 +129,9 @@ public final class MessageTest {
                 .setProtocols(new JoinGroupRequestData.JoinGroupRequestProtocolCollection())
                 .setSessionTimeoutMs(10000);
         testAllMessageRoundTrips(newRequest.get());
-        testAllMessageRoundTripsFromVersion(newRequest.get().setRebalanceTimeoutMs(20000), (short) 1);
+        testAllMessageRoundTripsFromVersion((short) 1, newRequest.get().setRebalanceTimeoutMs(20000));
         testAllMessageRoundTrips(newRequest.get().setGroupInstanceId(null));
-        testAllMessageRoundTripsFromVersion(newRequest.get().setGroupInstanceId("instanceId"), (short) 5);
+        testAllMessageRoundTripsFromVersion((short) 5, newRequest.get().setGroupInstanceId("instanceId"));
     }
 
     @Test
@@ -143,7 +143,7 @@ public final class MessageTest {
                 .setAssignments(new ArrayList<>());
         testAllMessageRoundTrips(request.get());
         testAllMessageRoundTrips(request.get().setGroupInstanceId(null));
-        testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 3);
+        testAllMessageRoundTripsFromVersion((short) 3, request.get().setGroupInstanceId("instanceId"));
     }
 
     @Test
@@ -157,27 +157,70 @@ public final class MessageTest {
                 .setMemberId("memberId")
                 .setTopics(new ArrayList<>())
                 .setGenerationId(15);
-        testAllMessageRoundTripsFromVersion(request.get(), (short) 1);
-        testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId(null), (short) 1);
-        testAllMessageRoundTripsFromVersion(request.get().setGroupInstanceId("instanceId"), (short) 7);
+        testAllMessageRoundTripsFromVersion((short) 1, request.get());
+        testAllMessageRoundTripsFromVersion((short) 1, request.get().setGroupInstanceId(null));
+        testAllMessageRoundTripsFromVersion((short) 7, request.get().setGroupInstanceId("instanceId"));
+    }
+
+    @Test
+    public void testOffsetForLeaderEpochVersions() throws Exception {
+        // Version 2 adds optional current leader epoch
+        OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partitionDataNoCurrentEpoch =
+                new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
+                        .setPartitionIndex(0)
+                        .setLeaderEpoch(3);
+        OffsetForLeaderEpochRequestData.OffsetForLeaderPartition partitionDataWithCurrentEpoch =
+                new OffsetForLeaderEpochRequestData.OffsetForLeaderPartition()
+                        .setPartitionIndex(0)
+                        .setLeaderEpoch(3)
+                        .setCurrentLeaderEpoch(5);
+
+        testAllMessageRoundTrips(new OffsetForLeaderEpochRequestData().setTopics(singletonList(
+                new OffsetForLeaderEpochRequestData.OffsetForLeaderTopic()
+                        .setName("foo")
+                        .setPartitions(singletonList(partitionDataNoCurrentEpoch)))
+        ));
+        testAllMessageRoundTripsBeforeVersion((short) 2, partitionDataWithCurrentEpoch, partitionDataNoCurrentEpoch);
+        testAllMessageRoundTripsFromVersion((short) 2, partitionDataWithCurrentEpoch);
+
+        // Version 3 adds the optional replica Id field
+        testAllMessageRoundTripsFromVersion((short) 3, new OffsetForLeaderEpochRequestData().setReplicaId(5));
+        testAllMessageRoundTripsBeforeVersion((short) 3,
+                new OffsetForLeaderEpochRequestData().setReplicaId(5),
+                new OffsetForLeaderEpochRequestData());
+        testAllMessageRoundTripsBeforeVersion((short) 3,
+                new OffsetForLeaderEpochRequestData().setReplicaId(5),
+                new OffsetForLeaderEpochRequestData().setReplicaId(-2));
+
     }
 
     private void testAllMessageRoundTrips(Message message) throws Exception {
-        testAllMessageRoundTripsFromVersion(message, message.lowestSupportedVersion());
+        testAllMessageRoundTripsFromVersion(message.lowestSupportedVersion(), message);
     }
 
-    private void testAllMessageRoundTripsFromVersion(Message message, short fromVersion) throws Exception {
+    private void testAllMessageRoundTripsBeforeVersion(short beforeVersion, Message message, Message expected) throws Exception {
+        for (short version = 0; version < beforeVersion; version++) {
+            testMessageRoundTrip(version, message, expected);
+        }
+    }
+
+    private void testAllMessageRoundTripsFromVersion(short fromVersion, Message message) throws Exception {
         for (short version = fromVersion; version <= message.highestSupportedVersion(); version++) {
-            testMessageRoundTrips(message, version);
+            testEquivalentMessageRoundTrip(version, message);
         }
     }
 
-    private void testMessageRoundTrips(Message message, short version) throws Exception {
-        testStructRoundTrip(message, version);
-        testByteBufferRoundTrip(message, version);
+    private void testMessageRoundTrip(short version, Message message, Message expected) throws Exception {
+        testByteBufferRoundTrip(version, message, expected);
+        testStructRoundTrip(version, message, expected);
+    }
+
+    private void testEquivalentMessageRoundTrip(short version, Message message) throws Exception {
+        testStructRoundTrip(version, message, message);
+        testByteBufferRoundTrip(version, message, message);
     }
 
-    private void testByteBufferRoundTrip(Message message, short version) throws Exception {
+    private void testByteBufferRoundTrip(short version, Message message, Message expected) throws Exception {
         int size = message.size(version);
         ByteBuffer buf = ByteBuffer.allocate(size);
         ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(buf);
@@ -187,18 +230,18 @@ public final class MessageTest {
         buf.flip();
         message2.read(byteBufferAccessor, version);
         assertEquals(size, buf.position());
-        assertEquals(message, message2);
-        assertEquals(message.hashCode(), message2.hashCode());
-        assertEquals(message.toString(), message2.toString());
+        assertEquals(expected, message2);
+        assertEquals(expected.hashCode(), message2.hashCode());
+        assertEquals(expected.toString(), message2.toString());
     }
 
-    private void testStructRoundTrip(Message message, short version) throws Exception {
+    private void testStructRoundTrip(short version, Message message, Message expected) throws Exception {
         Struct struct = message.toStruct(version);
         Message message2 = message.getClass().newInstance();
         message2.fromStruct(struct, version);
-        assertEquals(message, message2);
-        assertEquals(message.hashCode(), message2.hashCode());
-        assertEquals(message.toString(), message2.toString());
+        assertEquals(expected, message2);
+        assertEquals(expected.hashCode(), message2.hashCode());
+        assertEquals(expected.toString(), message2.toString());
     }
 
     /**
@@ -348,7 +391,7 @@ public final class MessageTest {
      */
     private static List<NamedType> flatten(NamedType type) {
         if (!(type.type instanceof Schema)) {
-            return Collections.singletonList(type);
+            return singletonList(type);
         }
         Schema schema = (Schema) type.type;
         ArrayList<NamedType> results = new ArrayList<>();
@@ -367,7 +410,7 @@ public final class MessageTest {
         verifySizeSucceeds((short) 0,
             new OffsetCommitRequestData().setRetentionTimeMs(123));
         verifySizeRaisesUve((short) 5, "forgotten",
-            new FetchRequestData().setForgotten(Collections.singletonList(
+            new FetchRequestData().setForgotten(singletonList(
                 new FetchRequestData.ForgottenTopic().setName("foo"))));
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java
new file mode 100644
index 0000000..604f88f
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequestTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+public class OffsetsForLeaderEpochRequestTest {
+
+    @Test
+    public void testForConsumerRequiresVersion3() {
+        OffsetsForLeaderEpochRequest.Builder builder = OffsetsForLeaderEpochRequest.Builder.forConsumer(Collections.emptyMap());
+        for (short version = 0; version < 3; version++) {
+            final short v = version;
+            assertThrows(UnsupportedVersionException.class, () -> builder.build(v));
+        }
+
+        for (short version = 3; version <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) {
+            OffsetsForLeaderEpochRequest request = builder.build(version);
+            assertEquals(OffsetsForLeaderEpochRequest.CONSUMER_REPLICA_ID, request.replicaId());
+        }
+    }
+
+    @Test
+    public void testDefaultReplicaId() {
+        for (short version = 0; version <= ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion(); version++) {
+            int replicaId = 1;
+            OffsetsForLeaderEpochRequest.Builder builder = OffsetsForLeaderEpochRequest.Builder.forFollower(
+                    version, Collections.emptyMap(), replicaId);
+            OffsetsForLeaderEpochRequest request = builder.build();
+            OffsetsForLeaderEpochRequest parsed = (OffsetsForLeaderEpochRequest) AbstractRequest.parseRequest(
+                    ApiKeys.OFFSET_FOR_LEADER_EPOCH, version, request.toStruct());
+            if (version < 3)
+                assertEquals(OffsetsForLeaderEpochRequest.DEBUGGING_REPLICA_ID, parsed.replicaId());
+            else
+                assertEquals(replicaId, parsed.replicaId());
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index e8f349f..32f3305 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -290,10 +290,10 @@ public class RequestResponseTest {
         checkRequest(createListOffsetRequest(0), true);
         checkErrorResponse(createListOffsetRequest(0), new UnknownServerException(), true);
         checkResponse(createListOffsetResponse(0), 0, true);
-        checkRequest(createLeaderEpochRequest(0), true);
-        checkRequest(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), true);
+        checkRequest(createLeaderEpochRequestForReplica(0, 1), true);
+        checkRequest(createLeaderEpochRequestForConsumer(), true);
         checkResponse(createLeaderEpochResponse(), 0, true);
-        checkErrorResponse(createLeaderEpochRequest(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion()), new UnknownServerException(), true);
+        checkErrorResponse(createLeaderEpochRequestForConsumer(), new UnknownServerException(), true);
         checkRequest(createAddPartitionsToTxnRequest(), true);
         checkErrorResponse(createAddPartitionsToTxnRequest(), new UnknownServerException(), true);
         checkResponse(createAddPartitionsToTxnResponse(), 0, true);
@@ -1251,17 +1251,25 @@ public class RequestResponseTest {
         return new InitProducerIdResponse(responseData);
     }
 
-    private OffsetsForLeaderEpochRequest createLeaderEpochRequest(int version) {
+    private Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> createOffsetForLeaderEpochPartitionData() {
         Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = new HashMap<>();
-
         epochs.put(new TopicPartition("topic1", 0),
                 new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(0), 1));
         epochs.put(new TopicPartition("topic1", 1),
                 new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(0), 1));
         epochs.put(new TopicPartition("topic2", 2),
                 new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), 3));
+        return epochs;
+    }
+
+    private OffsetsForLeaderEpochRequest createLeaderEpochRequestForConsumer() {
+        Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = createOffsetForLeaderEpochPartitionData();
+        return OffsetsForLeaderEpochRequest.Builder.forConsumer(epochs).build();
+    }
 
-        return OffsetsForLeaderEpochRequest.Builder.forConsumer((short) version, epochs).build();
+    private OffsetsForLeaderEpochRequest createLeaderEpochRequestForReplica(int version, int replicaId) {
+        Map<TopicPartition, OffsetsForLeaderEpochRequest.PartitionData> epochs = createOffsetForLeaderEpochPartitionData();
+        return OffsetsForLeaderEpochRequest.Builder.forFollower((short) version, epochs, replicaId).build();
     }
 
     private OffsetsForLeaderEpochResponse createLeaderEpochResponse() {
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index a577b2e..011a19b 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -295,7 +295,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
 
   private def offsetsForLeaderEpochRequest: OffsetsForLeaderEpochRequest = {
     val epochs = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(27), 7))
-    OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs.asJava).build()
+    OffsetsForLeaderEpochRequest.Builder.forConsumer(epochs.asJava).build()
   }
 
   private def createOffsetFetchRequest = {
diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 4d1416c..b6f8a14 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -35,7 +35,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
     val partition = new TopicPartition(topic, 0)
 
     val epochs = Map(partition -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty[Integer], 0)).asJava
-    val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs).build()
+    val request = OffsetsForLeaderEpochRequest.Builder.forFollower(
+      ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs, 1).build()
 
     // Unknown topic
     val randomBrokerId = servers.head.config.brokerId
@@ -61,8 +62,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
     def assertResponseErrorForEpoch(error: Errors, brokerId: Int, currentLeaderEpoch: Optional[Integer]): Unit = {
       val epochs = Map(topicPartition -> new OffsetsForLeaderEpochRequest.PartitionData(
         currentLeaderEpoch, 0)).asJava
-      val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs)
-        .build()
+      val request = OffsetsForLeaderEpochRequest.Builder.forFollower(
+        ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, epochs, 1).build()
       assertResponseError(error, brokerId, request)
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index 047188f..a7fbc5d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -362,8 +362,8 @@ class RequestQuotaTest extends BaseRequestTest {
           new InitProducerIdRequest.Builder(requestData)
 
         case ApiKeys.OFFSET_FOR_LEADER_EPOCH =>
-          OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
-            Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(15), 0)).asJava)
+          OffsetsForLeaderEpochRequest.Builder.forConsumer(Map(tp ->
+            new OffsetsForLeaderEpochRequest.PartitionData(Optional.of(15), 0)).asJava)
 
         case ApiKeys.ADD_PARTITIONS_TO_TXN =>
           new AddPartitionsToTxnRequest.Builder("test-transactional-id", 1, 0, List(tp).asJava)
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 8a6dcba..cf78bac 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -279,8 +279,9 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
     def leaderOffsetsFor(partitions: Map[TopicPartition, Int]): Map[TopicPartition, EpochEndOffset] = {
       val partitionData = partitions.mapValues(
         new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), _))
-      val request = OffsetsForLeaderEpochRequest.Builder.forConsumer(ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion,
-        partitionData.asJava)
+
+      val request = OffsetsForLeaderEpochRequest.Builder.forFollower(
+        ApiKeys.OFFSET_FOR_LEADER_EPOCH.latestVersion, partitionData.asJava, 1)
       val response = sender.sendRequest(request)
       response.responseBody.asInstanceOf[OffsetsForLeaderEpochResponse].responses.asScala
     }