You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2019/05/18 05:46:03 UTC

[kafka] branch trunk updated: KAFKA-8365; Consumer and protocol support for follower fetching (#6731)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e2847e8  KAFKA-8365; Consumer and protocol support for follower fetching (#6731)
e2847e8 is described below

commit e2847e8603fe19a87ff03584fb38954e4bd3a59e
Author: David Arthur <mu...@gmail.com>
AuthorDate: Sat May 18 01:45:46 2019 -0400

    KAFKA-8365; Consumer and protocol support for follower fetching (#6731)
    
    This patch includes API changes for follower fetching per [KIP-392](https://cwiki.apache.org/confluence/display/KAFKA/KIP-392%3A+Allow+consumers+to+fetch+from+closest+replica) as well as the consumer implementation. After this patch, consumers will continue to fetch only from the leader, since the broker implementation to select an alternate read replica is not included here.
    
    Adds new `client.rack` consumer configuration property is added which allows the consumer to indicate its rack. This is just an arbitrary string to indicate some relative location, it doesn't have to actually represent a physical rack. We are keeping the naming consistent with the broker property (`broker.rack`).
    
    FetchRequest now includes `rack_id` which can optionally be specified by the consumer. FetchResponse includes an optional `preferred_read_replica` field for each partition in the response. OffsetForLeaderEpochRequest also adds new `replica_id` field which is similar to the same field in FetchRequest.
    
    When the consumer sees a `preferred_read_replica` in a fetch response, it will use the Node with that ID for the next fetch.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../apache/kafka/clients/CommonClientConfigs.java  |   3 +
 .../java/org/apache/kafka/clients/Metadata.java    |   4 +
 .../kafka/clients/consumer/ConsumerConfig.java     |  10 ++
 .../kafka/clients/consumer/KafkaConsumer.java      |   1 +
 .../kafka/clients/consumer/internals/Fetcher.java  |  81 +++++++++++---
 .../consumer/internals/SubscriptionState.java      |  66 +++++++++++-
 .../main/java/org/apache/kafka/common/Cluster.java |  17 +++
 .../apache/kafka/common/requests/FetchRequest.java |  36 ++++++-
 .../kafka/common/requests/FetchResponse.java       |  57 +++++++++-
 .../requests/OffsetsForLeaderEpochRequest.java     |  32 +++++-
 .../requests/OffsetsForLeaderEpochResponse.java    |   5 +-
 .../resources/common/message/FetchRequest.json     |   6 +-
 .../resources/common/message/FetchResponse.json    |   4 +-
 .../message/OffsetForLeaderEpochRequest.json       |   5 +-
 .../message/OffsetForLeaderEpochResponse.json      |   2 +-
 .../org/apache/kafka/clients/MetadataTest.java     |  23 ++++
 .../kafka/clients/consumer/KafkaConsumerTest.java  |   1 +
 .../clients/consumer/internals/FetcherTest.java    | 116 +++++++++++++++++++--
 .../consumer/internals/SubscriptionStateTest.java  |  31 ++++++
 .../kafka/common/requests/RequestResponseTest.java |  16 +--
 core/src/main/scala/kafka/api/ApiVersion.scala     |  11 +-
 .../scala/kafka/server/ReplicaFetcherThread.scala  |   6 +-
 .../kafka/server/ReplicaFetcherThreadTest.scala    |   2 +-
 23 files changed, 484 insertions(+), 51 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 491b5de..49465dc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -60,6 +60,9 @@ public class CommonClientConfigs {
     public static final String CLIENT_ID_CONFIG = "client.id";
     public static final String CLIENT_ID_DOC = "An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.";
 
+    public static final String CLIENT_RACK_CONFIG = "client.rack";
+    public static final String CLIENT_RACK_DOC = "A rack identifier for this client. This can be any string value which indicates where this client is physically located. It corresponds with the broker config 'broker.rack'";
+
     public static final String RECONNECT_BACKOFF_MS_CONFIG = "reconnect.backoff.ms";
     public static final String RECONNECT_BACKOFF_MS_DOC = "The base amount of time to wait before attempting to reconnect to a given host. This avoids repeatedly connecting to a host in a tight loop. This backoff applies to all connection attempts by the client to a broker.";
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index ae75045..f991fa6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -129,6 +129,10 @@ public class Metadata implements Closeable {
         return Math.max(timeToExpire, timeToAllowUpdate(nowMs));
     }
 
+    public long metadataExpireMs() {
+        return this.metadataExpireMs;
+    }
+
     /**
      * Request an update of the current cluster metadata info, return the current updateVersion before the update
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index ba1928e..c80b71f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -177,6 +177,11 @@ public class ConsumerConfig extends AbstractConfig {
     public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG;
 
     /**
+     * <code>client.rack</code>
+     */
+    public static final String CLIENT_RACK_CONFIG = CommonClientConfigs.CLIENT_RACK_CONFIG;
+
+    /**
      * <code>reconnect.backoff.ms</code>
      */
     public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
@@ -328,6 +333,11 @@ public class ConsumerConfig extends AbstractConfig {
                                         "",
                                         Importance.LOW,
                                         CommonClientConfigs.CLIENT_ID_DOC)
+                                .define(CLIENT_RACK_CONFIG,
+                                        Type.STRING,
+                                        "",
+                                        Importance.LOW,
+                                        CommonClientConfigs.CLIENT_RACK_DOC)
                                 .define(MAX_PARTITION_FETCH_BYTES_CONFIG,
                                         Type.INT,
                                         DEFAULT_MAX_PARTITION_FETCH_BYTES,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 3bfd5ac..07be128 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -799,6 +799,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
                     config.getInt(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG),
                     config.getInt(ConsumerConfig.MAX_POLL_RECORDS_CONFIG),
                     config.getBoolean(ConsumerConfig.CHECK_CRCS_CONFIG),
+                    config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
                     this.keyDeserializer,
                     this.valueDeserializer,
                     this.metadata,
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 870a8b7..4ea9b0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -126,6 +126,7 @@ public class Fetcher<K, V> implements Closeable {
     private final long requestTimeoutMs;
     private final int maxPollRecords;
     private final boolean checkCrcs;
+    private final String clientRackId;
     private final ConsumerMetadata metadata;
     private final FetchManagerMetrics sensors;
     private final SubscriptionState subscriptions;
@@ -149,6 +150,7 @@ public class Fetcher<K, V> implements Closeable {
                    int fetchSize,
                    int maxPollRecords,
                    boolean checkCrcs,
+                   String clientRackId,
                    Deserializer<K> keyDeserializer,
                    Deserializer<V> valueDeserializer,
                    ConsumerMetadata metadata,
@@ -171,6 +173,7 @@ public class Fetcher<K, V> implements Closeable {
         this.fetchSize = fetchSize;
         this.maxPollRecords = maxPollRecords;
         this.checkCrcs = checkCrcs;
+        this.clientRackId = clientRackId;
         this.keyDeserializer = keyDeserializer;
         this.valueDeserializer = valueDeserializer;
         this.completedFetches = new ConcurrentLinkedQueue<>();
@@ -223,7 +226,9 @@ public class Fetcher<K, V> implements Closeable {
                     .isolationLevel(isolationLevel)
                     .setMaxBytes(this.maxBytes)
                     .metadata(data.metadata())
-                    .toForget(data.toForget());
+                    .toForget(data.toForget())
+                    .rackId(clientRackId);
+
             if (log.isDebugEnabled()) {
                 log.debug("Sending {} {} to broker {}", isolationLevel, data.toString(), fetchTarget);
             }
@@ -1005,6 +1010,26 @@ public class Fetcher<K, V> implements Closeable {
     }
 
     /**
+     * Determine which replica to read from.
+     */
+    Node selectReadReplica(TopicPartition partition, Node leaderReplica, long currentTimeMs) {
+        Optional<Integer> nodeId = subscriptions.preferredReadReplica(partition, currentTimeMs);
+        if (nodeId.isPresent()) {
+            Optional<Node> node = nodeId.flatMap(id -> metadata.fetch().nodeIfOnline(partition, id));
+            if (node.isPresent()) {
+                return node.get();
+            } else {
+                log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata," +
+                          " using the leader instead.", nodeId, partition);
+                subscriptions.clearPreferredReadReplica(partition);
+                return leaderReplica;
+            }
+        } else {
+            return leaderReplica;
+        }
+    }
+
+    /**
      * Create fetch requests for all nodes for which we have assigned partitions
      * that have no existing requests in flight.
      */
@@ -1015,10 +1040,12 @@ public class Fetcher<K, V> implements Closeable {
         subscriptions.assignedPartitions().forEach(
             tp -> subscriptions.maybeValidatePosition(tp, metadata.leaderAndEpoch(tp)));
 
+        long currentTimeMs = time.milliseconds();
+
         for (TopicPartition partition : fetchablePartitions()) {
+            // Use the preferred read replica if set, or the position's leader
             SubscriptionState.FetchPosition position = this.subscriptions.position(partition);
-            Metadata.LeaderAndEpoch leaderAndEpoch = position.currentLeader;
-            Node node = leaderAndEpoch.leader;
+            Node node = selectReadReplica(partition, position.currentLeader.leader, currentTimeMs);
 
             if (node == null || node.isEmpty()) {
                 metadata.requestUpdate();
@@ -1032,23 +1059,23 @@ public class Fetcher<K, V> implements Closeable {
                 log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", partition, node);
             } else {
                 // if there is a leader and no in-flight requests, issue a new fetch
-                FetchSessionHandler.Builder builder = fetchable.get(leaderAndEpoch.leader);
+                FetchSessionHandler.Builder builder = fetchable.get(node);
                 if (builder == null) {
-                    int id = leaderAndEpoch.leader.id();
+                    int id = node.id();
                     FetchSessionHandler handler = sessionHandler(id);
                     if (handler == null) {
                         handler = new FetchSessionHandler(logContext, id);
                         sessionHandlers.put(id, handler);
                     }
                     builder = handler.newBuilder();
-                    fetchable.put(leaderAndEpoch.leader, builder);
+                    fetchable.put(node, builder);
                 }
 
                 builder.add(partition, new FetchRequest.PartitionData(position.offset,
-                        FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, leaderAndEpoch.epoch));
+                        FetchRequest.INVALID_LOG_START_OFFSET, this.fetchSize, position.currentLeader.epoch));
 
                 log.debug("Added {} fetch request for partition {} at position {} to node {}", isolationLevel,
-                    partition, position, leaderAndEpoch.leader);
+                    partition, position, node);
             }
         }
 
@@ -1136,24 +1163,42 @@ public class Fetcher<K, V> implements Closeable {
                     log.trace("Updating last stable offset for partition {} to {}", tp, partition.lastStableOffset);
                     subscriptions.updateLastStableOffset(tp, partition.lastStableOffset);
                 }
+
+                if (partition.preferredReadReplica.isPresent()) {
+                    subscriptions.updatePreferredReadReplica(partitionRecords.partition, partition.preferredReadReplica.get(), () -> {
+                        long expireTimeMs = time.milliseconds() + metadata.metadataExpireMs();
+                        log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}",
+                                tp, partition.preferredReadReplica.get(), expireTimeMs);
+                        return expireTimeMs;
+                    });
+                }
+
             } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
                        error == Errors.KAFKA_STORAGE_ERROR ||
-                       error == Errors.FENCED_LEADER_EPOCH) {
+                       error == Errors.FENCED_LEADER_EPOCH ||
+                       error == Errors.OFFSET_NOT_AVAILABLE) {
                 log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
                 this.metadata.requestUpdate();
             } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                 log.warn("Received unknown topic or partition error in fetch for partition {}", tp);
                 this.metadata.requestUpdate();
             } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
-                if (fetchOffset != subscriptions.position(tp).offset) {
-                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
-                            "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
-                } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
-                    log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
-                    subscriptions.requestOffsetReset(tp);
+                Optional<Integer> clearedReplicaId = subscriptions.clearPreferredReadReplica(tp);
+                if (!clearedReplicaId.isPresent()) {
+                    // If there's no preferred replica to clear, we're fetching from the leader so handle this error normally
+                    if (fetchOffset != subscriptions.position(tp).offset) {
+                        log.debug("Discarding stale fetch response for partition {} since the fetched offset {} " +
+                                "does not match the current offset {}", tp, fetchOffset, subscriptions.position(tp));
+                    } else if (subscriptions.hasDefaultOffsetResetPolicy()) {
+                        log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, tp);
+                        subscriptions.requestOffsetReset(tp);
+                    } else {
+                        throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
+                    }
                 } else {
-                    throw new OffsetOutOfRangeException(Collections.singletonMap(tp, fetchOffset));
+                    log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}",
+                            clearedReplicaId.get(), tp, error, fetchOffset);
                 }
             } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                 //we log the actual partition and not just the topic to help with ACL propagation issues in large clusters
