Posted to commits@kafka.apache.org by rs...@apache.org on 2020/07/17 19:59:12 UTC

[kafka] branch 2.6 updated: KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers (#8979)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new f221a2e  KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers (#8979)
f221a2e is described below

commit f221a2e59ebb6c0deccb35cbcf92576954803231
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jul 17 20:05:11 2020 +0100

    KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers (#8979)
    
    Brokers currently return NOT_LEADER_FOR_PARTITION to producers and REPLICA_NOT_AVAILABLE to consumers if a replica is not available on the broker during reassignments. Non-Java clients treat REPLICA_NOT_AVAILABLE as a non-retriable exception, and Java consumers handle this error only by explicitly matching the error code, even though it is not an InvalidMetadataException. This PR renames NOT_LEADER_FOR_PARTITION to NOT_LEADER_OR_FOLLOWER and uses the same error for producers and consumers. This [...]
    - ALTER_REPLICA_LOG_DIRS continues to return REPLICA_NOT_AVAILABLE. This is retained for compatibility since that request never returned NOT_LEADER_FOR_PARTITION before.
    - MetadataRequest version 0 also returns REPLICA_NOT_AVAILABLE as the topic-level error code for compatibility. Newer versions filter these out and return Errors.NONE, so this was left unchanged.
    - Partition responses in MetadataRequest still return REPLICA_NOT_AVAILABLE to indicate that one of the replicas is not available. This was left unchanged since NOT_LEADER_FOR_PARTITION is not suitable in this case.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Bob Barrett <bo...@confluent.io>
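
As a hedged illustration (not part of this commit), a minimal client-side sketch of why the rename stays compatible. It assumes kafka-clients 2.6+ on the classpath and uses only classes touched by this patch: because NotLeaderOrFollowerException extends the deprecated NotLeaderForPartitionException, which is an InvalidMetadataException (and hence retriable), existing catch blocks and retry checks keep working.

    import org.apache.kafka.common.errors.InvalidMetadataException;
    import org.apache.kafka.common.errors.NotLeaderOrFollowerException;

    public final class RetryCheckSketch {
        // InvalidMetadataException signals stale metadata: refresh metadata to
        // find the current leader (or follower) and retry, rather than failing.
        static boolean isRetriable(Throwable t) {
            return t instanceof InvalidMetadataException;
        }

        public static void main(String[] args) {
            // The new exception satisfies the same check the old one did.
            System.out.println(isRetriable(new NotLeaderOrFollowerException("leadership moved")));
        }
    }
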
---
 .../kafka/clients/consumer/internals/Fetcher.java  |  4 +-
 .../internals/OffsetsForLeaderEpochClient.java     |  2 +-
 .../errors/NotLeaderForPartitionException.java     |  4 +-
 ...tion.java => NotLeaderOrFollowerException.java} | 18 ++++---
 .../errors/ReplicaNotAvailableException.java       |  8 +++-
 .../org/apache/kafka/common/protocol/Errors.java   | 10 ++--
 .../common/requests/DeleteRecordsResponse.java     |  2 +-
 .../apache/kafka/common/requests/FetchRequest.java |  2 +-
 .../kafka/common/requests/FetchResponse.java       | 10 ++--
 .../kafka/common/requests/ListOffsetResponse.java  |  4 +-
 .../requests/OffsetsForLeaderEpochResponse.java    |  4 +-
 .../kafka/common/requests/ProduceRequest.java      |  2 +-
 .../kafka/common/requests/ProduceResponse.java     |  8 ++--
 .../common/requests/WriteTxnMarkersResponse.java   |  2 +-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 12 ++---
 .../kafka/clients/consumer/KafkaConsumerTest.java  |  2 +-
 .../clients/consumer/internals/FetcherTest.java    | 20 ++++----
 .../clients/producer/internals/SenderTest.java     | 20 ++++----
 .../producer/internals/TransactionManagerTest.java | 12 ++---
 .../common/requests/LeaderAndIsrResponseTest.java  |  2 +-
 .../common/requests/StopReplicaResponseTest.java   |  2 +-
 core/src/main/scala/kafka/cluster/Partition.scala  | 21 ++++----
 .../coordinator/group/GroupMetadataManager.scala   |  4 +-
 ...TransactionMarkerRequestCompletionHandler.scala |  2 +-
 .../transaction/TransactionStateManager.scala      |  2 +-
 .../scala/kafka/server/AbstractFetcherThread.scala |  4 +-
 .../scala/kafka/server/DelayedDeleteRecords.scala  |  2 +-
 .../src/main/scala/kafka/server/DelayedFetch.scala | 10 ++--
 .../main/scala/kafka/server/DelayedProduce.scala   |  2 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |  6 +--
 .../kafka/server/ReplicaAlterLogDirsThread.scala   | 10 ++--
 .../main/scala/kafka/server/ReplicaManager.scala   | 56 +++++++++++-----------
 .../kafka/api/PlaintextAdminIntegrationTest.scala  |  2 +-
 .../kafka/api/ProducerFailureHandlingTest.scala    |  2 +-
 .../kafka/server/DelayedFetchTest.scala            | 12 ++---
 .../scala/unit/kafka/cluster/PartitionTest.scala   | 22 ++++-----
 .../group/GroupMetadataManagerTest.scala           |  4 +-
 ...sactionMarkerRequestCompletionHandlerTest.scala |  4 +-
 .../transaction/TransactionStateManagerTest.scala  |  2 +-
 .../scala/unit/kafka/server/FetchRequestTest.scala |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  4 +-
 .../unit/kafka/server/ListOffsetsRequestTest.scala | 14 +++---
 .../unit/kafka/server/LogDirFailureTest.scala      | 10 ++--
 .../server/OffsetsForLeaderEpochRequestTest.scala  |  8 ++--
 .../unit/kafka/server/ProduceRequestTest.scala     |  2 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     | 24 +++++-----
 .../kafka/server/ReplicaFetcherThreadTest.scala    |  2 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  3 +-
 .../unit/kafka/server/ReplicaManagerTest.scala     | 54 +++++++++++++++------
 .../server/epoch/LeaderEpochIntegrationTest.scala  |  2 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  2 +-
 docs/upgrade.html                                  |  4 ++
 52 files changed, 243 insertions(+), 205 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 52c5155..3251f4f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1043,7 +1043,7 @@ public class Fetcher<K, V> implements Closeable {
                     log.debug("Cannot search by timestamp for partition {} because the message format version " +
                                   "is before 0.10.0", topicPartition);
                     break;
-                case NOT_LEADER_FOR_PARTITION:
+                case NOT_LEADER_OR_FOLLOWER:
                 case REPLICA_NOT_AVAILABLE:
                 case KAFKA_STORAGE_ERROR:
                 case OFFSET_NOT_AVAILABLE:
@@ -1274,7 +1274,7 @@ public class Fetcher<K, V> implements Closeable {
                 }
 
                 nextCompletedFetch.initialized = true;
-            } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
+            } else if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
                        error == Errors.REPLICA_NOT_AVAILABLE ||
                        error == Errors.KAFKA_STORAGE_ERROR ||
                        error == Errors.FENCED_LEADER_EPOCH ||
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index b05e01f..7b1f44a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -80,7 +80,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
                             topicPartition, epochEndOffset.endOffset(), epochEndOffset.leaderEpoch());
                     endOffsets.put(topicPartition, epochEndOffset);
                     break;
-                case NOT_LEADER_FOR_PARTITION:
+                case NOT_LEADER_OR_FOLLOWER:
                 case REPLICA_NOT_AVAILABLE:
                 case KAFKA_STORAGE_ERROR:
                 case OFFSET_NOT_AVAILABLE:
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
index 7182779..6ecce02 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
@@ -17,8 +17,10 @@
 package org.apache.kafka.common.errors;
 
 /**
- * This server is not the leader for the given partition
+ * This server is not the leader for the given partition.
+ * @deprecated since 2.7. Use {@link NotLeaderOrFollowerException}.
  */
+@Deprecated
 public class NotLeaderForPartitionException extends InvalidMetadataException {
 
     private static final long serialVersionUID = 1L;
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderOrFollowerException.java
similarity index 53%
copy from clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
copy to clients/src/main/java/org/apache/kafka/common/errors/NotLeaderOrFollowerException.java
index 7182779..2db960b 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderOrFollowerException.java
@@ -17,25 +17,31 @@
 package org.apache.kafka.common.errors;
 
 /**
- * This server is not the leader for the given partition
+ * Broker returns this error if a request could not be processed because the broker is not the leader
+ * or follower for a topic partition. This could be a transient exception during leader elections and
+ * reassignments. For `Produce` and other requests which are intended only for the leader, this exception
+ * indicates that the broker is not the current leader. For consumer `Fetch` requests which may be
+ * satisfied by a leader or follower, this exception indicates that the broker is not a replica
+ * of the topic partition.
  */
-public class NotLeaderForPartitionException extends InvalidMetadataException {
+@SuppressWarnings("deprecation")
+public class NotLeaderOrFollowerException extends NotLeaderForPartitionException {
 
     private static final long serialVersionUID = 1L;
 
-    public NotLeaderForPartitionException() {
+    public NotLeaderOrFollowerException() {
         super();
     }
 
-    public NotLeaderForPartitionException(String message) {
+    public NotLeaderOrFollowerException(String message) {
         super(message);
     }
 
-    public NotLeaderForPartitionException(Throwable cause) {
+    public NotLeaderOrFollowerException(Throwable cause) {
         super(cause);
     }
 
-    public NotLeaderForPartitionException(String message, Throwable cause) {
+    public NotLeaderOrFollowerException(String message, Throwable cause) {
         super(message, cause);
     }
 
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
index b94d400..07971cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
@@ -16,7 +16,13 @@
  */
 package org.apache.kafka.common.errors;
 
-public class ReplicaNotAvailableException extends ApiException {
+/**
+ * The replica is not available for the requested topic partition. This may be
+ * a transient exception during reassignments. From version 2.6 onwards, Fetch requests
+ * and other requests intended only for the leader or follower of the topic partition return
+ * {@link NotLeaderOrFollowerException} if the broker is not a replica of the partition.
+ */
+public class ReplicaNotAvailableException extends InvalidMetadataException {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 526b4df..2b091f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -72,7 +72,7 @@ import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
-import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.OffsetNotAvailableException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
@@ -139,13 +139,15 @@ public enum Errors {
             InvalidFetchSizeException::new),
     LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
             LeaderNotAvailableException::new),
-    NOT_LEADER_FOR_PARTITION(6, "This server is not the leader for that topic-partition.",
-            NotLeaderForPartitionException::new),
+    NOT_LEADER_OR_FOLLOWER(6, "For requests intended only for the leader, this error indicates that the broker is not the current leader. " +
+            "For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.",
+            NotLeaderOrFollowerException::new),
     REQUEST_TIMED_OUT(7, "The request timed out.",
             TimeoutException::new),
     BROKER_NOT_AVAILABLE(8, "The broker is not available.",
             BrokerNotAvailableException::new),
-    REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition.",
+    REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests " +
+            "intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.",
             ReplicaNotAvailableException::new),
     MESSAGE_TOO_LARGE(10, "The request included a message larger than the max message size the server will accept.",
             RecordTooLargeException::new),
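
A hedged aside on the hunk above (not code from this patch): only the constant's name and exception mapping change; the wire error code 6 is unchanged, so mixed-version brokers and clients still agree on the code itself. Errors.forCode and Errors.exception are existing public methods of org.apache.kafka.common.protocol.Errors.

    import org.apache.kafka.common.protocol.Errors;

    public final class ErrorCodeSketch {
        public static void main(String[] args) {
            // Code 6 on the wire now resolves to the renamed constant...
            Errors e = Errors.forCode((short) 6);
            System.out.println(e);  // NOT_LEADER_OR_FOLLOWER (formerly NOT_LEADER_FOR_PARTITION)
            // ...and builds the new exception type, which still extends the old one.
            System.out.println(e.exception().getClass().getSimpleName());  // NotLeaderOrFollowerException
        }
    }
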
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 3266bf8..968a35b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -36,7 +36,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
      *
      * OFFSET_OUT_OF_RANGE (1)
      * UNKNOWN_TOPIC_OR_PARTITION (3)
-     * NOT_LEADER_FOR_PARTITION (6)
+     * NOT_LEADER_OR_FOLLOWER (6)
      * REQUEST_TIMED_OUT (7)
      * UNKNOWN (-1)
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 502e32b..29cbb60 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -146,7 +146,7 @@ public class FetchRequest extends AbstractRequest {
             TOPICS_V5);
 
     // V6 bumped up to indicate that the client supports KafkaStorageException. The KafkaStorageException will be
-    // translated to NotLeaderForPartitionException in the response if version <= 5
+    // translated to NotLeaderOrFollowerException in the response if version <= 5
     private static final Schema FETCH_REQUEST_V6 = FETCH_REQUEST_V5;
 
     // V7 added incremental fetch requests.
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 21edcc5..984c50c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -58,8 +58,8 @@ import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
  *
  * - {@link Errors#OFFSET_OUT_OF_RANGE} If the fetch offset is out of range for a requested partition
  * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} If the user does not have READ access to a requested topic
- * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker which is not a replica
- * - {@link Errors#NOT_LEADER_FOR_PARTITION} If the broker is not a leader and either the provided leader epoch
+ * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker with version < 2.6 which is not a replica
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER} If the broker is not a leader or follower and either the provided leader epoch
  *     matches the known leader epoch on the broker or is empty
  * - {@link Errors#FENCED_LEADER_EPOCH} If the epoch is lower than the broker's epoch
  * - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the broker's epoch
@@ -190,7 +190,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
             new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
 
     // V6 bumped up to indicate that the client supports KafkaStorageException. The KafkaStorageException will
-    // be translated to NotLeaderForPartitionException in the response if version <= 5
+    // be translated to NotLeaderOrFollowerException in the response if version <= 5
     private static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5;
 
     // V7 added incremental fetch responses and a top-level error code.
@@ -549,9 +549,9 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
                 // If consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the error code
                 // for KafkaStorageException. In this case the client library will translate KafkaStorageException to
                 // UnknownServerException which is not retriable. We can ensure that consumer will update metadata and retry
-                // by converting the KafkaStorageException to NotLeaderForPartitionException in the response if FetchRequest version <= 5
+                // by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if FetchRequest version <= 5
                 if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 5)
-                    errorCode = Errors.NOT_LEADER_FOR_PARTITION.code();
+                    errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code();
                 Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
                 Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME);
                 partitionDataHeader.set(PARTITION_ID, partitionEntry.getKey());
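
The produce path applies the same translation (see ProduceResponse below, version <= 3). As a standalone, hypothetical sketch of the version gate used in both places (names are illustrative, not from this patch):

    import org.apache.kafka.common.protocol.Errors;

    final class StorageErrorDownConversionSketch {
        // Old clients may not recognize KAFKA_STORAGE_ERROR and would map it to a
        // non-retriable UnknownServerException; substituting the retriable
        // NOT_LEADER_OR_FOLLOWER code makes them refresh metadata and retry.
        static short maybeDownConvert(short errorCode, short requestVersion,
                                      short oldestVersionKnowingStorageError) {
            if (errorCode == Errors.KAFKA_STORAGE_ERROR.code()
                    && requestVersion < oldestVersionKnowingStorageError)
                return Errors.NOT_LEADER_OR_FOLLOWER.code();
            return errorCode;
        }
    }
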
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 6331941..3fe14b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -44,8 +44,8 @@ import static org.apache.kafka.common.protocol.types.Type.INT64;
  *
  * - {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT} If the message format does not support lookup by timestamp
  * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} If the user does not have DESCRIBE access to a requested topic
- * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker which is not a replica
- * - {@link Errors#NOT_LEADER_FOR_PARTITION} If the broker is not a leader and either the provided leader epoch
+ * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker with version < 2.6 which is not a replica
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER} If the broker is not a leader or follower and either the provided leader epoch
  *     matches the known leader epoch on the broker or is empty
  * - {@link Errors#FENCED_LEADER_EPOCH} If the epoch is lower than the broker's epoch
  * - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the broker's epoch
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index c547c09..6573aca 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -41,8 +41,8 @@ import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
  * Possible error codes:
  *
  * - {@link Errors#TOPIC_AUTHORIZATION_FAILED} If the user does not have DESCRIBE access to a requested topic
- * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker which is not a replica
- * - {@link Errors#NOT_LEADER_FOR_PARTITION} If the broker is not a leader and either the provided leader epoch
+ * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker with version < 2.6 which is not a replica
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER} If the broker is not a leader or follower and either the provided leader epoch
  *     matches the known leader epoch on the broker or is empty
  * - {@link Errors#FENCED_LEADER_EPOCH} If the epoch is lower than the broker's epoch
  * - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the broker's epoch
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 485145f..210b71d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -99,7 +99,7 @@ public class ProduceRequest extends AbstractRequest {
     /**
      * The body of PRODUCE_REQUEST_V4 is the same as PRODUCE_REQUEST_V3.
      * The version number is bumped up to indicate that the client supports KafkaStorageException.
-     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
+     * The KafkaStorageException will be translated to NotLeaderOrFollowerException in the response if version <= 3
      */
     private static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 2bb1e86..205d9ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -56,7 +56,7 @@ public class ProduceResponse extends AbstractResponse {
      *
      * {@link Errors#CORRUPT_MESSAGE}
      * {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
-     * {@link Errors#NOT_LEADER_FOR_PARTITION}
+     * {@link Errors#NOT_LEADER_OR_FOLLOWER}
      * {@link Errors#MESSAGE_TOO_LARGE}
      * {@link Errors#INVALID_TOPIC_EXCEPTION}
      * {@link Errors#RECORD_LIST_TOO_LARGE}
@@ -126,7 +126,7 @@ public class ProduceResponse extends AbstractResponse {
     /**
      * The body of PRODUCE_RESPONSE_V4 is the same as PRODUCE_RESPONSE_V3.
      * The version number is bumped up to indicate that the client supports KafkaStorageException.
-     * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
+     * The KafkaStorageException will be translated to NotLeaderOrFollowerException in the response if version <= 3
      */
     private static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3;
 
@@ -265,9 +265,9 @@ public class ProduceResponse extends AbstractResponse {
                 // If producer sends ProduceRequest V3 or earlier, the client library is not guaranteed to recognize the error code
                 // for KafkaStorageException. In this case the client library will translate KafkaStorageException to
                 // UnknownServerException which is not retriable. We can ensure that producer will update metadata and retry
-                // by converting the KafkaStorageException to NotLeaderForPartitionException in the response if ProduceRequest version <= 3
+                // by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if ProduceRequest version <= 3
                 if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 3)
-                    errorCode = Errors.NOT_LEADER_FOR_PARTITION.code();
+                    errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code();
                 Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
                         .set(PARTITION_ID, partitionEntry.getKey())
                         .set(ERROR_CODE, errorCode)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 783ef3b..8fdcb99 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -37,7 +37,7 @@ import java.util.Map;
  *   - {@link Errors#CORRUPT_MESSAGE}
  *   - {@link Errors#INVALID_PRODUCER_EPOCH}
  *   - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
- *   - {@link Errors#NOT_LEADER_FOR_PARTITION}
+ *   - {@link Errors#NOT_LEADER_OR_FOLLOWER}
  *   - {@link Errors#MESSAGE_TOO_LARGE}
  *   - {@link Errors#RECORD_LIST_TOO_LARGE}
  *   - {@link Errors#NOT_ENOUGH_REPLICAS}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ac04e5d..aa55866 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -51,7 +51,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.LeaderNotAvailableException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
@@ -1088,7 +1088,7 @@ public class KafkaAdminClientTest {
                         new DeleteRecordsResponseData.DeleteRecordsPartitionResult()
                             .setPartitionIndex(myTopicPartition3.partition())
                             .setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
-                            .setErrorCode(Errors.NOT_LEADER_FOR_PARTITION.code()),
+                            .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()),
                         new DeleteRecordsResponseData.DeleteRecordsPartitionResult()
                             .setPartitionIndex(myTopicPartition4.partition())
                             .setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
@@ -1157,7 +1157,7 @@ public class KafkaAdminClientTest {
                 myTopicPartition3Result.get();
                 fail("get() should throw ExecutionException");
             } catch (ExecutionException e1) {
-                assertTrue(e1.getCause() instanceof NotLeaderForPartitionException);
+                assertTrue(e1.getCause() instanceof NotLeaderOrFollowerException);
             }
 
             // "unknown topic or partition" failure on records deletion for partition 4
@@ -3157,7 +3157,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE));
 
             Map<TopicPartition, PartitionData> responseData = new HashMap<>();
-            responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, 345L, Optional.of(543)));
+            responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, Optional.of(543)));
             responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -2L, 123L, Optional.of(456)));
             env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
 
@@ -3215,10 +3215,10 @@ public class KafkaAdminClientTest {
             env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE));
 
             Map<TopicPartition, PartitionData> responseData = new HashMap<>();
-            responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, 345L, Optional.of(543)));
+            responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, Optional.of(543)));
             env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
 
-            // updating leader from node0 to node1 and metadata refresh because of NOT_LEADER_FOR_PARTITION
+            // updating leader from node0 to node1 and metadata refresh because of NOT_LEADER_OR_FOLLOWER
             final PartitionInfo newPartitionInfo = new PartitionInfo("foo", 0, node1,
                 new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
             final Cluster newCluster = new Cluster("mockClusterId", nodes, singletonList(newPartitionInfo),
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 0abcdd0..9fc54bf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -591,7 +591,7 @@ public class KafkaConsumerTest {
                 return timestamps.get(tp0).timestamp == ListOffsetRequest.LATEST_TIMESTAMP &&
                         timestamps.get(tp1).timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP;
             }, listOffsetsResponse(Collections.singletonMap(tp0, 50L),
-                        Collections.singletonMap(tp1, Errors.NOT_LEADER_FOR_PARTITION)));
+                        Collections.singletonMap(tp1, Errors.NOT_LEADER_OR_FOLLOWER)));
         client.prepareResponse(
             body -> {
                 FetchRequest request = (FetchRequest) body;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index c6f379b..c82fbd7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -406,7 +406,7 @@ public class FetcherTest {
         assertEquals(1, fetcher.sendFetches());
         assertFalse(fetcher.hasCompletedFetches());
 
-        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
         consumerClient.poll(time.timer(0));
         assertTrue(fetcher.hasCompletedFetches());
 
@@ -1097,13 +1097,13 @@ public class FetcherTest {
     }
 
     @Test
-    public void testFetchNotLeaderForPartition() {
+    public void testFetchNotLeaderOrFollower() {
         buildFetcher();
         assignFromUser(singleton(tp0));
         subscriptions.seek(tp0, 0);
 
         assertEquals(1, fetcher.sendFetches());
-        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
+        client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
         consumerClient.poll(time.timer(0));
         assertEquals(0, fetcher.fetchedRecords().size());
         assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -1579,7 +1579,7 @@ public class FetcherTest {
 
         // First fetch fails with stale metadata
         client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
-            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NOT_LEADER_FOR_PARTITION, 1L, 5L), false);
+            Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L), false);
         fetcher.resetOffsetsIfNeeded();
         consumerClient.pollNoWakeup();
         assertFalse(subscriptions.hasValidPosition(tp0));
@@ -2443,11 +2443,11 @@ public class FetcherTest {
         // Error code none with known offset
         testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);
         // Test both of partition has error.
-        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
+        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
         // Test the second partition has error.
-        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L);
+        testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, 10L, 100L, 10L, 100L);
         // Test different errors.
-        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
+        testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NONE, 10L, 100L, 10L, 100L);
         testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
         testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);
         testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
@@ -2473,7 +2473,7 @@ public class FetcherTest {
 
     @Test
     public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
-        List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_FOR_PARTITION,
+        List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_OR_FOLLOWER,
             Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE,
             Errors.LEADER_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH, Errors.UNKNOWN_LEADER_EPOCH);
 
@@ -2520,7 +2520,7 @@ public class FetcherTest {
             // We will count the answered future response in the end to verify if this is the case.
             Map<TopicPartition, ListOffsetResponse.PartitionData> paritionDataWithFatalError = new HashMap<>(allPartitionData);
             paritionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData(
-                Errors.NOT_LEADER_FOR_PARTITION, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+                Errors.NOT_LEADER_OR_FOLLOWER, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
             client.prepareResponseFrom(new ListOffsetResponse(paritionDataWithFatalError), originalLeader);
 
             // The request to new leader must only contain one partition tp1 with error.
@@ -2689,7 +2689,7 @@ public class FetcherTest {
         buildFetcher();
 
         Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
-        partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION,
+        partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_OR_FOLLOWER,
                 ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET,
                 Optional.empty()));
         partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 3ec8c80..4dfd014 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -868,7 +868,7 @@ public class SenderTest {
         assertEquals(1, client.inFlightRequestCount());
         assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
 
-        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1));
+        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, -1));
 
         sender.runOnce(); // receive response 0
 
@@ -1084,7 +1084,7 @@ public class SenderTest {
 
         assertEquals(2, client.inFlightRequestCount());
 
-        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1);
         sender.runOnce();  // receive first response
 
         Node node = metadata.fetch().nodes().get(0);
@@ -1140,7 +1140,7 @@ public class SenderTest {
 
         assertEquals(2, client.inFlightRequestCount());
 
-        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1);
         sender.runOnce();  // receive first response
 
         Node node = metadata.fetch().nodes().get(0);
@@ -1170,7 +1170,7 @@ public class SenderTest {
         // Send first ProduceRequest
         Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", "value");
         sender.runOnce();  // send request
-        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1);
 
         sender.runOnce();  // receive response
         assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
@@ -1217,7 +1217,7 @@ public class SenderTest {
         assertEquals(1, client.inFlightRequestCount());
 
         Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
-        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
         responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
         client.respond(produceResponse(responses));
 
@@ -1258,7 +1258,7 @@ public class SenderTest {
         assertEquals(1, client.inFlightRequestCount());
 
         Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
-        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
         responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
         client.respond(produceResponse(responses));
         sender.initiateClose(); // initiate close
@@ -1291,7 +1291,7 @@ public class SenderTest {
         assertEquals(1, client.inFlightRequestCount());
 
         Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
-        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
         responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
         client.respond(produceResponse(responses));
         sender.runOnce(); // out of order sequence error triggers producer ID reset because epoch is maxed out
@@ -1325,7 +1325,7 @@ public class SenderTest {
         assertEquals(1, client.inFlightRequestCount());
 
         Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
-        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+        responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
         responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
         client.respond(produceResponse(responses));
         sender.runOnce();
@@ -1336,7 +1336,7 @@ public class SenderTest {
 
         assertFalse(successfulResponse.isDone());
         // The response comes back with a retriable error.
-        client.respond(produceResponse(tp1, 0, Errors.NOT_LEADER_FOR_PARTITION, -1));
+        client.respond(produceResponse(tp1, 0, Errors.NOT_LEADER_OR_FOLLOWER, -1));
         sender.runOnce();
 
         // The response
@@ -2147,7 +2147,7 @@ public class SenderTest {
         assertEquals(1, client.inFlightRequestCount());
         time.sleep(deliverTimeoutMs);
 
-        client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1)); // return a retriable error
+        client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, -1)); // return a retriable error
 
         sender.runOnce();  // expire the batch
         assertTrue(request1.isDone());
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 126d447..1d14642 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -2192,7 +2192,7 @@ public class TransactionManagerTest {
         Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
 
         prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
-        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, producerId, epoch);
+        prepareProduceResponse(Errors.NOT_LEADER_OR_FOLLOWER, producerId, epoch);
         runUntil(() -> !client.hasPendingResponses());
 
         assertFalse(responseFuture.isDone());
@@ -2385,7 +2385,7 @@ public class TransactionManagerTest {
         runUntil(() -> transactionManager.transactionContainsPartition(tp0));
         assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
 
-        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, producerId, epoch);
+        prepareProduceResponse(Errors.NOT_LEADER_OR_FOLLOWER, producerId, epoch);
         runUntil(() -> !client.hasPendingResponses());
         assertFalse(responseFuture.isDone());
 
@@ -2920,10 +2920,10 @@ public class TransactionManagerTest {
         sender.runOnce();
         assertEquals(1, accumulator.batches().get(tp1).size());
 
-        // Partition failover occurs and tp1 returns a NOT_LEADER_FOR_PARTITION error
+        // Partition failover occurs and tp1 returns a NOT_LEADER_OR_FOLLOWER error
         // Despite having the old epoch, the batch should retry
         ProduceResponse.PartitionResponse t1b2Response = new ProduceResponse.PartitionResponse(
-                Errors.NOT_LEADER_FOR_PARTITION, -1, -1, 600L);
+                Errors.NOT_LEADER_OR_FOLLOWER, -1, -1, 600L);
         assertTrue(transactionManager.canRetry(t1b2Response, tp1b2));
         accumulator.reenqueue(tp1b2, time.milliseconds());
 
@@ -3042,10 +3042,10 @@ public class TransactionManagerTest {
         sender.runOnce();
         assertEquals(1, accumulator.batches().get(tp1).size());
 
-        // Partition failover occurs and tp1 returns a NOT_LEADER_FOR_PARTITION error
+        // Partition failover occurs and tp1 returns a NOT_LEADER_OR_FOLLOWER error
         // Despite having the old epoch, the batch should retry
         ProduceResponse.PartitionResponse t1b2Response = new ProduceResponse.PartitionResponse(
-                Errors.NOT_LEADER_FOR_PARTITION, -1, -1, 600L);
+                Errors.NOT_LEADER_OR_FOLLOWER, -1, -1, 600L);
         assertTrue(transactionManager.canRetry(t1b2Response, tp1b2));
         accumulator.reenqueue(tp1b2, time.milliseconds());
 
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
index edb962b..bf2fe13 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
@@ -66,7 +66,7 @@ public class LeaderAndIsrResponseTest {
     @Test
     public void testErrorCountsWithTopLevelError() {
         List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
-            asList(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION));
+            asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
         LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
             .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
             .setPartitionErrors(partitions));
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
index d8d8f4f..5a3af47 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
@@ -59,7 +59,7 @@ public class StopReplicaResponseTest {
         List<StopReplicaPartitionError> errors = new ArrayList<>();
         errors.add(new StopReplicaPartitionError().setTopicName("foo").setPartitionIndex(0));
         errors.add(new StopReplicaPartitionError().setTopicName("foo").setPartitionIndex(1)
-            .setErrorCode(Errors.NOT_LEADER_FOR_PARTITION.code()));
+            .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()));
         StopReplicaResponse response = new StopReplicaResponse(new StopReplicaResponseData()
             .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
             .setPartitionErrors(errors));
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ab4bb75..4be49a5 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -331,7 +331,7 @@ class Partition(val topicPartition: TopicPartition,
   def getReplica(replicaId: Int): Option[Replica] = Option(remoteReplicasMap.get(replicaId))
 
   private def getReplicaOrException(replicaId: Int): Replica = getReplica(replicaId).getOrElse{
-    throw new ReplicaNotAvailableException(s"Replica with id $replicaId is not available on broker $localBrokerId")
+    throw new NotLeaderOrFollowerException(s"Replica with id $replicaId is not available on broker $localBrokerId")
   }
 
   private def checkCurrentLeaderEpoch(remoteLeaderEpochOpt: Optional[Integer]): Errors = {
@@ -354,16 +354,13 @@ class Partition(val topicPartition: TopicPartition,
     checkCurrentLeaderEpoch(currentLeaderEpoch) match {
       case Errors.NONE =>
         if (requireLeader && !isLeader) {
-          Right(Errors.NOT_LEADER_FOR_PARTITION)
+          Right(Errors.NOT_LEADER_OR_FOLLOWER)
         } else {
           log match {
             case Some(partitionLog) =>
               Left(partitionLog)
             case _ =>
-              if (requireLeader)
-                Right(Errors.NOT_LEADER_FOR_PARTITION)
-              else
-                Right(Errors.REPLICA_NOT_AVAILABLE)
+              Right(Errors.NOT_LEADER_OR_FOLLOWER)
           }
         }
       case error =>
@@ -372,12 +369,12 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   def localLogOrException: Log = log.getOrElse {
-    throw new ReplicaNotAvailableException(s"Log for partition $topicPartition is not available " +
+    throw new NotLeaderOrFollowerException(s"Log for partition $topicPartition is not available " +
       s"on broker $localBrokerId")
   }
 
   def futureLocalLogOrException: Log = futureLog.getOrElse {
-    throw new ReplicaNotAvailableException(s"Future log for partition $topicPartition is not available " +
+    throw new NotLeaderOrFollowerException(s"Future log for partition $topicPartition is not available " +
       s"on broker $localBrokerId")
   }
 
@@ -760,7 +757,7 @@ class Partition(val topicPartition: TopicPartition,
         } else
           (false, Errors.NONE)
       case None =>
-        (false, Errors.NOT_LEADER_FOR_PARTITION)
+        (false, Errors.NOT_LEADER_OR_FOLLOWER)
     }
   }
 
@@ -822,7 +819,7 @@ class Partition(val topicPartition: TopicPartition,
    */
   def lowWatermarkIfLeader: Long = {
     if (!isLeader)
-      throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
+      throw new NotLeaderOrFollowerException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
 
     // lowWatermarkIfLeader may be called many times when a DeleteRecordsRequest is outstanding,
     // care has been taken to avoid generating unnecessary collections in this code
@@ -987,7 +984,7 @@ class Partition(val topicPartition: TopicPartition,
           (info, maybeIncrementLeaderHW(leaderLog))
 
         case None =>
-          throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+          throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
             .format(topicPartition, localBrokerId))
       }
     }
@@ -1130,7 +1127,7 @@ class Partition(val topicPartition: TopicPartition,
           requestedOffset = convertedOffset,
           lowWatermark = lowWatermarkIfLeader)
       case None =>
-        throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
+        throw new NotLeaderOrFollowerException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
     }
   }
 
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 4898e22..dea0b77 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -285,7 +285,7 @@ class GroupMetadataManager(brokerId: Int,
                    | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
                 Errors.COORDINATOR_NOT_AVAILABLE
 
-              case Errors.NOT_LEADER_FOR_PARTITION
+              case Errors.NOT_LEADER_OR_FOLLOWER
                    | Errors.KAFKA_STORAGE_ERROR =>
                 Errors.NOT_COORDINATOR
 
@@ -432,7 +432,7 @@ class GroupMetadataManager(brokerId: Int,
                        | Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
                     Errors.COORDINATOR_NOT_AVAILABLE
 
-                  case Errors.NOT_LEADER_FOR_PARTITION
+                  case Errors.NOT_LEADER_OR_FOLLOWER
                        | Errors.KAFKA_STORAGE_ERROR =>
                     Errors.NOT_COORDINATOR
 
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 66edc47..c2efdbd 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -143,7 +143,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                       throw new IllegalStateException(s"Received fatal error ${error.exceptionName} while sending txn marker for $transactionalId")
 
                     case Errors.UNKNOWN_TOPIC_OR_PARTITION |
-                         Errors.NOT_LEADER_FOR_PARTITION |
+                         Errors.NOT_LEADER_OR_FOLLOWER |
                          Errors.NOT_ENOUGH_REPLICAS |
                          Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND |
                          Errors.REQUEST_TIMED_OUT |
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 8eb65df..923aed9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -513,7 +513,7 @@ class TransactionStateManager(brokerId: Int,
                | Errors.REQUEST_TIMED_OUT => // note that for timed out request we return NOT_AVAILABLE error code to let client retry
             Errors.COORDINATOR_NOT_AVAILABLE
 
-          case Errors.NOT_LEADER_FOR_PARTITION
+          case Errors.NOT_LEADER_OR_FOLLOWER
                | Errors.KAFKA_STORAGE_ERROR =>
             Errors.NOT_COORDINATOR
 
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f80d6fa..a6322ea 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -377,7 +377,7 @@ abstract class AbstractFetcherThread(name: String,
                 case Errors.FENCED_LEADER_EPOCH =>
                   if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
 
-                case Errors.NOT_LEADER_FOR_PARTITION =>
+                case Errors.NOT_LEADER_OR_FOLLOWER =>
                   debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
                     "that the partition is being moved")
                   partitionsWithError += topicPartition
@@ -556,7 +556,7 @@ abstract class AbstractFetcherThread(name: String,
 
       case e @ (_ : UnknownTopicOrPartitionException |
                 _ : UnknownLeaderEpochException |
-                _ : NotLeaderForPartitionException) =>
+                _ : NotLeaderOrFollowerException) =>
         info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
         true
 
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 5275275..e665688 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -80,7 +80,7 @@ class DelayedDeleteRecords(delayMs: Long,
                 val leaderLW = partition.lowWatermarkIfLeader
                 (leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
               case None =>
-                (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+                (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
             }
 
           case HostedPartition.Offline =>
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 50cd4ee..3aa2ee4 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -87,8 +87,7 @@ class DelayedFetch(delayMs: Long,
         val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
         try {
           if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
-            val partition = replicaManager.getPartitionOrException(topicPartition,
-              expectLeader = fetchMetadata.fetchOnlyLeader)
+            val partition = replicaManager.getPartitionOrException(topicPartition)
             val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)
 
             val endOffset = fetchMetadata.fetchIsolation match {
@@ -121,11 +120,8 @@ class DelayedFetch(delayMs: Long,
             }
           }
         } catch {
-          case _: NotLeaderForPartitionException =>  // Case A
-            debug(s"Broker is no longer the leader of $topicPartition, satisfy $fetchMetadata immediately")
-            return forceComplete()
-          case _: ReplicaNotAvailableException =>  // Case B
-            debug(s"Broker no longer has a replica of $topicPartition, satisfy $fetchMetadata immediately")
+          case _: NotLeaderOrFollowerException =>  // Case A or Case B
+            debug(s"Broker is no longer the leader or follower of $topicPartition, satisfy $fetchMetadata immediately")
             return forceComplete()
           case _: UnknownTopicOrPartitionException => // Case C
             debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 83e6142..668851d 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -85,7 +85,7 @@ class DelayedProduce(delayMs: Long,
       trace(s"Checking produce satisfaction for $topicPartition, current status $status")
       // skip those partitions that have already been satisfied
       if (status.acksPending) {
-        val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition, expectLeader = true) match {
+        val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition) match {
           case Left(err) =>
             // Case A
             (false, err)
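With the expectLeader flag gone, the caller consumes a plain Either. A sketch, where checkEnoughReplicas is a hypothetical stand-in for Partition.checkEnoughReplicasReachOffset:

    import org.apache.kafka.common.protocol.Errors

    // An error on the Left completes the pending produce with that error (Case A);
    // a live partition on the Right is checked for sufficient in-sync acks.
    def checkAcks[P](partitionOrError: Either[Errors, P],
                     checkEnoughReplicas: P => (Boolean, Errors)): (Boolean, Errors) =
      partitionOrError match {
        case Left(err)        => (false, err)
        case Right(partition) => checkEnoughReplicas(partition)
      }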
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 86649c1..51c1701 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -923,10 +923,10 @@ class KafkaApis(val requestChannel: RequestChannel,
           fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
         (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava))
       } catch {
-        // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
+        // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages
         // are typically transient and there is no value in logging the entire stack trace for the same
         case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
+                  _ : NotLeaderOrFollowerException |
                   _ : KafkaStorageException) =>
           debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
             correlationId, clientId, topicPartition, e.getMessage))
@@ -998,7 +998,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           // NOTE: These exceptions are special cased since these error messages are typically transient or the client
           // would have received a clear exception and there is no value in logging the entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
-                    _ : NotLeaderForPartitionException |
+                    _ : NotLeaderOrFollowerException |
                     _ : UnknownLeaderEpochException |
                     _ : FencedLeaderEpochException |
                     _ : KafkaStorageException |
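The special-casing in both hunks implements a simple logging policy: known transient exceptions get a one-line debug message, everything else a full stack trace. A sketch, with println standing in for the request logger:

    import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowerException, UnknownTopicOrPartitionException}

    // Transient, expected exceptions are logged at debug without a stack trace;
    // anything unexpected is logged at error with the full trace.
    def logRequestFailure(e: Throwable, context: String): Unit = e match {
      case _: UnknownTopicOrPartitionException |
           _: NotLeaderOrFollowerException |
           _: KafkaStorageException =>
        println(s"DEBUG $context failed due to ${e.getMessage}")
      case t =>
        println(s"ERROR $context failed")
        t.printStackTrace()
    }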
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 5db3464..91b873f 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -146,12 +146,12 @@ class ReplicaAlterLogDirsThread(name: String,
   }
 
   override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
-    val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+    val partition = replicaMgr.getPartitionOrException(topicPartition)
     partition.localLogOrException.logStartOffset
   }
 
   override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
-    val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+    val partition = replicaMgr.getPartitionOrException(topicPartition)
     partition.localLogOrException.logEndOffset
   }
 
@@ -166,7 +166,7 @@ class ReplicaAlterLogDirsThread(name: String,
         val endOffset = if (epochData.leaderEpoch == UNDEFINED_EPOCH) {
           new EpochEndOffset(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
         } else {
-          val partition = replicaMgr.getPartitionOrException(tp, expectLeader = false)
+          val partition = replicaMgr.getPartitionOrException(tp)
           partition.lastOffsetForLeaderEpoch(
             currentLeaderEpoch = epochData.currentLeaderEpoch,
             leaderEpoch = epochData.leaderEpoch,
@@ -198,12 +198,12 @@ class ReplicaAlterLogDirsThread(name: String,
    * exchange with the current replica to truncate to the largest common log prefix for the topic partition
    */
   override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
-    val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+    val partition = replicaMgr.getPartitionOrException(topicPartition)
     partition.truncateTo(truncationState.offset, isFuture = true)
   }
 
   override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
-    val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+    val partition = replicaMgr.getPartitionOrException(topicPartition)
     partition.truncateFullyAndStartAt(offset, isFuture = true)
   }
 
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 04d88b9..8f05d65 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -500,8 +500,8 @@ class ReplicaManager(val config: KafkaConfig,
     allPartitions.values.iterator.count(_ == HostedPartition.Offline)
   }
 
-  def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = {
-    getPartitionOrError(topicPartition, expectLeader) match {
+  def getPartitionOrException(topicPartition: TopicPartition): Partition = {
+    getPartitionOrError(topicPartition) match {
       case Left(Errors.KAFKA_STORAGE_ERROR) =>
         throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
 
@@ -512,7 +512,7 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def getPartitionOrError(topicPartition: TopicPartition, expectLeader: Boolean): Either[Errors, Partition] = {
+  def getPartitionOrError(topicPartition: TopicPartition): Either[Errors, Partition] = {
     getPartition(topicPartition) match {
       case HostedPartition.Online(partition) =>
         Right(partition)
@@ -521,15 +521,11 @@ class ReplicaManager(val config: KafkaConfig,
         Left(Errors.KAFKA_STORAGE_ERROR)
 
       case HostedPartition.None if metadataCache.contains(topicPartition) =>
-        if (expectLeader) {
-          // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER which
-          // forces clients to refresh metadata to find the new location. This can happen, for example,
-          // during a partition reassignment if a produce request from the client is sent to a broker after
-          // the local replica has been deleted.
-          Left(Errors.NOT_LEADER_FOR_PARTITION)
-        } else {
-          Left(Errors.REPLICA_NOT_AVAILABLE)
-        }
+        // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER_OR_FOLLOWER which
+        // forces clients to refresh metadata to find the new location. This can happen, for example,
+        // during a partition reassignment if a produce request from the client is sent to a broker after
+        // the local replica has been deleted.
+        Left(Errors.NOT_LEADER_OR_FOLLOWER)
 
       case HostedPartition.None =>
         Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
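This hunk is the heart of the change: a partition that exists in the cluster but is not hosted on this broker now always maps to NOT_LEADER_OR_FOLLOWER, whether the caller expected a leader or a follower. A sketch with a simplified stand-in for the HostedPartition states and the metadata-cache check:

    import org.apache.kafka.common.protocol.Errors

    sealed trait HostedState
    case class Online(partition: String) extends HostedState
    case object Offline extends HostedState
    case object NotHosted extends HostedState

    // NOT_LEADER_OR_FOLLOWER is retriable, so clients refresh metadata and find
    // the partition's new location instead of failing the request.
    def partitionOrError(state: HostedState,
                         existsInMetadata: Boolean): Either[Errors, String] =
      state match {
        case Online(p)                     => Right(p)
        case Offline                       => Left(Errors.KAFKA_STORAGE_ERROR)
        case NotHosted if existsInMetadata => Left(Errors.NOT_LEADER_OR_FOLLOWER)
        case NotHosted                     => Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
      }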
@@ -537,15 +533,15 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   def localLogOrException(topicPartition: TopicPartition): Log = {
-    getPartitionOrException(topicPartition, expectLeader = false).localLogOrException
+    getPartitionOrException(topicPartition).localLogOrException
   }
 
   def futureLocalLogOrException(topicPartition: TopicPartition): Log = {
-    getPartitionOrException(topicPartition, expectLeader = false).futureLocalLogOrException
+    getPartitionOrException(topicPartition).futureLocalLogOrException
   }
 
   def futureLogExists(topicPartition: TopicPartition): Boolean = {
-    getPartitionOrException(topicPartition, expectLeader = false).futureLog.isDefined
+    getPartitionOrException(topicPartition).futureLog.isDefined
   }
 
   def localLog(topicPartition: TopicPartition): Option[Log] = {
@@ -626,12 +622,12 @@ class ReplicaManager(val config: KafkaConfig,
         (topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
       } else {
         try {
-          val partition = getPartitionOrException(topicPartition, expectLeader = true)
+          val partition = getPartitionOrException(topicPartition)
           val logDeleteResult = partition.deleteRecordsOnLeader(requestedOffset)
           (topicPartition, logDeleteResult)
         } catch {
           case e@ (_: UnknownTopicOrPartitionException |
-                   _: NotLeaderForPartitionException |
+                   _: NotLeaderOrFollowerException |
                    _: OffsetOutOfRangeException |
                    _: PolicyViolationException |
                    _: KafkaStorageException) =>
@@ -688,11 +684,11 @@ class ReplicaManager(val config: KafkaConfig,
           // If the log for this partition has not been created yet:
           // 1) Record the destination log directory in the memory so that the partition will be created in this log directory
           //    when broker receives LeaderAndIsrRequest for this partition later.
-          // 2) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaLogDirsResponse
+          // 2) Respond with REPLICA_NOT_AVAILABLE for this partition in the AlterReplicaLogDirsResponse (the NotLeaderOrFollowerException thrown below is mapped back for compatibility)
           logManager.maybeUpdatePreferredLogDir(topicPartition, destinationDir)
 
-          // throw ReplicaNotAvailableException if replica does not exist for the given partition
-          val partition = getPartitionOrException(topicPartition, expectLeader = false)
+          // throw NotLeaderOrFollowerException if replica does not exist for the given partition
+          val partition = getPartitionOrException(topicPartition)
           partition.localLogOrException
 
           // If the destinationLDir is different from the current log directory of the replica:
@@ -716,8 +712,12 @@ class ReplicaManager(val config: KafkaConfig,
                   _: LogDirNotFoundException |
                   _: ReplicaNotAvailableException |
                   _: KafkaStorageException) =>
-            warn("Unable to alter log dirs for %s".format(topicPartition), e)
+            warn(s"Unable to alter log dirs for $topicPartition", e)
             (topicPartition, Errors.forException(e))
+          case e: NotLeaderOrFollowerException =>
+            // Retaining REPLICA_NOT_AVAILABLE exception for ALTER_REPLICA_LOG_DIRS for compatibility
+            warn(s"Unable to alter log dirs for $topicPartition", e)
+            (topicPartition, Errors.REPLICA_NOT_AVAILABLE)
           case t: Throwable =>
             error("Error while changing replica dir for partition %s".format(topicPartition), t)
             (topicPartition, Errors.forException(t))
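The new catch branch is the compatibility shim called out in the commit message: ALTER_REPLICA_LOG_DIRS has always returned REPLICA_NOT_AVAILABLE, so for this API alone the new internal exception is mapped back to the old code. A sketch of that mapping:

    import org.apache.kafka.common.errors.NotLeaderOrFollowerException
    import org.apache.kafka.common.protocol.Errors

    // Everywhere else NotLeaderOrFollowerException surfaces as NOT_LEADER_OR_FOLLOWER
    // via Errors.forException; here it is translated for older clients.
    def alterLogDirErrorFor(e: Throwable): Errors = e match {
      case _: NotLeaderOrFollowerException => Errors.REPLICA_NOT_AVAILABLE
      case t                               => Errors.forException(t)
    }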
@@ -879,7 +879,7 @@ class ReplicaManager(val config: KafkaConfig,
           Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
       } else {
         try {
-          val partition = getPartitionOrException(topicPartition, expectLeader = true)
+          val partition = getPartitionOrException(topicPartition)
           val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
           val numAppendedMessages = info.numMessages
 
@@ -898,7 +898,7 @@ class ReplicaManager(val config: KafkaConfig,
           // NOTE: Failed produce requests metric is not incremented for known exceptions
           // it is supposed to indicate un-expected failures of a broker in handling a produce request
           case e@ (_: UnknownTopicOrPartitionException |
-                   _: NotLeaderForPartitionException |
+                   _: NotLeaderOrFollowerException |
                    _: RecordTooLargeException |
                    _: RecordBatchTooLargeException |
                    _: CorruptRecordException |
@@ -922,7 +922,7 @@ class ReplicaManager(val config: KafkaConfig,
                               isolationLevel: Option[IsolationLevel],
                               currentLeaderEpoch: Optional[Integer],
                               fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = {
-    val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader)
+    val partition = getPartitionOrException(topicPartition)
     partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader)
   }
 
@@ -931,7 +931,7 @@ class ReplicaManager(val config: KafkaConfig,
                                      maxNumOffsets: Int,
                                      isFromConsumer: Boolean,
                                      fetchOnlyFromLeader: Boolean): Seq[Long] = {
-    val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader)
+    val partition = getPartitionOrException(topicPartition)
     partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader)
   }
 
@@ -1050,7 +1050,7 @@ class ReplicaManager(val config: KafkaConfig,
             s"remaining response limit $limitBytes" +
             (if (minOneMessage) s", ignoring response/partition size limits" else ""))
 
-        val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
+        val partition = getPartitionOrException(tp)
         val fetchTimeMs = time.milliseconds
 
         // If we are the leader, determine the preferred read-replica
@@ -1108,7 +1108,7 @@ class ReplicaManager(val config: KafkaConfig,
         // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
         // is supposed to indicate un-expected failure of a broker in handling a fetch request
         case e@ (_: UnknownTopicOrPartitionException |
-                 _: NotLeaderForPartitionException |
+                 _: NotLeaderOrFollowerException |
                  _: UnknownLeaderEpochException |
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
@@ -1775,7 +1775,7 @@ class ReplicaManager(val config: KafkaConfig,
           new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
 
         case HostedPartition.None if metadataCache.contains(tp) =>
-          new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
+          new EpochEndOffset(Errors.NOT_LEADER_OR_FOLLOWER, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
 
         case HostedPartition.None =>
           new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
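Unlike the fetch and produce paths, OffsetsForLeaderEpoch reports errors inside each partition's EpochEndOffset rather than by throwing. A sketch of the mapping in the hunk above, with hostedHere and existsInMetadata as simplified stand-ins for the HostedPartition lookup and metadata-cache check (the real leader path computes actual epochs and offsets):

    import org.apache.kafka.common.protocol.Errors
    import org.apache.kafka.common.requests.EpochEndOffset
    import org.apache.kafka.common.requests.EpochEndOffset.{UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET}

    def epochEndOffsetFor(hostedHere: Boolean, existsInMetadata: Boolean): EpochEndOffset =
      if (hostedHere)
        new EpochEndOffset(Errors.NONE, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
      else if (existsInMetadata)
        new EpochEndOffset(Errors.NOT_LEADER_OR_FOLLOWER, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
      else
        new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)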
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index bab4452..778ed7b 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -698,7 +698,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
         lowWatermark.contains(5L)
       } catch {
         case e: ExecutionException if e.getCause.isInstanceOf[LeaderNotAvailableException] ||
-          e.getCause.isInstanceOf[NotLeaderForPartitionException] => false
+          e.getCause.isInstanceOf[NotLeaderOrFollowerException] => false
         }
     }, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}")
   }
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index beb5232..fb4d839 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -161,7 +161,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
    *
    * TODO: other exceptions that can be thrown in ExecutionException:
    *    UnknownTopicOrPartitionException
-   *    NotLeaderForPartitionException
+   *    NotLeaderOrFollowerException
    *    LeaderNotAvailableException
    *    CorruptRecordException
    *    TimeoutException
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index c43d481..38831b5 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -21,7 +21,7 @@ import java.util.Optional
 import scala.collection.Seq
 import kafka.cluster.Partition
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{FencedLeaderEpochException, ReplicaNotAvailableException}
+import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.FetchRequest
@@ -62,7 +62,7 @@ class DelayedFetchTest extends EasyMockSupport {
 
     val partition: Partition = mock(classOf[Partition])
 
-    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
+    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition))
         .andReturn(partition)
     EasyMock.expect(partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true))
         .andThrow(new FencedLeaderEpochException("Requested epoch has been fenced"))
@@ -81,7 +81,7 @@ class DelayedFetchTest extends EasyMockSupport {
   }
 
   @Test
-  def testReplicaNotAvailable(): Unit = {
+  def testNotLeaderOrFollower(): Unit = {
     val topicPartition = new TopicPartition("topic", 0)
     val fetchOffset = 500L
     val logStartOffset = 0L
@@ -106,9 +106,9 @@ class DelayedFetchTest extends EasyMockSupport {
       clientMetadata = None,
       responseCallback = callback)
 
-    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
-      .andThrow(new ReplicaNotAvailableException(s"Replica for $topicPartition not available"))
-    expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.REPLICA_NOT_AVAILABLE)
+    EasyMock.expect(replicaManager.getPartitionOrException(topicPartition))
+      .andThrow(new NotLeaderOrFollowerException(s"Replica for $topicPartition not available"))
+    expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER)
     EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
 
     replayAll()
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 725117f..dc73b4d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -30,7 +30,7 @@ import kafka.server._
 import kafka.server.checkpoints.OffsetCheckpoints
 import kafka.utils._
 import org.apache.kafka.common.{IsolationLevel, TopicPartition}
-import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException, ReplicaNotAvailableException}
+import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException}
 import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -271,8 +271,8 @@ class PartitionTest extends AbstractPartitionTest {
     assertSnapshotError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
     assertSnapshotError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
 
-    assertSnapshotError(Errors.NOT_LEADER_FOR_PARTITION, Optional.of(leaderEpoch), fetchOnlyLeader = true)
-    assertSnapshotError(Errors.NOT_LEADER_FOR_PARTITION, Optional.empty(), fetchOnlyLeader = true)
+    assertSnapshotError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
+    assertSnapshotError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
     assertSnapshotError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
     assertSnapshotError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
   }
@@ -312,8 +312,8 @@ class PartitionTest extends AbstractPartitionTest {
     assertLastOffsetForLeaderError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
     assertLastOffsetForLeaderError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
 
-    assertLastOffsetForLeaderError(Errors.NOT_LEADER_FOR_PARTITION, Optional.empty(), fetchOnlyLeader = true)
-    assertLastOffsetForLeaderError(Errors.NOT_LEADER_FOR_PARTITION, Optional.of(leaderEpoch), fetchOnlyLeader = true)
+    assertLastOffsetForLeaderError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
+    assertLastOffsetForLeaderError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
     assertLastOffsetForLeaderError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
     assertLastOffsetForLeaderError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
   }
@@ -372,8 +372,8 @@ class PartitionTest extends AbstractPartitionTest {
     assertReadRecordsError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
     assertReadRecordsError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
 
-    assertReadRecordsError(Errors.NOT_LEADER_FOR_PARTITION, Optional.empty(), fetchOnlyLeader = true)
-    assertReadRecordsError(Errors.NOT_LEADER_FOR_PARTITION, Optional.of(leaderEpoch), fetchOnlyLeader = true)
+    assertReadRecordsError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
+    assertReadRecordsError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
     assertReadRecordsError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
     assertReadRecordsError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
   }
@@ -430,8 +430,8 @@ class PartitionTest extends AbstractPartitionTest {
     assertFetchOffsetError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
     assertFetchOffsetError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
 
-    assertFetchOffsetError(Errors.NOT_LEADER_FOR_PARTITION, Optional.empty(), fetchOnlyLeader = true)
-    assertFetchOffsetError(Errors.NOT_LEADER_FOR_PARTITION, Optional.of(leaderEpoch), fetchOnlyLeader = true)
+    assertFetchOffsetError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
+    assertFetchOffsetError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
     assertFetchOffsetError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
     assertFetchOffsetError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
   }
@@ -783,14 +783,14 @@ class PartitionTest extends AbstractPartitionTest {
   @Test
   def testGetReplica(): Unit = {
     assertEquals(None, partition.log)
-    assertThrows[ReplicaNotAvailableException] {
+    assertThrows[NotLeaderOrFollowerException] {
       partition.localLogOrException
     }
   }
 
   @Test
   def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
-    assertThrows[ReplicaNotAvailableException] {
+    assertThrows[NotLeaderOrFollowerException] {
       partition.appendRecordsToFollowerOrFutureReplica(
            createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false)
     }
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 6266d7f..08a868c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -994,7 +994,7 @@ class GroupMetadataManagerTest {
     assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
     assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
     assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
-    assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
+    assertStoreGroupErrorMapping(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NOT_COORDINATOR)
     assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR)
     assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR)
     assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN_SERVER_ERROR)
@@ -1272,7 +1272,7 @@ class GroupMetadataManagerTest {
     assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
     assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
     assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
-    assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
+    assertCommitOffsetErrorMapping(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NOT_COORDINATOR)
     assertCommitOffsetErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
     assertCommitOffsetErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
     assertCommitOffsetErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE)
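The assertions changed above pin down how storage-level append errors are translated for group-coordinator clients. A sketch of that translation, simplified to the cases these tests exercise (the real mapping also distinguishes record-size errors per request type):

    import org.apache.kafka.common.protocol.Errors

    // A broker that lost leadership of the __consumer_offsets partition tells the
    // client NOT_COORDINATOR, prompting coordinator rediscovery; replication
    // shortfalls surface as COORDINATOR_NOT_AVAILABLE, prompting a retry.
    def coordinatorErrorFor(appendError: Errors): Errors = appendError match {
      case Errors.UNKNOWN_TOPIC_OR_PARTITION |
           Errors.NOT_ENOUGH_REPLICAS |
           Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND => Errors.COORDINATOR_NOT_AVAILABLE
      case Errors.NOT_LEADER_OR_FOLLOWER           => Errors.NOT_COORDINATOR
      case other                                   => other
    }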
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 5ae961b..07b2fa8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -177,8 +177,8 @@ class TransactionMarkerRequestCompletionHandlerTest {
   }
 
   @Test
-  def shouldRetryPartitionWhenNotLeaderForPartitionError(): Unit = {
-    verifyRetriesPartitionOnError(Errors.NOT_LEADER_FOR_PARTITION)
+  def shouldRetryPartitionWhenNotLeaderOrFollowerError(): Unit = {
+    verifyRetriesPartitionOnError(Errors.NOT_LEADER_OR_FOLLOWER)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 3055742..2a8d046 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -366,7 +366,7 @@ class TransactionStateManagerTest {
     expectedError = Errors.NOT_COORDINATOR
     var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
 
-    prepareForTxnMessageAppend(Errors.NOT_LEADER_FOR_PARTITION)
+    prepareForTxnMessageAppend(Errors.NOT_LEADER_OR_FOLLOWER)
     transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
     assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
     assertTrue(txnMetadata1.pendingState.isEmpty)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index f0c4329..6561a48 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -209,7 +209,7 @@ class FetchRequestTest extends BaseRequestTest {
       Seq(topicPartition))).build()
     val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest)
     val partitionData = fetchResponse.responseData.get(topicPartition)
-    assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionData.error)
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, partitionData.error)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9da7319..55d2b77 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -770,8 +770,8 @@ class KafkaApisTest {
   }
 
   @Test
-  def testLeaderReplicaIfLocalRaisesNotLeaderForPartition(): Unit = {
-    testListOffsetFailedGetLeaderReplica(Errors.NOT_LEADER_FOR_PARTITION)
+  def testLeaderReplicaIfLocalRaisesNotLeaderOrFollower(): Unit = {
+    testListOffsetFailedGetLeaderReplica(Errors.NOT_LEADER_OR_FOLLOWER)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 920176e..2600cce 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -64,14 +64,14 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val nonReplica = servers.map(_.config.brokerId).find(!replicas.contains(_)).get
 
     // Follower
-    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower, consumerRequest)
-    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower, replicaRequest)
+    assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, follower, consumerRequest)
+    assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, follower, replicaRequest)
     assertResponseError(Errors.NONE, follower, debugReplicaRequest)
 
     // Non-replica
-    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, consumerRequest)
-    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, replicaRequest)
-    assertResponseError(Errors.REPLICA_NOT_AVAILABLE, nonReplica, debugReplicaRequest)
+    assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, consumerRequest)
+    assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, replicaRequest)
+    assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, debugReplicaRequest)
   }
 
   @Test
@@ -105,8 +105,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
     // Check follower error codes
     val followerId = TestUtils.findFollowerId(topicPartition, servers)
-    assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.empty())
-    assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.of(secondLeaderEpoch))
+    assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.empty())
+    assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.of(secondLeaderEpoch))
     assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1))
     assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
   }
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 8ac65b4..9a985af 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -27,7 +27,7 @@ import kafka.utils.{CoreUtils, Exit, TestUtils}
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
+import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowerException}
 import org.apache.kafka.common.utils.Utils
 import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
 import org.junit.{Before, Test}
@@ -150,16 +150,16 @@ class LogDirFailureTest extends IntegrationTestHarness {
 
     causeLogDirFailure(failureType, leaderServer, partition)
 
-    // send() should fail due to either KafkaStorageException or NotLeaderForPartitionException
+    // send() should fail due to either KafkaStorageException or NotLeaderOrFollowerException
     try {
       producer.send(record).get(6000, TimeUnit.MILLISECONDS)
-      fail("send() should fail with either KafkaStorageException or NotLeaderForPartitionException")
+      fail("send() should fail with either KafkaStorageException or NotLeaderOrFollowerException")
     } catch {
       case e: ExecutionException =>
         e.getCause match {
           case t: KafkaStorageException =>
-          case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3
-          case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}")
+          case t: NotLeaderOrFollowerException => // This may happen if ProduceRequest version <= 3
+          case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderOrFollowerException instead of ${t.toString}")
         }
     }
   }
diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 8fc9cdb..7c008fc 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -48,8 +48,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
     val follower = replicas.find(_ != leader).get
     val nonReplica = servers.map(_.config.brokerId).find(!replicas.contains(_)).get
 
-    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower, request)
-    assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, request)
+    assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, follower, request)
+    assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, request)
   }
 
   @Test
@@ -81,8 +81,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
 
     // Check follower error codes
     val followerId = TestUtils.findFollowerId(topicPartition, servers)
-    assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.empty())
-    assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.of(secondLeaderEpoch))
+    assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.empty())
+    assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.of(secondLeaderEpoch))
     assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1))
     assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
   }
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index d3a871a..4368442 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -125,7 +125,7 @@ class ProduceRequestTest extends BaseRequestTest {
 
     val produceResponse = sendProduceRequest(nonReplicaId, produceRequest)
     assertEquals(1, produceResponse.responses.size)
-    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, produceResponse.responses.asScala.head._2.error)
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResponse.responses.asScala.head._2.error)
   }
 
   /* returns a pair of partition id and leader id */
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index c2f9033..4713138 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -92,7 +92,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(replicaManager.futureLocalLogOrException(t1p0)).thenReturn(futureLog)
     when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
     when(replicaManager.nonOfflinePartition(t1p0)).thenReturn(Some(partition))
-    when(replicaManager.getPartitionOrException(t1p0, expectLeader = false)).thenReturn(partition)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
 
     when(quotaManager.isQuotaExceeded).thenReturn(false)
 
@@ -181,7 +181,7 @@ class ReplicaAlterLogDirsThreadTest {
     when(replicaManager.futureLocalLogOrException(t1p0)).thenReturn(futureLog)
     when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
     when(replicaManager.nonOfflinePartition(t1p0)).thenReturn(Some(partition))
-    when(replicaManager.getPartitionOrException(t1p0, expectLeader = false)).thenReturn(partition)
+    when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
 
     when(quotaManager.isQuotaExceeded).thenReturn(false)
 
@@ -266,13 +266,13 @@ class ReplicaAlterLogDirsThreadTest {
     val leoT1p1 = 232
 
     //Stubs
-    expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p0))
       .andStubReturn(partitionT1p0)
     expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p0, fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpochT1p0, leoT1p0))
       .anyTimes()
 
-    expect(replicaManager.getPartitionOrException(t1p1, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p1))
       .andStubReturn(partitionT1p1)
     expect(partitionT1p1.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p1, fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpochT1p1, leoT1p1))
@@ -315,13 +315,13 @@ class ReplicaAlterLogDirsThreadTest {
     val leo = 13
 
     //Stubs
-    expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p0))
       .andStubReturn(partitionT1p0)
     expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, fetchOnlyFromLeader = false))
       .andReturn(new EpochEndOffset(leaderEpoch, leo))
       .anyTimes()
 
-    expect(replicaManager.getPartitionOrException(t1p1, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p1))
       .andThrow(new KafkaStorageException).once()
 
     replay(partitionT1p0, replicaManager)
@@ -375,9 +375,9 @@ class ReplicaAlterLogDirsThreadTest {
     val replicaT1p1LEO = 192
 
     //Stubs
-    expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p0))
       .andStubReturn(partitionT1p0)
-    expect(replicaManager.getPartitionOrException(t1p1, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p1))
       .andStubReturn(partitionT1p1)
     expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLogT1p0)
     expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
@@ -452,7 +452,7 @@ class ReplicaAlterLogDirsThreadTest {
     val futureReplicaEpochEndOffset = 191
 
     //Stubs
-    expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p0))
       .andStubReturn(partition)
     expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLog)
     expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
@@ -522,7 +522,7 @@ class ReplicaAlterLogDirsThreadTest {
     val initialFetchOffset = 100
 
     //Stubs
-    expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p0))
       .andStubReturn(partition)
     expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).anyTimes()
     expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLog)
@@ -577,7 +577,7 @@ class ReplicaAlterLogDirsThreadTest {
     val replicaLEO = 300
 
     //Stubs
-    expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p0))
       .andStubReturn(partition)
     expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).once()
 
@@ -655,7 +655,7 @@ class ReplicaAlterLogDirsThreadTest {
     val futureReplicaLEO = 190
     val replicaLEO = 213
 
-    expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+    expect(replicaManager.getPartitionOrException(t1p0))
         .andStubReturn(partition)
     expect(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
         .andReturn(new EpochEndOffset(leaderEpoch, replicaLEO))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 1ab2054..cb6b68f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -600,7 +600,7 @@ class ReplicaFetcherThreadTest {
 
     //Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
     val offsetsReply = mutable.Map(
-      t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
+      t1p0 -> new EpochEndOffset(NOT_LEADER_OR_FOLLOWER, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
       t1p1 -> new EpochEndOffset(UNKNOWN_SERVER_ERROR, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
     ).asJava
 
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 771bc7d..fe4b2ca 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -165,8 +165,7 @@ class ReplicaManagerQuotasTest {
           .andReturn(offsetSnapshot)
 
       val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
-      EasyMock.expect(replicaManager.getPartitionOrException(
-        EasyMock.anyObject[TopicPartition], EasyMock.anyBoolean()))
+      EasyMock.expect(replicaManager.getPartitionOrException(EasyMock.anyObject[TopicPartition]))
         .andReturn(partition).anyTimes()
 
       EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[Partition], EasyMock.anyObject[Int]))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 09e4f14..43f3a9a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -23,10 +23,9 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.{Optional, Properties}
 
-import kafka.api.LeaderAndIsr
-import kafka.api.Request
+import kafka.api._
 import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager}
-import kafka.cluster.BrokerEndPoint
+import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.LazyOffsetCheckpoints
 import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
@@ -184,12 +183,12 @@ class ReplicaManagerTest {
           .setIsNew(false)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
-      rm.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+      rm.getPartitionOrException(new TopicPartition(topic, 0))
           .localLogOrException
 
       val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))
       val appendResult = appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response =>
-        assertEquals(Errors.NOT_LEADER_FOR_PARTITION, response.error)
+        assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error)
       }
 
       // Make this replica the follower
@@ -243,7 +242,7 @@ class ReplicaManagerTest {
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
 
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
-      val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+      val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
       assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).size)
 
       val previousReplicaFolder = partition.log.get.dir.getParentFile
@@ -301,7 +300,7 @@ class ReplicaManagerTest {
           .setIsNew(true)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
-      replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+      replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
         .localLogOrException
 
       val producerId = 234L
@@ -361,7 +360,7 @@ class ReplicaManagerTest {
           .setIsNew(true)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
-      replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+      replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
         .localLogOrException
 
       val producerId = 234L
@@ -467,7 +466,7 @@ class ReplicaManagerTest {
           .setIsNew(true)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
       replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
-      replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+      replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
         .localLogOrException
 
       val producerId = 234L
@@ -543,7 +542,7 @@ class ReplicaManagerTest {
           .setIsNew(false)).asJava,
         Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
       rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
-      rm.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+      rm.getPartitionOrException(new TopicPartition(topic, 0))
         .localLogOrException
 
       // Append a couple of messages.
@@ -1082,7 +1081,7 @@ class ReplicaManagerTest {
       Optional.of(0))
     fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None)
     assertNotNull(fetchResult.get)
-    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
   }
 
   @Test
@@ -1175,7 +1174,7 @@ class ReplicaManagerTest {
     replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
 
     assertNotNull(fetchResult.get)
-    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
   }
 
   @Test
@@ -1304,7 +1303,7 @@ class ReplicaManagerTest {
 
     replicaManager.stopReplica(tp0, deletePartition = true)
     assertNotNull(fetchResult.get)
-    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
   }
 
   @Test
@@ -1338,7 +1337,7 @@ class ReplicaManagerTest {
 
     replicaManager.stopReplica(tp0, deletePartition = true)
     assertNotNull(produceResult.get)
-    assertEquals(Errors.NOT_LEADER_FOR_PARTITION, produceResult.get.error)
+    assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResult.get.error)
   }
 
   private def sendProducerAppend(replicaManager: ReplicaManager,
@@ -2086,4 +2085,31 @@ class ReplicaManagerTest {
       assertEquals(HostedPartition.None, replicaManager.getPartition(tp0))
     }
   }
+
+  @Test
+  def testReplicaNotAvailable(): Unit = {
+
+    def createReplicaManager(): ReplicaManager = {
+      val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+      val config = KafkaConfig.fromProps(props)
+      val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+      new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
+        new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+        new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) {
+        override def getPartitionOrException(topicPartition: TopicPartition): Partition = {
+          throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
+        }
+      }
+    }
+
+    val replicaManager = createReplicaManager()
+    try {
+      val tp = new TopicPartition(topic, 0)
+      val dir = replicaManager.logManager.liveLogDirs.head.getAbsolutePath
+      val errors = replicaManager.alterReplicaLogDirs(Map(tp -> dir))
+      assertEquals(Errors.REPLICA_NOT_AVAILABLE, errors(tp))
+    } finally {
+      replicaManager.shutdown(false)
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 8b35149..e349f32 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -128,7 +128,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
 
     //And should get no leader for partition error from t1p1 (as it's not on broker 0)
     assertTrue(offsetsForEpochs(t1p1).hasError)
-    assertEquals(NOT_LEADER_FOR_PARTITION, offsetsForEpochs(t1p1).error)
+    assertEquals(NOT_LEADER_OR_FOLLOWER, offsetsForEpochs(t1p1).error)
     assertEquals(UNDEFINED_EPOCH_OFFSET, offsetsForEpochs(t1p1).endOffset)
 
     //Repointing to broker 1 we should get the correct offset for t1p1
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 08be8a2..fd4e8d9 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -87,7 +87,7 @@ class OffsetsForLeaderEpochTest {
     val response = replicaManager.lastOffsetForLeaderEpoch(request)
 
     //Then
-    assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
+    assertEquals(new EpochEndOffset(Errors.NOT_LEADER_OR_FOLLOWER, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
   }
 
   @Test
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 2983cf7..676bc69 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -35,6 +35,10 @@
         <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-602%3A+Change+default+value+for+client.dns.lookup">KIP-602</a>
         for more details.
     </li>
+    <li><code>NotLeaderForPartitionException</code> has been deprecated and replaced with <code>NotLeaderOrFollowerException</code>.
+        Fetch requests and other requests intended only for the leader or follower now return <code>NOT_LEADER_OR_FOLLOWER(6)</code> instead of
+        <code>REPLICA_NOT_AVAILABLE(9)</code> if the broker is not a replica, ensuring that this transient error during reassignments
+        is handled by all clients as a retriable exception (see the sketch below).
+    </li>
 </ul>
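What the new note means for application code: since NotLeaderOrFollowerException is retriable, the producer retries it internally up to delivery.timeout.ms, and callbacks can branch on the retriable/non-retriable distinction instead of special-casing error codes. A sketch against the Java client (broker address and topic name are placeholders):

    import java.util.Properties

    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
    import org.apache.kafka.common.errors.RetriableException

    object RetriableErrorExample extends App {
      val props = new Properties()
      props.put("bootstrap.servers", "localhost:9092")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      producer.send(new ProducerRecord("my-topic", "key", "value"), new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit =
          exception match {
            case null                  => println(s"appended at offset ${metadata.offset}")
            case _: RetriableException => println("transient error, internal retries were exhausted")
            case e                     => throw new RuntimeException("fatal produce error", e)
          }
      })
      producer.close()
    }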
 
 <h5><a id="upgrade_250_notable" href="#upgrade_250_notable">Notable changes in 2.5.0</a></h5>