@@ -1304,6 +1349,10 @@ public class Fetcher<K, V> implements Closeable {
             }
         }
 
+        private Optional<Integer> preferredReadReplica() {
+            return completedFetch.partitionData.preferredReadReplica;
+        }
+
         private void maybeEnsureValid(RecordBatch batch) {
             if (checkCrcs && currentBatch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
                 try {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 3909421..87d1a35 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -40,6 +40,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collector;
 import java.util.stream.Collectors;
@@ -418,6 +419,39 @@ public class SubscriptionState {
         assignedState(tp).lastStableOffset = lastStableOffset;
     }
 
+    /**
+     * Set the preferred read replica with a lease timeout. After this time, the replica will no longer be valid and
+     * {@link #preferredReadReplica(TopicPartition, long)} will return an empty result.
+     *
+     * @param tp The topic partition
+     * @param preferredReadReplicaId The preferred read replica
+     * @param timeMs The time at which this preferred replica is no longer valid
+     */
+    public void updatePreferredReadReplica(TopicPartition tp, int preferredReadReplicaId, Supplier<Long> timeMs) {
+        assignedState(tp).updatePreferredReadReplica(preferredReadReplicaId, timeMs);
+    }
+
+    /**
+     * Get the preferred read replica
+     *
+     * @param tp The topic partition
+     * @param timeMs The current time
+     * @return Returns the current preferred read replica, if it has been set and if it has not expired.
+     */
+    public Optional<Integer> preferredReadReplica(TopicPartition tp, long timeMs) {
+        return assignedState(tp).preferredReadReplica(timeMs);
+    }
+
+    /**
+     * Unset the preferred read replica. This causes the fetcher to go back to the leader for fetches.
+     *
+     * @param tp The topic partition
+     * @return true if the preferred read replica was set, false otherwise.
+     */
+    public Optional<Integer> clearPreferredReadReplica(TopicPartition tp) {
+        return assignedState(tp).clearPreferredReadReplica();
+    }
+
     public Map<TopicPartition, OffsetAndMetadata> allConsumed() {
         Map<TopicPartition, OffsetAndMetadata> allConsumed = new HashMap<>();
         assignment.stream().forEach(state -> {
@@ -553,7 +587,8 @@ public class SubscriptionState {
         private boolean paused;  // whether this partition has been paused by the user
         private OffsetResetStrategy resetStrategy;  // the strategy to use if the offset needs resetting
         private Long nextRetryTimeMs;
-
+        private Integer preferredReadReplica;
+        private Long preferredReadReplicaExpireTimeMs;
 
         TopicPartitionState() {
             this.paused = false;
@@ -564,6 +599,7 @@ public class SubscriptionState {
             this.lastStableOffset = null;
             this.resetStrategy = null;
             this.nextRetryTimeMs = null;
+            this.preferredReadReplica = null;
         }
 
         private void transitionState(FetchState newState, Runnable runIfTransitioned) {
@@ -574,6 +610,33 @@ public class SubscriptionState {
             }
         }
 
+        private Optional<Integer> preferredReadReplica(long timeMs) {
+            if (preferredReadReplicaExpireTimeMs != null && timeMs > preferredReadReplicaExpireTimeMs) {
+                preferredReadReplica = null;
+                return Optional.empty();
+            } else {
+                return Optional.ofNullable(preferredReadReplica);
+            }
+        }
+
+        private void updatePreferredReadReplica(int preferredReadReplica, Supplier<Long> timeMs) {
+            if (this.preferredReadReplica == null || preferredReadReplica != this.preferredReadReplica) {
+                this.preferredReadReplica = preferredReadReplica;
+                this.preferredReadReplicaExpireTimeMs = timeMs.get();
+            }
+        }
+
+        private Optional<Integer> clearPreferredReadReplica() {
+            if (preferredReadReplica != null) {
+                int removedReplicaId = this.preferredReadReplica;
+                this.preferredReadReplica = null;
+                this.preferredReadReplicaExpireTimeMs = null;
+                return Optional.of(removedReplicaId);
+            } else {
+                return Optional.empty();
+            }
+        }
+
         private void reset(OffsetResetStrategy strategy) {
             transitionState(FetchStates.AWAIT_RESET, () -> {
                 this.resetStrategy = strategy;
@@ -594,6 +657,7 @@ public class SubscriptionState {
             if (position != null && !position.safeToFetchFrom(currentLeaderAndEpoch)) {
                 FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch);
                 validatePosition(newPosition);
+                preferredReadReplica = null;
             }
             return this.fetchState.equals(FetchStates.AWAIT_VALIDATION);
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/Cluster.java b/clients/src/main/java/org/apache/kafka/common/Cluster.java
index 753b7f9..0b01d22 100644
--- a/clients/src/main/java/org/apache/kafka/common/Cluster.java
+++ b/clients/src/main/java/org/apache/kafka/common/Cluster.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Utils;
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -27,6 +28,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Set;
 
 /**
@@ -184,6 +186,21 @@ public final class Cluster {
     }
 
     /**
+     * Get the node by node id if the replica for the given partition is online
+     * @param partition
+     * @param id
+     * @return
+     */
+    public Optional<Node> nodeIfOnline(TopicPartition partition, int id) {
+        Node node = nodeById(id);
+        if (node != null && !Arrays.asList(partition(partition).offlineReplicas()).contains(node)) {
+            return Optional.of(node);
+        } else {
+            return Optional.empty();
+        }
+    }
+
+    /**
      * Get the current leader for the given topic-partition
      * @param topicPartition The topic and partition we want to know the leader for
      * @return The node that is the leader for this topic-partition, or null if there is currently no leader
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index b3443a1..485b102 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -63,6 +63,7 @@ public class FetchRequest extends AbstractRequest {
                     "consumers to discard ABORTED transactional records");
     private static final Field.Int32 SESSION_ID = new Field.Int32("session_id", "The fetch session ID");
     private static final Field.Int32 SESSION_EPOCH = new Field.Int32("session_epoch", "The fetch session epoch");
+    private static final Field.Str RACK_ID = new Field.Str("rack_id", "The consumer's rack id");
 
     // topic level fields
     private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
@@ -194,10 +195,22 @@ public class FetchRequest extends AbstractRequest {
     // V10 bumped up to indicate ZStandard capability. (see KIP-110)
     private static final Schema FETCH_REQUEST_V10 = FETCH_REQUEST_V9;
 
+    private static final Schema FETCH_REQUEST_V11 = new Schema(
+            REPLICA_ID,
+            MAX_WAIT_TIME,
+            MIN_BYTES,
+            MAX_BYTES,
+            ISOLATION_LEVEL,
+            SESSION_ID,
+            SESSION_EPOCH,
+            FETCH_REQUEST_TOPIC_V9,
+            FORGOTTEN_TOPIC_DATA_V7,
+            RACK_ID);
+
     public static Schema[] schemaVersions() {
         return new Schema[]{FETCH_REQUEST_V0, FETCH_REQUEST_V1, FETCH_REQUEST_V2, FETCH_REQUEST_V3, FETCH_REQUEST_V4,
             FETCH_REQUEST_V5, FETCH_REQUEST_V6, FETCH_REQUEST_V7, FETCH_REQUEST_V8, FETCH_REQUEST_V9,
-            FETCH_REQUEST_V10};
+            FETCH_REQUEST_V10, FETCH_REQUEST_V11};
     }
 
     // default values for older versions where a request level limit did not exist
@@ -217,6 +230,7 @@ public class FetchRequest extends AbstractRequest {
 
     private final List<TopicPartition> toForget;
     private final FetchMetadata metadata;
+    private final String rackId;
 
     public static final class PartitionData {
         public final long fetchOffset;
@@ -290,6 +304,7 @@ public class FetchRequest extends AbstractRequest {
         private int maxBytes = DEFAULT_RESPONSE_MAX_BYTES;
         private FetchMetadata metadata = FetchMetadata.LEGACY;
         private List<TopicPartition> toForget = Collections.emptyList();
+        private String rackId = "";
 
         public static Builder forConsumer(int maxWait, int minBytes, Map<TopicPartition, PartitionData> fetchData) {
             return new Builder(ApiKeys.FETCH.oldestVersion(), ApiKeys.FETCH.latestVersion(),
@@ -320,6 +335,11 @@ public class FetchRequest extends AbstractRequest {
             return this;
         }
 
+        public Builder rackId(String rackId) {
+            this.rackId = rackId;
+            return this;
+        }
+
         public Map<TopicPartition, PartitionData> fetchData() {
             return this.fetchData;
         }
@@ -345,7 +365,7 @@ public class FetchRequest extends AbstractRequest {
             }
 
             return new FetchRequest(version, replicaId, maxWait, minBytes, maxBytes, fetchData,
-                isolationLevel, toForget, metadata);
+                isolationLevel, toForget, metadata, rackId);
         }
 
         @Override
@@ -360,6 +380,7 @@ public class FetchRequest extends AbstractRequest {
                     append(", isolationLevel=").append(isolationLevel).
                     append(", toForget=").append(Utils.join(toForget, ", ")).
                     append(", metadata=").append(metadata).
+                    append(", rackId=").append(rackId).
                     append(")");
             return bld.toString();
         }
@@ -367,7 +388,7 @@ public class FetchRequest extends AbstractRequest {
 
     private FetchRequest(short version, int replicaId, int maxWait, int minBytes, int maxBytes,
                          Map<TopicPartition, PartitionData> fetchData, IsolationLevel isolationLevel,
-                         List<TopicPartition> toForget, FetchMetadata metadata) {
+                         List<TopicPartition> toForget, FetchMetadata metadata, String rackId) {
         super(ApiKeys.FETCH, version);
         this.replicaId = replicaId;
         this.maxWait = maxWait;
@@ -377,6 +398,7 @@ public class FetchRequest extends AbstractRequest {
         this.isolationLevel = isolationLevel;
         this.toForget = toForget;
         this.metadata = metadata;
+        this.rackId = rackId;
     }
 
     public FetchRequest(Struct struct, short version) {
@@ -421,6 +443,7 @@ public class FetchRequest extends AbstractRequest {
                 fetchData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
+        rackId = struct.getOrElse(RACK_ID, "");
     }
 
     @Override
@@ -436,7 +459,7 @@ public class FetchRequest extends AbstractRequest {
         for (Map.Entry<TopicPartition, PartitionData> entry : fetchData.entrySet()) {
             FetchResponse.PartitionData<MemoryRecords> partitionResponse = new FetchResponse.PartitionData<>(error,
                 FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY);
+                FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY);
             responseData.put(entry.getKey(), partitionResponse);
         }
         return new FetchResponse<>(error, responseData, throttleTimeMs, metadata.sessionId());
@@ -478,6 +501,10 @@ public class FetchRequest extends AbstractRequest {
         return metadata;
     }
 
+    public String rackId() {
+        return rackId;
+    }
+
     public static FetchRequest parse(ByteBuffer buffer, short version) {
         return new FetchRequest(ApiKeys.FETCH.parseRequest(version, buffer), version);
     }
@@ -530,6 +557,7 @@ public class FetchRequest extends AbstractRequest {
             }
             struct.set(FORGOTTEN_TOPICS, toForgetStructs.toArray());
         }
+        struct.setIfExists(RACK_ID, rackId);
         return struct;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 3191f42..e857b5b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -38,6 +38,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Optional;
 import java.util.Queue;
 
 import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
@@ -78,6 +79,8 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
             "Last committed offset.");
     private static final Field.Int64 LOG_START_OFFSET = new Field.Int64("log_start_offset",
             "Earliest available offset.");
+    private static final Field.Int32 PREFERRED_READ_REPLICA = new Field.Int32("preferred_read_replica",
+            "The ID of the replica that the consumer should prefer.");
 
     private static final String PARTITION_HEADER_KEY_NAME = "partition_header";
     private static final String ABORTED_TRANSACTIONS_KEY_NAME = "aborted_transactions";
@@ -140,6 +143,15 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
             LOG_START_OFFSET,
             new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)));
 
+    private static final Schema FETCH_RESPONSE_PARTITION_HEADER_V6 = new Schema(
+            PARTITION_ID,
+            ERROR_CODE,
+            HIGH_WATERMARK,
+            LAST_STABLE_OFFSET,
+            LOG_START_OFFSET,
+            new Field(ABORTED_TRANSACTIONS_KEY_NAME, ArrayOf.nullable(FETCH_RESPONSE_ABORTED_TRANSACTION_V4)),
+            PREFERRED_READ_REPLICA);
+
     private static final Schema FETCH_RESPONSE_PARTITION_V4 = new Schema(
             new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V4),
             new Field(RECORD_SET_KEY_NAME, RECORDS));
@@ -148,6 +160,10 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
             new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V5),
             new Field(RECORD_SET_KEY_NAME, RECORDS));
 
+    private static final Schema FETCH_RESPONSE_PARTITION_V6 = new Schema(
+            new Field(PARTITION_HEADER_KEY_NAME, FETCH_RESPONSE_PARTITION_HEADER_V6),
+            new Field(RECORD_SET_KEY_NAME, RECORDS));
+
     private static final Schema FETCH_RESPONSE_TOPIC_V4 = new Schema(
             TOPIC_NAME,
             new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V4)));
@@ -156,6 +172,10 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
             TOPIC_NAME,
             new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V5)));
 
+    private static final Schema FETCH_RESPONSE_TOPIC_V6 = new Schema(
+            TOPIC_NAME,
+            new Field(PARTITIONS_KEY_NAME, new ArrayOf(FETCH_RESPONSE_PARTITION_V6)));
+
     private static final Schema FETCH_RESPONSE_V4 = new Schema(
             THROTTLE_TIME_MS,
             new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V4)));
@@ -186,15 +206,24 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
     // V10 bumped up to indicate ZStandard capability. (see KIP-110)
     private static final Schema FETCH_RESPONSE_V10 = FETCH_RESPONSE_V9;
 
+    private static final Schema FETCH_RESPONSE_V11 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            SESSION_ID,
+            new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V6)));
+
+
     public static Schema[] schemaVersions() {
         return new Schema[] {FETCH_RESPONSE_V0, FETCH_RESPONSE_V1, FETCH_RESPONSE_V2,
             FETCH_RESPONSE_V3, FETCH_RESPONSE_V4, FETCH_RESPONSE_V5, FETCH_RESPONSE_V6,
-            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, FETCH_RESPONSE_V10};
+            FETCH_RESPONSE_V7, FETCH_RESPONSE_V8, FETCH_RESPONSE_V9, FETCH_RESPONSE_V10,
+            FETCH_RESPONSE_V11};
     }
 
     public static final long INVALID_HIGHWATERMARK = -1L;
     public static final long INVALID_LAST_STABLE_OFFSET = -1L;
     public static final long INVALID_LOG_START_OFFSET = -1L;
+    public static final int UNSPECIFIED_PREFERRED_REPLICA = -1;
 
     private final int throttleTimeMs;
     private final Errors error;
@@ -240,6 +269,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
         public final long highWatermark;
         public final long lastStableOffset;
         public final long logStartOffset;
+        public final Optional<Integer> preferredReadReplica;
         public final List<AbortedTransaction> abortedTransactions;
         public final T records;
 
@@ -247,12 +277,29 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                              long highWatermark,
                              long lastStableOffset,
                              long logStartOffset,
+                             Integer preferredReadReplica,
+                             List<AbortedTransaction> abortedTransactions,
+                             T records) {
+            this.error = error;
+            this.highWatermark = highWatermark;
+            this.lastStableOffset = lastStableOffset;
+            this.logStartOffset = logStartOffset;
+            this.preferredReadReplica = Optional.ofNullable(preferredReadReplica);
+            this.abortedTransactions = abortedTransactions;
+            this.records = records;
+        }
+
+        public PartitionData(Errors error,
+                             long highWatermark,
+                             long lastStableOffset,
+                             long logStartOffset,
                              List<AbortedTransaction> abortedTransactions,
                              T records) {
             this.error = error;
             this.highWatermark = highWatermark;
             this.lastStableOffset = lastStableOffset;
             this.logStartOffset = logStartOffset;
+            this.preferredReadReplica = Optional.empty();
             this.abortedTransactions = abortedTransactions;
             this.records = records;
         }
@@ -270,6 +317,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                     highWatermark == that.highWatermark &&
                     lastStableOffset == that.lastStableOffset &&
                     logStartOffset == that.logStartOffset &&
+                    Objects.equals(preferredReadReplica, that.preferredReadReplica) &&
                     Objects.equals(abortedTransactions, that.abortedTransactions) &&
                     Objects.equals(records, that.records);
         }
@@ -280,6 +328,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
             result = 31 * result + Long.hashCode(highWatermark);
             result = 31 * result + Long.hashCode(lastStableOffset);
             result = 31 * result + Long.hashCode(logStartOffset);
+            result = 31 * result + (preferredReadReplica != null ? preferredReadReplica.hashCode() : 0);
             result = 31 * result + (abortedTransactions != null ? abortedTransactions.hashCode() : 0);
             result = 31 * result + (records != null ? records.hashCode() : 0);
             return result;
@@ -291,6 +340,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                     ", highWaterMark=" + highWatermark +
                     ", lastStableOffset = " + lastStableOffset +
                     ", logStartOffset = " + logStartOffset +
+                    ", preferredReadReplica = " + preferredReadReplica.map(Object::toString).orElse("absent") +
                     ", abortedTransactions = " + abortedTransactions +
                     ", recordsSizeInBytes=" + records.sizeInBytes() + ")";
         }
@@ -329,6 +379,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                 long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
                 long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
                 long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
+                int preferredReadReplica = partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, UNSPECIFIED_PREFERRED_REPLICA);
 
                 BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
                 if (!(baseRecords instanceof MemoryRecords))
@@ -350,7 +401,8 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                 }
 
                 PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, abortedTransactions, records);
+                        logStartOffset, preferredReadReplica == UNSPECIFIED_PREFERRED_REPLICA ? null : preferredReadReplica,
+                        abortedTransactions, records);
                 responseData.put(new TopicPartition(topic, partition), partitionData);
             }
         }
@@ -513,6 +565,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                     }
                 }
                 partitionDataHeader.setIfExists(LOG_START_OFFSET, fetchPartitionData.logStartOffset);
+                partitionDataHeader.setIfExists(PREFERRED_READ_REPLICA, fetchPartitionData.preferredReadReplica.orElse(-1));
                 partitionData.set(PARTITION_HEADER_KEY_NAME, partitionDataHeader);
                 partitionData.set(RECORD_SET_KEY_NAME, fetchPartitionData.records);
                 partitionArray.add(partitionData);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
index 75788a0..5052b0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest.java
@@ -36,6 +36,15 @@ import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
 import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
 
 public class OffsetsForLeaderEpochRequest extends AbstractRequest {
+    private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id",
+            "Broker id of the follower. For normal consumers, use -1.");
+
+    /**
+     * Sentinel replica_id value to indicate a regular consumer rather than another broker
+     */
+    public static final int CONSUMER_REPLICA_ID = -1;
+
+
     private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
             "An array of topics to get epochs for");
     private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
@@ -67,28 +76,40 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V2 = new Schema(
             TOPICS_V2);
 
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_REQUEST_V3 = new Schema(
+            REPLICA_ID,
+            TOPICS_V2);
+
     public static Schema[] schemaVersions() {
         return new Schema[]{OFFSET_FOR_LEADER_EPOCH_REQUEST_V0, OFFSET_FOR_LEADER_EPOCH_REQUEST_V1,
-            OFFSET_FOR_LEADER_EPOCH_REQUEST_V2};
+            OFFSET_FOR_LEADER_EPOCH_REQUEST_V2, OFFSET_FOR_LEADER_EPOCH_REQUEST_V3};
     }
 
-    private Map<TopicPartition, PartitionData> epochsByPartition;
+    private final Map<TopicPartition, PartitionData> epochsByPartition;
+
+    private final int replicaId;
 
     public Map<TopicPartition, PartitionData> epochsByTopicPartition() {
         return epochsByPartition;
     }
 
+    public int replicaId() {
+        return replicaId;
+    }
+
     public static class Builder extends AbstractRequest.Builder<OffsetsForLeaderEpochRequest> {
         private final Map<TopicPartition, PartitionData> epochsByPartition;
+        private final int replicaId;
 
         public Builder(short version, Map<TopicPartition, PartitionData> epochsByPartition) {
             super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
             this.epochsByPartition = epochsByPartition;
+            this.replicaId = CONSUMER_REPLICA_ID;
         }
 
         @Override
         public OffsetsForLeaderEpochRequest build(short version) {
-            return new OffsetsForLeaderEpochRequest(epochsByPartition, version);
+            return new OffsetsForLeaderEpochRequest(epochsByPartition, replicaId, version);
         }
 
         public static OffsetsForLeaderEpochRequest parse(ByteBuffer buffer, short version) {
@@ -105,13 +126,15 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
         }
     }
 
-    public OffsetsForLeaderEpochRequest(Map<TopicPartition, PartitionData> epochsByPartition, short version) {
+    public OffsetsForLeaderEpochRequest(Map<TopicPartition, PartitionData> epochsByPartition, int replicaId, short version) {
         super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
         this.epochsByPartition = epochsByPartition;
+        this.replicaId = replicaId;
     }
 
     public OffsetsForLeaderEpochRequest(Struct struct, short version) {
         super(ApiKeys.OFFSET_FOR_LEADER_EPOCH, version);
+        replicaId = struct.getOrElse(REPLICA_ID, CONSUMER_REPLICA_ID);
         epochsByPartition = new HashMap<>();
         for (Object topicAndEpochsObj : struct.get(TOPICS)) {
             Struct topicAndEpochs = (Struct) topicAndEpochsObj;
@@ -134,6 +157,7 @@ public class OffsetsForLeaderEpochRequest extends AbstractRequest {
     @Override
     protected Struct toStruct() {
         Struct requestStruct = new Struct(ApiKeys.OFFSET_FOR_LEADER_EPOCH.requestSchema(version()));
+        requestStruct.set(REPLICA_ID, replicaId);
 
         Map<String, Map<Integer, PartitionData>> topicsToPartitionEpochs = CollectionUtils.groupPartitionDataByTopic(epochsByPartition);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index d5d1265..3fe3cdc 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -85,9 +85,12 @@ public class OffsetsForLeaderEpochResponse extends AbstractResponse {
             THROTTLE_TIME_MS,
             TOPICS_V1);
 
+    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3 = OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2;
+
+
     public static Schema[] schemaVersions() {
         return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1,
-            OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2};
+            OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3};
     }
 
     private final int throttleTimeMs;
diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index 24c974d..5b834b8 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -44,7 +44,7 @@
   // Version 10 indicates that we can use the ZStd compression algorithm, as
   // described in KIP-110.
   //
-  "validVersions": "0-10",
+  "validVersions": "0-11",
   "fields": [
     { "name": "ReplicaId", "type": "int32", "versions": "0+",
       "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
@@ -84,6 +84,8 @@
         "about": "The partition name." },
       { "name": "ForgottenPartitionIndexes", "type": "[]int32", "versions": "7+",
         "about": "The partitions indexes to forget." }
-    ]}
+    ]},
+    { "name": "RackId", "type":  "string", "versions": "11+", "default": "", "ignorable": true,
+      "about": "Rack ID of the consumer making this request"}
   ]
 }
diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json
index 5ebc97c..f6cc582 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -38,7 +38,7 @@
   // Version 10 indicates that the response data can use the ZStd compression
   // algorithm, as described in KIP-110.
   //
-  "validVersions": "0-10",
+  "validVersions": "0-11",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@@ -69,6 +69,8 @@
           { "name": "FirstOffset", "type": "int64", "versions": "4+",
             "about": "The first offset in the aborted transaction." }
         ]},
+        { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "ignorable": true,
+          "about": "The preferred read replica for the consumer to use on its next fetch request"},
         { "name": "Records", "type": "bytes", "versions": "0+", "nullableVersions": "0+",
           "about": "The record data." }
       ]}
diff --git a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
index 4104938..84585ca 100644
--- a/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
+++ b/clients/src/main/resources/common/message/OffsetForLeaderEpochRequest.json
@@ -20,8 +20,11 @@
   // Version 1 is the same as version 0.
   //
   // Version 2 adds the current leader epoch to support fencing.
-  "validVersions": "0-2",
+  // Version 3 adds ReplicaId
+  "validVersions": "0-3",
   "fields": [
+    { "name": "ReplicaId", "type": "int32", "versions": "3+",
+      "about": "The broker ID of the follower, of -1 if this request is from a consumer." },
     { "name": "Topics", "type": "[]OffsetForLeaderTopic", "versions": "0+",
       "about": "Each topic to get offsets for.", "fields": [
       { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
diff --git a/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json b/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json
index 8e93422..1694606 100644
--- a/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json
+++ b/clients/src/main/resources/common/message/OffsetForLeaderEpochResponse.json
@@ -19,7 +19,7 @@
   "name": "OffsetForLeaderEpochResponse",
   // Version 1 added the leader epoch to the response.
   // Version 2 added the throttle time.
-  "validVersions": "0-2",
+  "validVersions": "0-3",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
       "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
diff --git a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
index 0e3d191..a0d0819 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MetadataTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
@@ -36,6 +37,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.test.TestUtils.assertOptional;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -437,4 +439,25 @@ public class MetadataTest {
         assertNull(metadata.getAndClearMetadataException());
     }
 
+    @Test
+    public void testNodeIfOffline() {
+        Map<String, Integer> partitionCounts = new HashMap<>();
+        partitionCounts.put("topic-1", 1);
+        Node node0 = new Node(0, "localhost", 9092);
+        Node node1 = new Node(1, "localhost", 9093);
+
+        MetadataResponse metadataResponse = TestUtils.metadataUpdateWith("dummy", 2, Collections.emptyMap(), partitionCounts, _tp -> 99,
+            (error, partition, leader, leaderEpoch, replicas, isr, offlineReplicas) ->
+                new MetadataResponse.PartitionMetadata(error, partition, node0, leaderEpoch,
+                    Collections.singletonList(node0), Collections.emptyList(), Collections.singletonList(node1)));
+        metadata.update(emptyMetadataResponse(), 0L);
+        metadata.update(metadataResponse, 10L);
+
+        TopicPartition tp = new TopicPartition("topic-1", 0);
+
+        assertOptional(metadata.fetch().nodeIfOnline(tp, 0), node -> assertEquals(node.id(), 0));
+        assertFalse(metadata.fetch().nodeIfOnline(tp, 1).isPresent());
+        assertEquals(metadata.fetch().nodeById(0).id(), 0);
+        assertEquals(metadata.fetch().nodeById(1).id(), 1);
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 606b711..9012ea2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1913,6 +1913,7 @@ public class KafkaConsumerTest {
                 fetchSize,
                 maxPollRecords,
                 checkCrcs,
+                "",
                 keyDeserializer,
                 valueDeserializer,
                 metadata,
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 30c9a04..1dfdf23 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -78,6 +78,7 @@ import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
 import org.apache.kafka.common.requests.ResponseHeader;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.BytesDeserializer;
 import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.ByteBufferOutputStream;
@@ -96,6 +97,7 @@ import java.io.DataOutputStream;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -1171,7 +1173,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = new HashMap<>();
         partitions.put(tp0, new FetchResponse.PartitionData<>(Errors.NONE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
         client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L, 0));
         consumerClient.poll(time.timer(0));
 
@@ -1182,7 +1184,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         partitions = new HashMap<>();
         partitions.put(tp1, new FetchResponse.PartitionData<>(Errors.OFFSET_OUT_OF_RANGE, 100,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, null, MemoryRecords.EMPTY));
         client.prepareResponse(new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), 0, INVALID_SESSION_ID));
         consumerClient.poll(time.timer(0));
         assertEquals(1, fetcher.fetchedRecords().get(tp0).size());
@@ -1767,6 +1769,7 @@ public class FetcherTest {
         for (int i = 1; i <= 3; i++) {
             int throttleTimeMs = 100 * i;
             FetchRequest.Builder builder = FetchRequest.Builder.forConsumer(100, 100, new LinkedHashMap<>());
+            builder.rackId("");
             ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
             client.send(request, time.milliseconds());
             client.poll(1, time.milliseconds());
@@ -2812,7 +2815,7 @@ public class FetcherTest {
         for (int i = 0; i < numPartitions; i++)
             topicPartitions.add(new TopicPartition(topicName, i));
 
-        buildDependencies(new MetricConfig(), OffsetResetStrategy.EARLIEST);
+        buildDependencies(new MetricConfig(), OffsetResetStrategy.EARLIEST, Long.MAX_VALUE);
 
         fetcher = new Fetcher<byte[], byte[]>(
                 new LogContext(),
@@ -2823,6 +2826,7 @@ public class FetcherTest {
                 fetchSize,
                 2 * numPartitions,
                 true,
+                "",
                 new ByteArrayDeserializer(),
                 new ByteArrayDeserializer(),
                 metadata,
@@ -3263,6 +3267,86 @@ public class FetcherTest {
         assertOptional(subscriptions.position(tp0).offsetEpoch, value -> assertEquals(value.intValue(), 1));
     }
 
+    @Test
+    public void testPreferredReadReplica() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(TestUtils.metadataUpdateWith(2, singletonMap(topicName, 4)));
+        subscriptions.seek(tp0, 0);
+
+        // Node preferred replica before first fetch response
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), -1);
+
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Set preferred read replica to node=1
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = fetchedRecords();
+        assertTrue(partitionRecords.containsKey(tp0));
+
+        // verify
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), 1);
+
+
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        // Set preferred read replica to node=2, which isn't in our metadata, should revert to leader
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 2));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+        fetchedRecords();
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), -1);
+    }
+
+    @Test
+    public void testPreferredReadReplicaOffsetError() {
+        buildFetcher(new MetricConfig(), OffsetResetStrategy.EARLIEST, new BytesDeserializer(), new BytesDeserializer(),
+                Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED, Duration.ofMinutes(5).toMillis());
+
+        subscriptions.assignFromUser(singleton(tp0));
+        client.updateMetadata(TestUtils.metadataUpdateWith(2, singletonMap(topicName, 4)));
+        subscriptions.seek(tp0, 0);
+
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NONE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, 1));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        fetchedRecords();
+
+        Node selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), 1);
+
+        // Return an error, should unset the preferred read replica
+        assertEquals(1, fetcher.sendFetches());
+        assertFalse(fetcher.hasCompletedFetches());
+
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.OFFSET_OUT_OF_RANGE, 100L,
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, 0, null));
+        consumerClient.poll(time.timer(0));
+        assertTrue(fetcher.hasCompletedFetches());
+
+        fetchedRecords();
+
+        selected = fetcher.selectReadReplica(tp0, Node.noNode(), time.milliseconds());
+        assertEquals(selected.id(), -1);
+    }
+
     private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
         // matches any list offset request with the provided timestamp
         return new MockClient.RequestMatcher() {
@@ -3308,6 +3392,14 @@ public class FetcherTest {
         return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
     }
 
+    private FetchResponse<MemoryRecords> fullFetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
+                                                           long lastStableOffset, int throttleTime, Integer preferredReplicaId) {
+        Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
+                new FetchResponse.PartitionData<>(error, hw, lastStableOffset, 0L,
+                        preferredReplicaId, null, records));
+        return new FetchResponse<>(Errors.NONE, new LinkedHashMap<>(partitions), throttleTime, INVALID_SESSION_ID);
+    }
+
     private FetchResponse<MemoryRecords> fetchResponse(TopicPartition tp, MemoryRecords records, Errors error, long hw,
                                         long lastStableOffset, long logStartOffset, int throttleTime) {
         Map<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> partitions = Collections.singletonMap(tp,
@@ -3369,7 +3461,17 @@ public class FetcherTest {
                                      Deserializer<V> valueDeserializer,
                                      int maxPollRecords,
                                      IsolationLevel isolationLevel) {
-        buildDependencies(metricConfig, offsetResetStrategy);
+        buildFetcher(metricConfig, offsetResetStrategy, keyDeserializer, valueDeserializer, maxPollRecords, isolationLevel, Long.MAX_VALUE);
+    }
+
+    private <K, V> void buildFetcher(MetricConfig metricConfig,
+                                     OffsetResetStrategy offsetResetStrategy,
+                                     Deserializer<K> keyDeserializer,
+                                     Deserializer<V> valueDeserializer,
+                                     int maxPollRecords,
+                                     IsolationLevel isolationLevel,
+                                     long metadataExpireMs) {
+        buildDependencies(metricConfig, offsetResetStrategy, metadataExpireMs);
         fetcher = new Fetcher<>(
                 new LogContext(),
                 consumerClient,
@@ -3379,6 +3481,7 @@ public class FetcherTest {
                 fetchSize,
                 maxPollRecords,
                 true, // check crc
+                "",
                 keyDeserializer,
                 valueDeserializer,
                 metadata,
@@ -3391,11 +3494,12 @@ public class FetcherTest {
                 isolationLevel);
     }
 
-    private void buildDependencies(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy) {
+
+    private void buildDependencies(MetricConfig metricConfig, OffsetResetStrategy offsetResetStrategy, long metadataExpireMs) {
         LogContext logContext = new LogContext();
         time = new MockTime(1);
         subscriptions = new SubscriptionState(logContext, offsetResetStrategy);
-        metadata = new ConsumerMetadata(0, Long.MAX_VALUE, false, false,
+        metadata = new ConsumerMetadata(0, metadataExpireMs, false, false,
                 subscriptions, logContext, new ClusterResourceListeners());
         client = new MockClient(time, metadata);
         metrics = new Metrics(metricConfig, time);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
index 701866d..528c2b9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -309,6 +310,36 @@ public class SubscriptionStateTest {
         assertEquals(0, state.numAssignedPartitions());
     }
 
+    @Test
+    public void testPreferredReadReplicaLease() {
+        state.assignFromUser(Collections.singleton(tp0));
+
+        // Default state
+        assertFalse(state.preferredReadReplica(tp0, 0L).isPresent());
+
+        // Set the preferred replica with lease
+        state.updatePreferredReadReplica(tp0, 42, () -> 10L);
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 9L),  value -> assertEquals(value.intValue(), 42));
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 10L),  value -> assertEquals(value.intValue(), 42));
+        assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());
+
+        // Unset the preferred replica
+        state.clearPreferredReadReplica(tp0);
+        assertFalse(state.preferredReadReplica(tp0, 9L).isPresent());
+        assertFalse(state.preferredReadReplica(tp0, 11L).isPresent());
+
+        // Set to new preferred replica with lease
+        state.updatePreferredReadReplica(tp0, 43, () -> 20L);
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 11L),  value -> assertEquals(value.intValue(), 43));
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 20L),  value -> assertEquals(value.intValue(), 43));
+        assertFalse(state.preferredReadReplica(tp0, 21L).isPresent());
+
+        // Set to new preferred replica without clearing first
+        state.updatePreferredReadReplica(tp0, 44, () -> 30L);
+        TestUtils.assertOptional(state.preferredReadReplica(tp0, 30L),  value -> assertEquals(value.intValue(), 44));
+        assertFalse(state.preferredReadReplica(tp0, 31L).isPresent());
+    }
+
     private static class MockRebalanceListener implements ConsumerRebalanceListener {
         public Collection<TopicPartition> revoked;
         public Collection<TopicPartition> assigned;
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index d9a9ed1..85ea49c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -559,7 +559,7 @@ public class RequestResponseTest {
         MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(
                 Errors.NONE, 1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-                0L, null, records));
+                0L, null, null, records));
 
         FetchResponse<MemoryRecords> v0Response = new FetchResponse<>(Errors.NONE, responseData, 0, INVALID_SESSION_ID);
         FetchResponse<MemoryRecords> v1Response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
@@ -583,11 +583,11 @@ public class RequestResponseTest {
                 new FetchResponse.AbortedTransaction(15, 50)
         );
         responseData.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData<>(Errors.NONE, 100000,
-                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, abortedTransactions, records));
+                FetchResponse.INVALID_LAST_STABLE_OFFSET, FetchResponse.INVALID_LOG_START_OFFSET, null, abortedTransactions, records));
         responseData.put(new TopicPartition("bar", 1), new FetchResponse.PartitionData<>(Errors.NONE, 900000,
-                5, FetchResponse.INVALID_LOG_START_OFFSET, null, records));
+                5, FetchResponse.INVALID_LOG_START_OFFSET, null, null, records));
         responseData.put(new TopicPartition("foo", 0), new FetchResponse.PartitionData<>(Errors.NONE, 70000,
-                6, FetchResponse.INVALID_LOG_START_OFFSET, Collections.emptyList(), records));
+                6, FetchResponse.INVALID_LOG_START_OFFSET, null, Collections.emptyList(), records));
 
         FetchResponse<MemoryRecords> response = new FetchResponse<>(Errors.NONE, responseData, 10, INVALID_SESSION_ID);
         FetchResponse deserialized = FetchResponse.parse(toBuffer(response.toStruct((short) 4)), (short) 4);
@@ -751,11 +751,11 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
             new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
-            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, abortedTransactions, MemoryRecords.EMPTY));
+            1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY));
         return new FetchResponse<>(Errors.NONE, responseData, 25, sessionId);
     }
 
@@ -763,12 +763,12 @@ public class RequestResponseTest {
         LinkedHashMap<TopicPartition, FetchResponse.PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
         MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
         responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData<>(Errors.NONE,
-                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, records));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, null, records));
 
         List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
                 new FetchResponse.AbortedTransaction(234L, 999L));
         responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData<>(Errors.NONE,
-                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, abortedTransactions, MemoryRecords.EMPTY));
+                1000000, FetchResponse.INVALID_LAST_STABLE_OFFSET, 0L, null, abortedTransactions, MemoryRecords.EMPTY));
 
         return new FetchResponse<>(Errors.NONE, responseData, 25, INVALID_SESSION_ID);
     }
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index d8d1edb..8e8ca0b 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -88,7 +88,9 @@ object ApiVersion {
     // New error code for ListOffsets when a new leader is lagging behind former HW (KIP-207)
     KAFKA_2_2_IV1,
     // Introduced static membership.
-    KAFKA_2_3_IV0
+    KAFKA_2_3_IV0,
+    // Add rack_id to FetchRequest, preferred_read_replica to FetchResponse, and replica_id to OffsetsForLeaderRequest
+    KAFKA_2_3_IV1
   )
 
   // Map keys are the union of the short and full versions
@@ -307,6 +309,13 @@ case object KAFKA_2_3_IV0 extends DefaultApiVersion {
   val id: Int = 22
 }
 
+case object KAFKA_2_3_IV1 extends DefaultApiVersion {
+  val shortVersion: String = "2.3"
+  val subVersion = "IV1"
+  val recordVersion = RecordVersion.V2
+  val id: Int = 23
+}
+
 object ApiVersionValidator extends Validator {
 
   override def ensureValid(name: String, value: Any): Unit = {
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 82f51e6..8e92c2b 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -64,7 +64,8 @@ class ReplicaFetcherThread(name: String,
 
   // Visible for testing
   private[server] val fetchRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV2) 10
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_3_IV1) 11
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV2) 10
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV1) 8
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_1_1_IV0) 7
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_11_0_IV1) 5
@@ -76,7 +77,8 @@ class ReplicaFetcherThread(name: String,
 
   // Visible for testing
   private[server] val offsetForLeaderEpochRequestVersion: Short =
-    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 2
+    if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_3_IV1) 3
+    else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_1_IV1) 2
     else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_2_0_IV0) 1
     else 0
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 3d7e86a..d6ebdd6 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -412,7 +412,7 @@ class ReplicaFetcherThreadTest {
     assertEquals(2, mockNetwork.epochFetchCount)
     assertEquals(1, mockNetwork.fetchCount)
     assertEquals("OffsetsForLeaderEpochRequest version.",
-      2, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
+      3, mockNetwork.lastUsedOffsetForLeaderEpochVersion)
 
     //Loop 3 we should not fetch epochs
     thread.doWork()