You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2020/07/17 19:59:12 UTC
[kafka] branch 2.6 updated: KAFKA-10223;
Use NOT_LEADER_OR_FOLLOWER instead of non-retriable
REPLICA_NOT_AVAILABLE for consumers (#8979)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new f221a2e KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers (#8979)
f221a2e is described below
commit f221a2e59ebb6c0deccb35cbcf92576954803231
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Fri Jul 17 20:05:11 2020 +0100
KAFKA-10223; Use NOT_LEADER_OR_FOLLOWER instead of non-retriable REPLICA_NOT_AVAILABLE for consumers (#8979)
Brokers currently return NOT_LEADER_FOR_PARTITION to producers and REPLICA_NOT_AVAILABLE to consumers if a replica is not available on the broker during reassignments. Non-Java clients treat REPLICA_NOT_AVAILABLE as a non-retriable exception, while Java consumers handle this error by explicitly matching the error code even though it is not an InvalidMetadataException. This PR renames NOT_LEADER_FOR_PARTITION to NOT_LEADER_OR_FOLLOWER and uses the same error for producers and consumers. This [...]
- ALTER_REPLICA_LOG_DIRS continues to return REPLICA_NOT_AVAILABLE. Retained this for compatibility since this request never returned NOT_LEADER_FOR_PARTITION earlier.
- MetadataRequest version 0 also returns REPLICA_NOT_AVAILABLE as topic-level error code for compatibility. Newer versions filter these out and return Errors.NONE, so didn't change this.
- Partition responses in MetadataRequest return REPLICA_NOT_AVAILABLE to indicate that one of the replicas is not available. Did not change this since NOT_LEADER_FOR_PARTITION is not suitable in this case.
Reviewers: Ismael Juma <is...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>, Bob Barrett <bo...@confluent.io>
---
.../kafka/clients/consumer/internals/Fetcher.java | 4 +-
.../internals/OffsetsForLeaderEpochClient.java | 2 +-
.../errors/NotLeaderForPartitionException.java | 4 +-
...tion.java => NotLeaderOrFollowerException.java} | 18 ++++---
.../errors/ReplicaNotAvailableException.java | 8 +++-
.../org/apache/kafka/common/protocol/Errors.java | 10 ++--
.../common/requests/DeleteRecordsResponse.java | 2 +-
.../apache/kafka/common/requests/FetchRequest.java | 2 +-
.../kafka/common/requests/FetchResponse.java | 10 ++--
.../kafka/common/requests/ListOffsetResponse.java | 4 +-
.../requests/OffsetsForLeaderEpochResponse.java | 4 +-
.../kafka/common/requests/ProduceRequest.java | 2 +-
.../kafka/common/requests/ProduceResponse.java | 8 ++--
.../common/requests/WriteTxnMarkersResponse.java | 2 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 12 ++---
.../kafka/clients/consumer/KafkaConsumerTest.java | 2 +-
.../clients/consumer/internals/FetcherTest.java | 20 ++++----
.../clients/producer/internals/SenderTest.java | 20 ++++----
.../producer/internals/TransactionManagerTest.java | 12 ++---
.../common/requests/LeaderAndIsrResponseTest.java | 2 +-
.../common/requests/StopReplicaResponseTest.java | 2 +-
core/src/main/scala/kafka/cluster/Partition.scala | 21 ++++----
.../coordinator/group/GroupMetadataManager.scala | 4 +-
...TransactionMarkerRequestCompletionHandler.scala | 2 +-
.../transaction/TransactionStateManager.scala | 2 +-
.../scala/kafka/server/AbstractFetcherThread.scala | 4 +-
.../scala/kafka/server/DelayedDeleteRecords.scala | 2 +-
.../src/main/scala/kafka/server/DelayedFetch.scala | 10 ++--
.../main/scala/kafka/server/DelayedProduce.scala | 2 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 6 +--
.../kafka/server/ReplicaAlterLogDirsThread.scala | 10 ++--
.../main/scala/kafka/server/ReplicaManager.scala | 56 +++++++++++-----------
.../kafka/api/PlaintextAdminIntegrationTest.scala | 2 +-
.../kafka/api/ProducerFailureHandlingTest.scala | 2 +-
.../kafka/server/DelayedFetchTest.scala | 12 ++---
.../scala/unit/kafka/cluster/PartitionTest.scala | 22 ++++-----
.../group/GroupMetadataManagerTest.scala | 4 +-
...sactionMarkerRequestCompletionHandlerTest.scala | 4 +-
.../transaction/TransactionStateManagerTest.scala | 2 +-
.../scala/unit/kafka/server/FetchRequestTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 4 +-
.../unit/kafka/server/ListOffsetsRequestTest.scala | 14 +++---
.../unit/kafka/server/LogDirFailureTest.scala | 10 ++--
.../server/OffsetsForLeaderEpochRequestTest.scala | 8 ++--
.../unit/kafka/server/ProduceRequestTest.scala | 2 +-
.../server/ReplicaAlterLogDirsThreadTest.scala | 24 +++++-----
.../kafka/server/ReplicaFetcherThreadTest.scala | 2 +-
.../kafka/server/ReplicaManagerQuotasTest.scala | 3 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 54 +++++++++++++++------
.../server/epoch/LeaderEpochIntegrationTest.scala | 2 +-
.../server/epoch/OffsetsForLeaderEpochTest.scala | 2 +-
docs/upgrade.html | 4 ++
52 files changed, 243 insertions(+), 205 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 52c5155..3251f4f 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1043,7 +1043,7 @@ public class Fetcher<K, V> implements Closeable {
log.debug("Cannot search by timestamp for partition {} because the message format version " +
"is before 0.10.0", topicPartition);
break;
- case NOT_LEADER_FOR_PARTITION:
+ case NOT_LEADER_OR_FOLLOWER:
case REPLICA_NOT_AVAILABLE:
case KAFKA_STORAGE_ERROR:
case OFFSET_NOT_AVAILABLE:
@@ -1274,7 +1274,7 @@ public class Fetcher<K, V> implements Closeable {
}
nextCompletedFetch.initialized = true;
- } else if (error == Errors.NOT_LEADER_FOR_PARTITION ||
+ } else if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
error == Errors.REPLICA_NOT_AVAILABLE ||
error == Errors.KAFKA_STORAGE_ERROR ||
error == Errors.FENCED_LEADER_EPOCH ||
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
index b05e01f..7b1f44a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/OffsetsForLeaderEpochClient.java
@@ -80,7 +80,7 @@ public class OffsetsForLeaderEpochClient extends AsyncClient<
topicPartition, epochEndOffset.endOffset(), epochEndOffset.leaderEpoch());
endOffsets.put(topicPartition, epochEndOffset);
break;
- case NOT_LEADER_FOR_PARTITION:
+ case NOT_LEADER_OR_FOLLOWER:
case REPLICA_NOT_AVAILABLE:
case KAFKA_STORAGE_ERROR:
case OFFSET_NOT_AVAILABLE:
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
index 7182779..6ecce02 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
@@ -17,8 +17,10 @@
package org.apache.kafka.common.errors;
/**
- * This server is not the leader for the given partition
+ * This server is not the leader for the given partition.
+ * @deprecated since 2.6. Use {@link NotLeaderOrFollowerException}.
*/
+@Deprecated
public class NotLeaderForPartitionException extends InvalidMetadataException {
private static final long serialVersionUID = 1L;
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderOrFollowerException.java
similarity index 53%
copy from clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
copy to clients/src/main/java/org/apache/kafka/common/errors/NotLeaderOrFollowerException.java
index 7182779..2db960b 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderForPartitionException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/NotLeaderOrFollowerException.java
@@ -17,25 +17,31 @@
package org.apache.kafka.common.errors;
/**
- * This server is not the leader for the given partition
+ * Broker returns this error if a request could not be processed because the broker is not the leader
+ * or follower for a topic partition. This could be a transient exception during leader elections and
+ * reassignments. For `Produce` and other requests which are intended only for the leader, this exception
+ * indicates that the broker is not the current leader. For consumer `Fetch` requests which may be
+ * satisfied by a leader or follower, this exception indicates that the broker is not a replica
+ * of the topic partition.
*/
-public class NotLeaderForPartitionException extends InvalidMetadataException {
+@SuppressWarnings("deprecation")
+public class NotLeaderOrFollowerException extends NotLeaderForPartitionException {
private static final long serialVersionUID = 1L;
- public NotLeaderForPartitionException() {
+ public NotLeaderOrFollowerException() {
super();
}
- public NotLeaderForPartitionException(String message) {
+ public NotLeaderOrFollowerException(String message) {
super(message);
}
- public NotLeaderForPartitionException(Throwable cause) {
+ public NotLeaderOrFollowerException(Throwable cause) {
super(cause);
}
- public NotLeaderForPartitionException(String message, Throwable cause) {
+ public NotLeaderOrFollowerException(String message, Throwable cause) {
super(message, cause);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java b/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
index b94d400..07971cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/ReplicaNotAvailableException.java
@@ -16,7 +16,13 @@
*/
package org.apache.kafka.common.errors;
-public class ReplicaNotAvailableException extends ApiException {
+/**
+ * The replica is not available for the requested topic partition. This may be
+ * a transient exception during reassignments. From version 2.6 onwards, Fetch requests
+ * and other requests intended only for the leader or follower of the topic partition return
+ * {@link NotLeaderOrFollowerException} if the broker is not a replica of the partition.
+ */
+public class ReplicaNotAvailableException extends InvalidMetadataException {
private static final long serialVersionUID = 1L;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 526b4df..2b091f3 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -72,7 +72,7 @@ import org.apache.kafka.common.errors.NotControllerException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasAfterAppendException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
-import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.OffsetNotAvailableException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
@@ -139,13 +139,15 @@ public enum Errors {
InvalidFetchSizeException::new),
LEADER_NOT_AVAILABLE(5, "There is no leader for this topic-partition as we are in the middle of a leadership election.",
LeaderNotAvailableException::new),
- NOT_LEADER_FOR_PARTITION(6, "This server is not the leader for that topic-partition.",
- NotLeaderForPartitionException::new),
+ NOT_LEADER_OR_FOLLOWER(6, "For requests intended only for the leader, this error indicates that the broker is not the current leader. " +
+ "For requests intended for any replica, this error indicates that the broker is not a replica of the topic partition.",
+ NotLeaderOrFollowerException::new),
REQUEST_TIMED_OUT(7, "The request timed out.",
TimeoutException::new),
BROKER_NOT_AVAILABLE(8, "The broker is not available.",
BrokerNotAvailableException::new),
- REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition.",
+ REPLICA_NOT_AVAILABLE(9, "The replica is not available for the requested topic-partition. Produce/Fetch requests and other requests " +
+ "intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER if the broker is not a replica of the topic-partition.",
ReplicaNotAvailableException::new),
MESSAGE_TOO_LARGE(10, "The request included a message larger than the max message size the server will accept.",
RecordTooLargeException::new),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
index 3266bf8..968a35b 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteRecordsResponse.java
@@ -36,7 +36,7 @@ public class DeleteRecordsResponse extends AbstractResponse {
*
* OFFSET_OUT_OF_RANGE (1)
* UNKNOWN_TOPIC_OR_PARTITION (3)
- * NOT_LEADER_FOR_PARTITION (6)
+ * NOT_LEADER_OR_FOLLOWER (6)
* REQUEST_TIMED_OUT (7)
* UNKNOWN (-1)
*/
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
index 502e32b..29cbb60 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java
@@ -146,7 +146,7 @@ public class FetchRequest extends AbstractRequest {
TOPICS_V5);
// V6 bumped up to indicate that the client supports KafkaStorageException. The KafkaStorageException will be
- // translated to NotLeaderForPartitionException in the response if version <= 5
+ // translated to NotLeaderOrFollowerException in the response if version <= 5
private static final Schema FETCH_REQUEST_V6 = FETCH_REQUEST_V5;
// V7 added incremental fetch requests.
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
index 21edcc5..984c50c 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
@@ -58,8 +58,8 @@ import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
*
* - {@link Errors#OFFSET_OUT_OF_RANGE} If the fetch offset is out of range for a requested partition
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED} If the user does not have READ access to a requested topic
- * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker which is not a replica
- * - {@link Errors#NOT_LEADER_FOR_PARTITION} If the broker is not a leader and either the provided leader epoch
+ * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker with version < 2.6 which is not a replica
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER} If the broker is not a leader or follower and either the provided leader epoch
* matches the known leader epoch on the broker or is empty
* - {@link Errors#FENCED_LEADER_EPOCH} If the epoch is lower than the broker's epoch
* - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the broker's epoch
@@ -190,7 +190,7 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
new Field(RESPONSES_KEY_NAME, new ArrayOf(FETCH_RESPONSE_TOPIC_V5)));
// V6 bumped up to indicate that the client supports KafkaStorageException. The KafkaStorageException will
- // be translated to NotLeaderForPartitionException in the response if version <= 5
+ // be translated to NotLeaderOrFollowerException in the response if version <= 5
private static final Schema FETCH_RESPONSE_V6 = FETCH_RESPONSE_V5;
// V7 added incremental fetch responses and a top-level error code.
@@ -549,9 +549,9 @@ public class FetchResponse<T extends BaseRecords> extends AbstractResponse {
// If consumer sends FetchRequest V5 or earlier, the client library is not guaranteed to recognize the error code
// for KafkaStorageException. In this case the client library will translate KafkaStorageException to
// UnknownServerException which is not retriable. We can ensure that consumer will update metadata and retry
- // by converting the KafkaStorageException to NotLeaderForPartitionException in the response if FetchRequest version <= 5
+ // by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if FetchRequest version <= 5
if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 5)
- errorCode = Errors.NOT_LEADER_FOR_PARTITION.code();
+ errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code();
Struct partitionData = topicData.instance(PARTITIONS_KEY_NAME);
Struct partitionDataHeader = partitionData.instance(PARTITION_HEADER_KEY_NAME);
partitionDataHeader.set(PARTITION_ID, partitionEntry.getKey());
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
index 6331941..3fe14b0 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
@@ -44,8 +44,8 @@ import static org.apache.kafka.common.protocol.types.Type.INT64;
*
* - {@link Errors#UNSUPPORTED_FOR_MESSAGE_FORMAT} If the message format does not support lookup by timestamp
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED} If the user does not have DESCRIBE access to a requested topic
- * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker which is not a replica
- * - {@link Errors#NOT_LEADER_FOR_PARTITION} If the broker is not a leader and either the provided leader epoch
+ * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker with version < 2.6 which is not a replica
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER} If the broker is not a leader or follower and either the provided leader epoch
* matches the known leader epoch on the broker or is empty
* - {@link Errors#FENCED_LEADER_EPOCH} If the epoch is lower than the broker's epoch
* - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the broker's epoch
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
index c547c09..6573aca 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
@@ -41,8 +41,8 @@ import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
* Possible error codes:
*
* - {@link Errors#TOPIC_AUTHORIZATION_FAILED} If the user does not have DESCRIBE access to a requested topic
- * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker which is not a replica
- * - {@link Errors#NOT_LEADER_FOR_PARTITION} If the broker is not a leader and either the provided leader epoch
+ * - {@link Errors#REPLICA_NOT_AVAILABLE} If the request is received by a broker with version < 2.6 which is not a replica
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER} If the broker is not a leader or follower and either the provided leader epoch
* matches the known leader epoch on the broker or is empty
* - {@link Errors#FENCED_LEADER_EPOCH} If the epoch is lower than the broker's epoch
* - {@link Errors#UNKNOWN_LEADER_EPOCH} If the epoch is larger than the broker's epoch
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
index 485145f..210b71d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java
@@ -99,7 +99,7 @@ public class ProduceRequest extends AbstractRequest {
/**
* The body of PRODUCE_REQUEST_V4 is the same as PRODUCE_REQUEST_V3.
* The version number is bumped up to indicate that the client supports KafkaStorageException.
- * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
+ * The KafkaStorageException will be translated to NotLeaderOrFollowerException in the response if version <= 3
*/
private static final Schema PRODUCE_REQUEST_V4 = PRODUCE_REQUEST_V3;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
index 2bb1e86..205d9ab 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java
@@ -56,7 +56,7 @@ public class ProduceResponse extends AbstractResponse {
*
* {@link Errors#CORRUPT_MESSAGE}
* {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
- * {@link Errors#NOT_LEADER_FOR_PARTITION}
+ * {@link Errors#NOT_LEADER_OR_FOLLOWER}
* {@link Errors#MESSAGE_TOO_LARGE}
* {@link Errors#INVALID_TOPIC_EXCEPTION}
* {@link Errors#RECORD_LIST_TOO_LARGE}
@@ -126,7 +126,7 @@ public class ProduceResponse extends AbstractResponse {
/**
* The body of PRODUCE_RESPONSE_V4 is the same as PRODUCE_RESPONSE_V3.
* The version number is bumped up to indicate that the client supports KafkaStorageException.
- * The KafkaStorageException will be translated to NotLeaderForPartitionException in the response if version <= 3
+ * The KafkaStorageException will be translated to NotLeaderOrFollowerException in the response if version <= 3
*/
private static final Schema PRODUCE_RESPONSE_V4 = PRODUCE_RESPONSE_V3;
@@ -265,9 +265,9 @@ public class ProduceResponse extends AbstractResponse {
// If producer sends ProduceRequest V3 or earlier, the client library is not guaranteed to recognize the error code
// for KafkaStorageException. In this case the client library will translate KafkaStorageException to
// UnknownServerException which is not retriable. We can ensure that producer will update metadata and retry
- // by converting the KafkaStorageException to NotLeaderForPartitionException in the response if ProduceRequest version <= 3
+ // by converting the KafkaStorageException to NotLeaderOrFollowerException in the response if ProduceRequest version <= 3
if (errorCode == Errors.KAFKA_STORAGE_ERROR.code() && version <= 3)
- errorCode = Errors.NOT_LEADER_FOR_PARTITION.code();
+ errorCode = Errors.NOT_LEADER_OR_FOLLOWER.code();
Struct partStruct = topicData.instance(PARTITION_RESPONSES_KEY_NAME)
.set(PARTITION_ID, partitionEntry.getKey())
.set(ERROR_CODE, errorCode)
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
index 783ef3b..8fdcb99 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersResponse.java
@@ -37,7 +37,7 @@ import java.util.Map;
* - {@link Errors#CORRUPT_MESSAGE}
* - {@link Errors#INVALID_PRODUCER_EPOCH}
* - {@link Errors#UNKNOWN_TOPIC_OR_PARTITION}
- * - {@link Errors#NOT_LEADER_FOR_PARTITION}
+ * - {@link Errors#NOT_LEADER_OR_FOLLOWER}
* - {@link Errors#MESSAGE_TOO_LARGE}
* - {@link Errors#RECORD_LIST_TOO_LARGE}
* - {@link Errors#NOT_ENOUGH_REPLICAS}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ac04e5d..aa55866 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -51,7 +51,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.LeaderNotAvailableException;
import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.NotLeaderForPartitionException;
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.SecurityDisabledException;
@@ -1088,7 +1088,7 @@ public class KafkaAdminClientTest {
new DeleteRecordsResponseData.DeleteRecordsPartitionResult()
.setPartitionIndex(myTopicPartition3.partition())
.setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
- .setErrorCode(Errors.NOT_LEADER_FOR_PARTITION.code()),
+ .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()),
new DeleteRecordsResponseData.DeleteRecordsPartitionResult()
.setPartitionIndex(myTopicPartition4.partition())
.setLowWatermark(DeleteRecordsResponse.INVALID_LOW_WATERMARK)
@@ -1157,7 +1157,7 @@ public class KafkaAdminClientTest {
myTopicPartition3Result.get();
fail("get() should throw ExecutionException");
} catch (ExecutionException e1) {
- assertTrue(e1.getCause() instanceof NotLeaderForPartitionException);
+ assertTrue(e1.getCause() instanceof NotLeaderOrFollowerException);
}
// "unknown topic or partition" failure on records deletion for partition 4
@@ -3157,7 +3157,7 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE));
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
- responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, 345L, Optional.of(543)));
+ responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, Optional.of(543)));
responseData.put(tp1, new PartitionData(Errors.LEADER_NOT_AVAILABLE, -2L, 123L, Optional.of(456)));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
@@ -3215,10 +3215,10 @@ public class KafkaAdminClientTest {
env.kafkaClient().prepareResponse(prepareMetadataResponse(oldCluster, Errors.NONE));
Map<TopicPartition, PartitionData> responseData = new HashMap<>();
- responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_FOR_PARTITION, -1L, 345L, Optional.of(543)));
+ responseData.put(tp0, new PartitionData(Errors.NOT_LEADER_OR_FOLLOWER, -1L, 345L, Optional.of(543)));
env.kafkaClient().prepareResponseFrom(new ListOffsetResponse(responseData), node0);
- // updating leader from node0 to node1 and metadata refresh because of NOT_LEADER_FOR_PARTITION
+ // updating leader from node0 to node1 and metadata refresh because of NOT_LEADER_OR_FOLLOWER
final PartitionInfo newPartitionInfo = new PartitionInfo("foo", 0, node1,
new Node[]{node0, node1, node2}, new Node[]{node0, node1, node2});
final Cluster newCluster = new Cluster("mockClusterId", nodes, singletonList(newPartitionInfo),
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 0abcdd0..9fc54bf 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -591,7 +591,7 @@ public class KafkaConsumerTest {
return timestamps.get(tp0).timestamp == ListOffsetRequest.LATEST_TIMESTAMP &&
timestamps.get(tp1).timestamp == ListOffsetRequest.EARLIEST_TIMESTAMP;
}, listOffsetsResponse(Collections.singletonMap(tp0, 50L),
- Collections.singletonMap(tp1, Errors.NOT_LEADER_FOR_PARTITION)));
+ Collections.singletonMap(tp1, Errors.NOT_LEADER_OR_FOLLOWER)));
client.prepareResponse(
body -> {
FetchRequest request = (FetchRequest) body;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index c6f379b..c82fbd7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -406,7 +406,7 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
- client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
+ client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
consumerClient.poll(time.timer(0));
assertTrue(fetcher.hasCompletedFetches());
@@ -1097,13 +1097,13 @@ public class FetcherTest {
}
@Test
- public void testFetchNotLeaderForPartition() {
+ public void testFetchNotLeaderOrFollower() {
buildFetcher();
assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);
assertEquals(1, fetcher.sendFetches());
- client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_FOR_PARTITION, 100L, 0));
+ client.prepareResponse(fullFetchResponse(tp0, this.records, Errors.NOT_LEADER_OR_FOLLOWER, 100L, 0));
consumerClient.poll(time.timer(0));
assertEquals(0, fetcher.fetchedRecords().size());
assertEquals(0L, metadata.timeToNextUpdate(time.milliseconds()));
@@ -1579,7 +1579,7 @@ public class FetcherTest {
// First fetch fails with stale metadata
client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP,
- Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NOT_LEADER_FOR_PARTITION, 1L, 5L), false);
+ Optional.of(validLeaderEpoch)), listOffsetResponse(Errors.NOT_LEADER_OR_FOLLOWER, 1L, 5L), false);
fetcher.resetOffsetsIfNeeded();
consumerClient.pollNoWakeup();
assertFalse(subscriptions.hasValidPosition(tp0));
@@ -2443,11 +2443,11 @@ public class FetcherTest {
// Error code none with known offset
testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);
// Test both of partition has error.
- testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
+ testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
// Test the second partition has error.
- testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L);
+ testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER, 10L, 100L, 10L, 100L);
// Test different errors.
- testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
+ testGetOffsetsForTimesWithError(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NONE, 10L, 100L, 10L, 100L);
testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);
testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
@@ -2473,7 +2473,7 @@ public class FetcherTest {
@Test
public void testGetOffsetByTimeWithPartitionsRetryCouldTriggerMetadataUpdate() {
- List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_FOR_PARTITION,
+ List<Errors> retriableErrors = Arrays.asList(Errors.NOT_LEADER_OR_FOLLOWER,
Errors.REPLICA_NOT_AVAILABLE, Errors.KAFKA_STORAGE_ERROR, Errors.OFFSET_NOT_AVAILABLE,
Errors.LEADER_NOT_AVAILABLE, Errors.FENCED_LEADER_EPOCH, Errors.UNKNOWN_LEADER_EPOCH);
@@ -2520,7 +2520,7 @@ public class FetcherTest {
// We will count the answered future response in the end to verify if this is the case.
Map<TopicPartition, ListOffsetResponse.PartitionData> paritionDataWithFatalError = new HashMap<>(allPartitionData);
paritionDataWithFatalError.put(tp1, new ListOffsetResponse.PartitionData(
- Errors.NOT_LEADER_FOR_PARTITION, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
+ Errors.NOT_LEADER_OR_FOLLOWER, ListOffsetRequest.LATEST_TIMESTAMP, -1L, Optional.empty()));
client.prepareResponseFrom(new ListOffsetResponse(paritionDataWithFatalError), originalLeader);
// The request to new leader must only contain one partition tp1 with error.
@@ -2689,7 +2689,7 @@ public class FetcherTest {
buildFetcher();
Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
- partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_FOR_PARTITION,
+ partitionData.put(tp0, new ListOffsetResponse.PartitionData(Errors.NOT_LEADER_OR_FOLLOWER,
ListOffsetResponse.UNKNOWN_TIMESTAMP, ListOffsetResponse.UNKNOWN_OFFSET,
Optional.empty()));
partitionData.put(tp1, new ListOffsetResponse.PartitionData(Errors.UNKNOWN_TOPIC_OR_PARTITION,
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 3ec8c80..4dfd014 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -868,7 +868,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
assertEquals(OptionalInt.empty(), transactionManager.lastAckedSequence(tp0));
- client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1));
+ client.respondToRequest(firstClientRequest, produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, -1));
sender.runOnce(); // receive response 0
@@ -1084,7 +1084,7 @@ public class SenderTest {
assertEquals(2, client.inFlightRequestCount());
- sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+ sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1);
sender.runOnce(); // receive first response
Node node = metadata.fetch().nodes().get(0);
@@ -1140,7 +1140,7 @@ public class SenderTest {
assertEquals(2, client.inFlightRequestCount());
- sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+ sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1);
sender.runOnce(); // receive first response
Node node = metadata.fetch().nodes().get(0);
@@ -1170,7 +1170,7 @@ public class SenderTest {
// Send first ProduceRequest
Future<RecordMetadata> request1 = appendToAccumulator(tp0, 0L, "key", "value");
sender.runOnce(); // send request
- sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_FOR_PARTITION, -1);
+ sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1);
sender.runOnce(); // receive response
assertEquals(1L, transactionManager.sequenceNumber(tp0).longValue());
@@ -1217,7 +1217,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
- responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+ responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
client.respond(produceResponse(responses));
@@ -1258,7 +1258,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
- responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+ responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
client.respond(produceResponse(responses));
sender.initiateClose(); // initiate close
@@ -1291,7 +1291,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
- responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+ responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
client.respond(produceResponse(responses));
sender.runOnce(); // out of order sequence error triggers producer ID reset because epoch is maxed out
@@ -1325,7 +1325,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
Map<TopicPartition, OffsetAndError> responses = new LinkedHashMap<>();
- responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_FOR_PARTITION));
+ responses.put(tp1, new OffsetAndError(-1, Errors.NOT_LEADER_OR_FOLLOWER));
responses.put(tp0, new OffsetAndError(-1, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
client.respond(produceResponse(responses));
sender.runOnce();
@@ -1336,7 +1336,7 @@ public class SenderTest {
assertFalse(successfulResponse.isDone());
// The response comes back with a retriable error.
- client.respond(produceResponse(tp1, 0, Errors.NOT_LEADER_FOR_PARTITION, -1));
+ client.respond(produceResponse(tp1, 0, Errors.NOT_LEADER_OR_FOLLOWER, -1));
sender.runOnce();
// The response
@@ -2147,7 +2147,7 @@ public class SenderTest {
assertEquals(1, client.inFlightRequestCount());
time.sleep(deliverTimeoutMs);
- client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_FOR_PARTITION, -1)); // return a retriable error
+ client.respond(produceResponse(tp0, -1, Errors.NOT_LEADER_OR_FOLLOWER, -1)); // return a retriable error
sender.runOnce(); // expire the batch
assertTrue(request1.isDone());
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 126d447..1d14642 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -2192,7 +2192,7 @@ public class TransactionManagerTest {
Future<RecordMetadata> responseFuture = appendToAccumulator(tp0);
prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, producerId);
- prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, producerId, epoch);
+ prepareProduceResponse(Errors.NOT_LEADER_OR_FOLLOWER, producerId, epoch);
runUntil(() -> !client.hasPendingResponses());
assertFalse(responseFuture.isDone());
@@ -2385,7 +2385,7 @@ public class TransactionManagerTest {
runUntil(() -> transactionManager.transactionContainsPartition(tp0));
assertTrue(transactionManager.isSendToPartitionAllowed(tp0));
- prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, producerId, epoch);
+ prepareProduceResponse(Errors.NOT_LEADER_OR_FOLLOWER, producerId, epoch);
runUntil(() -> !client.hasPendingResponses());
assertFalse(responseFuture.isDone());
@@ -2920,10 +2920,10 @@ public class TransactionManagerTest {
sender.runOnce();
assertEquals(1, accumulator.batches().get(tp1).size());
- // Partition failover occurs and tp1 returns a NOT_LEADER_FOR_PARTITION error
+ // Partition failover occurs and tp1 returns a NOT_LEADER_OR_FOLLOWER error
// Despite having the old epoch, the batch should retry
ProduceResponse.PartitionResponse t1b2Response = new ProduceResponse.PartitionResponse(
- Errors.NOT_LEADER_FOR_PARTITION, -1, -1, 600L);
+ Errors.NOT_LEADER_OR_FOLLOWER, -1, -1, 600L);
assertTrue(transactionManager.canRetry(t1b2Response, tp1b2));
accumulator.reenqueue(tp1b2, time.milliseconds());
@@ -3042,10 +3042,10 @@ public class TransactionManagerTest {
sender.runOnce();
assertEquals(1, accumulator.batches().get(tp1).size());
- // Partition failover occurs and tp1 returns a NOT_LEADER_FOR_PARTITION error
+ // Partition failover occurs and tp1 returns a NOT_LEADER_OR_FOLLOWER error
// Despite having the old epoch, the batch should retry
ProduceResponse.PartitionResponse t1b2Response = new ProduceResponse.PartitionResponse(
- Errors.NOT_LEADER_FOR_PARTITION, -1, -1, 600L);
+ Errors.NOT_LEADER_OR_FOLLOWER, -1, -1, 600L);
assertTrue(transactionManager.canRetry(t1b2Response, tp1b2));
accumulator.reenqueue(tp1b2, time.milliseconds());
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
index edb962b..bf2fe13 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/LeaderAndIsrResponseTest.java
@@ -66,7 +66,7 @@ public class LeaderAndIsrResponseTest {
@Test
public void testErrorCountsWithTopLevelError() {
List<LeaderAndIsrPartitionError> partitions = createPartitions("foo",
- asList(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION));
+ asList(Errors.NONE, Errors.NOT_LEADER_OR_FOLLOWER));
LeaderAndIsrResponse response = new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setPartitionErrors(partitions));
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
index d8d8f4f..5a3af47 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/StopReplicaResponseTest.java
@@ -59,7 +59,7 @@ public class StopReplicaResponseTest {
List<StopReplicaPartitionError> errors = new ArrayList<>();
errors.add(new StopReplicaPartitionError().setTopicName("foo").setPartitionIndex(0));
errors.add(new StopReplicaPartitionError().setTopicName("foo").setPartitionIndex(1)
- .setErrorCode(Errors.NOT_LEADER_FOR_PARTITION.code()));
+ .setErrorCode(Errors.NOT_LEADER_OR_FOLLOWER.code()));
StopReplicaResponse response = new StopReplicaResponse(new StopReplicaResponseData()
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code())
.setPartitionErrors(errors));
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index ab4bb75..4be49a5 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -331,7 +331,7 @@ class Partition(val topicPartition: TopicPartition,
def getReplica(replicaId: Int): Option[Replica] = Option(remoteReplicasMap.get(replicaId))
private def getReplicaOrException(replicaId: Int): Replica = getReplica(replicaId).getOrElse{
- throw new ReplicaNotAvailableException(s"Replica with id $replicaId is not available on broker $localBrokerId")
+ throw new NotLeaderOrFollowerException(s"Replica with id $replicaId is not available on broker $localBrokerId")
}
private def checkCurrentLeaderEpoch(remoteLeaderEpochOpt: Optional[Integer]): Errors = {
@@ -354,16 +354,13 @@ class Partition(val topicPartition: TopicPartition,
checkCurrentLeaderEpoch(currentLeaderEpoch) match {
case Errors.NONE =>
if (requireLeader && !isLeader) {
- Right(Errors.NOT_LEADER_FOR_PARTITION)
+ Right(Errors.NOT_LEADER_OR_FOLLOWER)
} else {
log match {
case Some(partitionLog) =>
Left(partitionLog)
case _ =>
- if (requireLeader)
- Right(Errors.NOT_LEADER_FOR_PARTITION)
- else
- Right(Errors.REPLICA_NOT_AVAILABLE)
+ Right(Errors.NOT_LEADER_OR_FOLLOWER)
}
}
case error =>
@@ -372,12 +369,12 @@ class Partition(val topicPartition: TopicPartition,
}
def localLogOrException: Log = log.getOrElse {
- throw new ReplicaNotAvailableException(s"Log for partition $topicPartition is not available " +
+ throw new NotLeaderOrFollowerException(s"Log for partition $topicPartition is not available " +
s"on broker $localBrokerId")
}
def futureLocalLogOrException: Log = futureLog.getOrElse {
- throw new ReplicaNotAvailableException(s"Future log for partition $topicPartition is not available " +
+ throw new NotLeaderOrFollowerException(s"Future log for partition $topicPartition is not available " +
s"on broker $localBrokerId")
}
@@ -760,7 +757,7 @@ class Partition(val topicPartition: TopicPartition,
} else
(false, Errors.NONE)
case None =>
- (false, Errors.NOT_LEADER_FOR_PARTITION)
+ (false, Errors.NOT_LEADER_OR_FOLLOWER)
}
}
@@ -822,7 +819,7 @@ class Partition(val topicPartition: TopicPartition,
*/
def lowWatermarkIfLeader: Long = {
if (!isLeader)
- throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
+ throw new NotLeaderOrFollowerException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
// lowWatermarkIfLeader may be called many times when a DeleteRecordsRequest is outstanding,
// care has been taken to avoid generating unnecessary collections in this code
@@ -987,7 +984,7 @@ class Partition(val topicPartition: TopicPartition,
(info, maybeIncrementLeaderHW(leaderLog))
case None =>
- throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d"
+ throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
.format(topicPartition, localBrokerId))
}
}
@@ -1130,7 +1127,7 @@ class Partition(val topicPartition: TopicPartition,
requestedOffset = convertedOffset,
lowWatermark = lowWatermarkIfLeader)
case None =>
- throw new NotLeaderForPartitionException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
+ throw new NotLeaderOrFollowerException(s"Leader not local for partition $topicPartition on broker $localBrokerId")
}
}
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 4898e22..dea0b77 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -285,7 +285,7 @@ class GroupMetadataManager(brokerId: Int,
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.COORDINATOR_NOT_AVAILABLE
- case Errors.NOT_LEADER_FOR_PARTITION
+ case Errors.NOT_LEADER_OR_FOLLOWER
| Errors.KAFKA_STORAGE_ERROR =>
Errors.NOT_COORDINATOR
@@ -432,7 +432,7 @@ class GroupMetadataManager(brokerId: Int,
| Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND =>
Errors.COORDINATOR_NOT_AVAILABLE
- case Errors.NOT_LEADER_FOR_PARTITION
+ case Errors.NOT_LEADER_OR_FOLLOWER
| Errors.KAFKA_STORAGE_ERROR =>
Errors.NOT_COORDINATOR
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index 66edc47..c2efdbd 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -143,7 +143,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
throw new IllegalStateException(s"Received fatal error ${error.exceptionName} while sending txn marker for $transactionalId")
case Errors.UNKNOWN_TOPIC_OR_PARTITION |
- Errors.NOT_LEADER_FOR_PARTITION |
+ Errors.NOT_LEADER_OR_FOLLOWER |
Errors.NOT_ENOUGH_REPLICAS |
Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND |
Errors.REQUEST_TIMED_OUT |
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 8eb65df..923aed9 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -513,7 +513,7 @@ class TransactionStateManager(brokerId: Int,
| Errors.REQUEST_TIMED_OUT => // note that for timed out request we return NOT_AVAILABLE error code to let client retry
Errors.COORDINATOR_NOT_AVAILABLE
- case Errors.NOT_LEADER_FOR_PARTITION
+ case Errors.NOT_LEADER_OR_FOLLOWER
| Errors.KAFKA_STORAGE_ERROR =>
Errors.NOT_COORDINATOR
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index f80d6fa..a6322ea 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -377,7 +377,7 @@ abstract class AbstractFetcherThread(name: String,
case Errors.FENCED_LEADER_EPOCH =>
if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
- case Errors.NOT_LEADER_FOR_PARTITION =>
+ case Errors.NOT_LEADER_OR_FOLLOWER =>
debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
"that the partition is being moved")
partitionsWithError += topicPartition
@@ -556,7 +556,7 @@ abstract class AbstractFetcherThread(name: String,
case e @ (_ : UnknownTopicOrPartitionException |
_ : UnknownLeaderEpochException |
- _ : NotLeaderForPartitionException) =>
+ _ : NotLeaderOrFollowerException) =>
info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
true
diff --git a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
index 5275275..e665688 100644
--- a/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
+++ b/core/src/main/scala/kafka/server/DelayedDeleteRecords.scala
@@ -80,7 +80,7 @@ class DelayedDeleteRecords(delayMs: Long,
val leaderLW = partition.lowWatermarkIfLeader
(leaderLW >= status.requiredOffset, Errors.NONE, leaderLW)
case None =>
- (false, Errors.NOT_LEADER_FOR_PARTITION, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
+ (false, Errors.NOT_LEADER_OR_FOLLOWER, DeleteRecordsResponse.INVALID_LOW_WATERMARK)
}
case HostedPartition.Offline =>
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 50cd4ee..3aa2ee4 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -87,8 +87,7 @@ class DelayedFetch(delayMs: Long,
val fetchLeaderEpoch = fetchStatus.fetchInfo.currentLeaderEpoch
try {
if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
- val partition = replicaManager.getPartitionOrException(topicPartition,
- expectLeader = fetchMetadata.fetchOnlyLeader)
+ val partition = replicaManager.getPartitionOrException(topicPartition)
val offsetSnapshot = partition.fetchOffsetSnapshot(fetchLeaderEpoch, fetchMetadata.fetchOnlyLeader)
val endOffset = fetchMetadata.fetchIsolation match {
@@ -121,11 +120,8 @@ class DelayedFetch(delayMs: Long,
}
}
} catch {
- case _: NotLeaderForPartitionException => // Case A
- debug(s"Broker is no longer the leader of $topicPartition, satisfy $fetchMetadata immediately")
- return forceComplete()
- case _: ReplicaNotAvailableException => // Case B
- debug(s"Broker no longer has a replica of $topicPartition, satisfy $fetchMetadata immediately")
+ case _: NotLeaderOrFollowerException => // Case A or Case B
+ debug(s"Broker is no longer the leader or follower of $topicPartition, satisfy $fetchMetadata immediately")
return forceComplete()
case _: UnknownTopicOrPartitionException => // Case C
debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchMetadata immediately")
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index 83e6142..668851d 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -85,7 +85,7 @@ class DelayedProduce(delayMs: Long,
trace(s"Checking produce satisfaction for $topicPartition, current status $status")
// skip those partitions that have already been satisfied
if (status.acksPending) {
- val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition, expectLeader = true) match {
+ val (hasEnough, error) = replicaManager.getPartitionOrError(topicPartition) match {
case Left(err) =>
// Case A
(false, err)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 86649c1..51c1701 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -923,10 +923,10 @@ class KafkaApis(val requestChannel: RequestChannel,
fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetRequest.DEBUGGING_REPLICA_ID)
(topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, offsets.map(JLong.valueOf).asJava))
} catch {
- // NOTE: UnknownTopicOrPartitionException and NotLeaderForPartitionException are special cased since these error messages
+ // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cased since these error messages
// are typically transient and there is no value in logging the entire stack trace for the same
case e @ (_ : UnknownTopicOrPartitionException |
- _ : NotLeaderForPartitionException |
+ _ : NotLeaderOrFollowerException |
_ : KafkaStorageException) =>
debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
correlationId, clientId, topicPartition, e.getMessage))
@@ -998,7 +998,7 @@ class KafkaApis(val requestChannel: RequestChannel,
// NOTE: These exceptions are special cased since these error messages are typically transient or the client
// would have received a clear exception and there is no value in logging the entire stack trace for the same
case e @ (_ : UnknownTopicOrPartitionException |
- _ : NotLeaderForPartitionException |
+ _ : NotLeaderOrFollowerException |
_ : UnknownLeaderEpochException |
_ : FencedLeaderEpochException |
_ : KafkaStorageException |
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 5db3464..91b873f 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -146,12 +146,12 @@ class ReplicaAlterLogDirsThread(name: String,
}
override protected def fetchEarliestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
- val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+ val partition = replicaMgr.getPartitionOrException(topicPartition)
partition.localLogOrException.logStartOffset
}
override protected def fetchLatestOffsetFromLeader(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
- val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+ val partition = replicaMgr.getPartitionOrException(topicPartition)
partition.localLogOrException.logEndOffset
}
@@ -166,7 +166,7 @@ class ReplicaAlterLogDirsThread(name: String,
val endOffset = if (epochData.leaderEpoch == UNDEFINED_EPOCH) {
new EpochEndOffset(UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
} else {
- val partition = replicaMgr.getPartitionOrException(tp, expectLeader = false)
+ val partition = replicaMgr.getPartitionOrException(tp)
partition.lastOffsetForLeaderEpoch(
currentLeaderEpoch = epochData.currentLeaderEpoch,
leaderEpoch = epochData.leaderEpoch,
@@ -198,12 +198,12 @@ class ReplicaAlterLogDirsThread(name: String,
* exchange with the current replica to truncate to the largest common log prefix for the topic partition
*/
override def truncate(topicPartition: TopicPartition, truncationState: OffsetTruncationState): Unit = {
- val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+ val partition = replicaMgr.getPartitionOrException(topicPartition)
partition.truncateTo(truncationState.offset, isFuture = true)
}
override protected def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
- val partition = replicaMgr.getPartitionOrException(topicPartition, expectLeader = false)
+ val partition = replicaMgr.getPartitionOrException(topicPartition)
partition.truncateFullyAndStartAt(offset, isFuture = true)
}
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 04d88b9..8f05d65 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -500,8 +500,8 @@ class ReplicaManager(val config: KafkaConfig,
allPartitions.values.iterator.count(_ == HostedPartition.Offline)
}
- def getPartitionOrException(topicPartition: TopicPartition, expectLeader: Boolean): Partition = {
- getPartitionOrError(topicPartition, expectLeader) match {
+ def getPartitionOrException(topicPartition: TopicPartition): Partition = {
+ getPartitionOrError(topicPartition) match {
case Left(Errors.KAFKA_STORAGE_ERROR) =>
throw new KafkaStorageException(s"Partition $topicPartition is in an offline log directory")
@@ -512,7 +512,7 @@ class ReplicaManager(val config: KafkaConfig,
}
}
- def getPartitionOrError(topicPartition: TopicPartition, expectLeader: Boolean): Either[Errors, Partition] = {
+ def getPartitionOrError(topicPartition: TopicPartition): Either[Errors, Partition] = {
getPartition(topicPartition) match {
case HostedPartition.Online(partition) =>
Right(partition)
@@ -521,15 +521,11 @@ class ReplicaManager(val config: KafkaConfig,
Left(Errors.KAFKA_STORAGE_ERROR)
case HostedPartition.None if metadataCache.contains(topicPartition) =>
- if (expectLeader) {
- // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER which
- // forces clients to refresh metadata to find the new location. This can happen, for example,
- // during a partition reassignment if a produce request from the client is sent to a broker after
- // the local replica has been deleted.
- Left(Errors.NOT_LEADER_FOR_PARTITION)
- } else {
- Left(Errors.REPLICA_NOT_AVAILABLE)
- }
+ // The topic exists, but this broker is no longer a replica of it, so we return NOT_LEADER_OR_FOLLOWER which
+ // forces clients to refresh metadata to find the new location. This can happen, for example,
+ // during a partition reassignment if a produce request from the client is sent to a broker after
+ // the local replica has been deleted.
+ Left(Errors.NOT_LEADER_OR_FOLLOWER)
case HostedPartition.None =>
Left(Errors.UNKNOWN_TOPIC_OR_PARTITION)
@@ -537,15 +533,15 @@ class ReplicaManager(val config: KafkaConfig,
}
def localLogOrException(topicPartition: TopicPartition): Log = {
- getPartitionOrException(topicPartition, expectLeader = false).localLogOrException
+ getPartitionOrException(topicPartition).localLogOrException
}
def futureLocalLogOrException(topicPartition: TopicPartition): Log = {
- getPartitionOrException(topicPartition, expectLeader = false).futureLocalLogOrException
+ getPartitionOrException(topicPartition).futureLocalLogOrException
}
def futureLogExists(topicPartition: TopicPartition): Boolean = {
- getPartitionOrException(topicPartition, expectLeader = false).futureLog.isDefined
+ getPartitionOrException(topicPartition).futureLog.isDefined
}
def localLog(topicPartition: TopicPartition): Option[Log] = {
@@ -626,12 +622,12 @@ class ReplicaManager(val config: KafkaConfig,
(topicPartition, LogDeleteRecordsResult(-1L, -1L, Some(new InvalidTopicException(s"Cannot delete records of internal topic ${topicPartition.topic}"))))
} else {
try {
- val partition = getPartitionOrException(topicPartition, expectLeader = true)
+ val partition = getPartitionOrException(topicPartition)
val logDeleteResult = partition.deleteRecordsOnLeader(requestedOffset)
(topicPartition, logDeleteResult)
} catch {
case e@ (_: UnknownTopicOrPartitionException |
- _: NotLeaderForPartitionException |
+ _: NotLeaderOrFollowerException |
_: OffsetOutOfRangeException |
_: PolicyViolationException |
_: KafkaStorageException) =>
@@ -688,11 +684,11 @@ class ReplicaManager(val config: KafkaConfig,
// If the log for this partition has not been created yet:
// 1) Record the destination log directory in the memory so that the partition will be created in this log directory
// when broker receives LeaderAndIsrRequest for this partition later.
- // 2) Respond with ReplicaNotAvailableException for this partition in the AlterReplicaLogDirsResponse
+ // 2) Respond with REPLICA_NOT_AVAILABLE for this partition in the AlterReplicaLogDirsResponse (NotLeaderOrFollowerException is mapped to it for compatibility)
logManager.maybeUpdatePreferredLogDir(topicPartition, destinationDir)
- // throw ReplicaNotAvailableException if replica does not exist for the given partition
- val partition = getPartitionOrException(topicPartition, expectLeader = false)
+ // throw NotLeaderOrFollowerException if replica does not exist for the given partition
+ val partition = getPartitionOrException(topicPartition)
partition.localLogOrException
// If the destinationLDir is different from the current log directory of the replica:
@@ -716,8 +712,12 @@ class ReplicaManager(val config: KafkaConfig,
_: LogDirNotFoundException |
_: ReplicaNotAvailableException |
_: KafkaStorageException) =>
- warn("Unable to alter log dirs for %s".format(topicPartition), e)
+ warn(s"Unable to alter log dirs for $topicPartition", e)
(topicPartition, Errors.forException(e))
+ case e: NotLeaderOrFollowerException =>
+ // Retain the REPLICA_NOT_AVAILABLE error code (rather than NOT_LEADER_OR_FOLLOWER) for ALTER_REPLICA_LOG_DIRS for compatibility
+ warn(s"Unable to alter log dirs for $topicPartition", e)
+ (topicPartition, Errors.REPLICA_NOT_AVAILABLE)
case t: Throwable =>
error("Error while changing replica dir for partition %s".format(topicPartition), t)
(topicPartition, Errors.forException(t))
@@ -879,7 +879,7 @@ class ReplicaManager(val config: KafkaConfig,
Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
} else {
try {
- val partition = getPartitionOrException(topicPartition, expectLeader = true)
+ val partition = getPartitionOrException(topicPartition)
val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
val numAppendedMessages = info.numMessages
@@ -898,7 +898,7 @@ class ReplicaManager(val config: KafkaConfig,
// NOTE: Failed produce requests metric is not incremented for known exceptions
// it is supposed to indicate un-expected failures of a broker in handling a produce request
case e@ (_: UnknownTopicOrPartitionException |
- _: NotLeaderForPartitionException |
+ _: NotLeaderOrFollowerException |
_: RecordTooLargeException |
_: RecordBatchTooLargeException |
_: CorruptRecordException |
@@ -922,7 +922,7 @@ class ReplicaManager(val config: KafkaConfig,
isolationLevel: Option[IsolationLevel],
currentLeaderEpoch: Optional[Integer],
fetchOnlyFromLeader: Boolean): Option[TimestampAndOffset] = {
- val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader)
+ val partition = getPartitionOrException(topicPartition)
partition.fetchOffsetForTimestamp(timestamp, isolationLevel, currentLeaderEpoch, fetchOnlyFromLeader)
}
@@ -931,7 +931,7 @@ class ReplicaManager(val config: KafkaConfig,
maxNumOffsets: Int,
isFromConsumer: Boolean,
fetchOnlyFromLeader: Boolean): Seq[Long] = {
- val partition = getPartitionOrException(topicPartition, expectLeader = fetchOnlyFromLeader)
+ val partition = getPartitionOrException(topicPartition)
partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader)
}
@@ -1050,7 +1050,7 @@ class ReplicaManager(val config: KafkaConfig,
s"remaining response limit $limitBytes" +
(if (minOneMessage) s", ignoring response/partition size limits" else ""))
- val partition = getPartitionOrException(tp, expectLeader = fetchOnlyFromLeader)
+ val partition = getPartitionOrException(tp)
val fetchTimeMs = time.milliseconds
// If we are the leader, determine the preferred read-replica
@@ -1108,7 +1108,7 @@ class ReplicaManager(val config: KafkaConfig,
// NOTE: Failed fetch requests metric is not incremented for known exceptions since it
// is supposed to indicate un-expected failure of a broker in handling a fetch request
case e@ (_: UnknownTopicOrPartitionException |
- _: NotLeaderForPartitionException |
+ _: NotLeaderOrFollowerException |
_: UnknownLeaderEpochException |
_: FencedLeaderEpochException |
_: ReplicaNotAvailableException |
@@ -1775,7 +1775,7 @@ class ReplicaManager(val config: KafkaConfig,
new EpochEndOffset(Errors.KAFKA_STORAGE_ERROR, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
case HostedPartition.None if metadataCache.contains(tp) =>
- new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
+ new EpochEndOffset(Errors.NOT_LEADER_OR_FOLLOWER, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
case HostedPartition.None =>
new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET)
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index bab4452..778ed7b 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -698,7 +698,7 @@ class PlaintextAdminIntegrationTest extends BaseAdminIntegrationTest {
lowWatermark.contains(5L)
} catch {
case e: ExecutionException if e.getCause.isInstanceOf[LeaderNotAvailableException] ||
- e.getCause.isInstanceOf[NotLeaderForPartitionException] => false
+ e.getCause.isInstanceOf[NotLeaderOrFollowerException] => false
}
}, s"Expected low watermark of the partition to be 5 but got ${lowWatermark.getOrElse("no response within the timeout")}")
}
diff --git a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index beb5232..fb4d839 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -161,7 +161,7 @@ class ProducerFailureHandlingTest extends KafkaServerTestHarness {
*
* TODO: other exceptions that can be thrown in ExecutionException:
* UnknownTopicOrPartitionException
- * NotLeaderForPartitionException
+ * NotLeaderOrFollowerException
* LeaderNotAvailableException
* CorruptRecordException
* TimeoutException
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index c43d481..38831b5 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -21,7 +21,7 @@ import java.util.Optional
import scala.collection.Seq
import kafka.cluster.Partition
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{FencedLeaderEpochException, ReplicaNotAvailableException}
+import org.apache.kafka.common.errors.{FencedLeaderEpochException, NotLeaderOrFollowerException}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.FetchRequest
@@ -62,7 +62,7 @@ class DelayedFetchTest extends EasyMockSupport {
val partition: Partition = mock(classOf[Partition])
- EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
+ EasyMock.expect(replicaManager.getPartitionOrException(topicPartition))
.andReturn(partition)
EasyMock.expect(partition.fetchOffsetSnapshot(currentLeaderEpoch, fetchOnlyFromLeader = true))
.andThrow(new FencedLeaderEpochException("Requested epoch has been fenced"))
@@ -81,7 +81,7 @@ class DelayedFetchTest extends EasyMockSupport {
}
@Test
- def testReplicaNotAvailable(): Unit = {
+ def testNotLeaderOrFollower(): Unit = {
val topicPartition = new TopicPartition("topic", 0)
val fetchOffset = 500L
val logStartOffset = 0L
@@ -106,9 +106,9 @@ class DelayedFetchTest extends EasyMockSupport {
clientMetadata = None,
responseCallback = callback)
- EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
- .andThrow(new ReplicaNotAvailableException(s"Replica for $topicPartition not available"))
- expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.REPLICA_NOT_AVAILABLE)
+ EasyMock.expect(replicaManager.getPartitionOrException(topicPartition))
+ .andThrow(new NotLeaderOrFollowerException(s"Replica for $topicPartition not available"))
+ expectReadFromReplicaWithError(replicaId, topicPartition, fetchStatus.fetchInfo, Errors.NOT_LEADER_OR_FOLLOWER)
EasyMock.expect(replicaManager.isAddingReplica(EasyMock.anyObject(), EasyMock.anyInt())).andReturn(false)
replayAll()
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 725117f..dc73b4d 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -30,7 +30,7 @@ import kafka.server._
import kafka.server.checkpoints.OffsetCheckpoints
import kafka.utils._
import org.apache.kafka.common.{IsolationLevel, TopicPartition}
-import org.apache.kafka.common.errors.{ApiException, OffsetNotAvailableException, ReplicaNotAvailableException}
+import org.apache.kafka.common.errors.{ApiException, NotLeaderOrFollowerException, OffsetNotAvailableException}
import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
@@ -271,8 +271,8 @@ class PartitionTest extends AbstractPartitionTest {
assertSnapshotError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
assertSnapshotError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
- assertSnapshotError(Errors.NOT_LEADER_FOR_PARTITION, Optional.of(leaderEpoch), fetchOnlyLeader = true)
- assertSnapshotError(Errors.NOT_LEADER_FOR_PARTITION, Optional.empty(), fetchOnlyLeader = true)
+ assertSnapshotError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
+ assertSnapshotError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
assertSnapshotError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
assertSnapshotError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
}
@@ -312,8 +312,8 @@ class PartitionTest extends AbstractPartitionTest {
assertLastOffsetForLeaderError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
assertLastOffsetForLeaderError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
- assertLastOffsetForLeaderError(Errors.NOT_LEADER_FOR_PARTITION, Optional.empty(), fetchOnlyLeader = true)
- assertLastOffsetForLeaderError(Errors.NOT_LEADER_FOR_PARTITION, Optional.of(leaderEpoch), fetchOnlyLeader = true)
+ assertLastOffsetForLeaderError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
+ assertLastOffsetForLeaderError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
assertLastOffsetForLeaderError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
assertLastOffsetForLeaderError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
}
@@ -372,8 +372,8 @@ class PartitionTest extends AbstractPartitionTest {
assertReadRecordsError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
assertReadRecordsError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
- assertReadRecordsError(Errors.NOT_LEADER_FOR_PARTITION, Optional.empty(), fetchOnlyLeader = true)
- assertReadRecordsError(Errors.NOT_LEADER_FOR_PARTITION, Optional.of(leaderEpoch), fetchOnlyLeader = true)
+ assertReadRecordsError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
+ assertReadRecordsError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
assertReadRecordsError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
assertReadRecordsError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
}
@@ -430,8 +430,8 @@ class PartitionTest extends AbstractPartitionTest {
assertFetchOffsetError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = false)
assertFetchOffsetError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = false)
- assertFetchOffsetError(Errors.NOT_LEADER_FOR_PARTITION, Optional.empty(), fetchOnlyLeader = true)
- assertFetchOffsetError(Errors.NOT_LEADER_FOR_PARTITION, Optional.of(leaderEpoch), fetchOnlyLeader = true)
+ assertFetchOffsetError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.empty(), fetchOnlyLeader = true)
+ assertFetchOffsetError(Errors.NOT_LEADER_OR_FOLLOWER, Optional.of(leaderEpoch), fetchOnlyLeader = true)
assertFetchOffsetError(Errors.FENCED_LEADER_EPOCH, Optional.of(leaderEpoch - 1), fetchOnlyLeader = true)
assertFetchOffsetError(Errors.UNKNOWN_LEADER_EPOCH, Optional.of(leaderEpoch + 1), fetchOnlyLeader = true)
}
@@ -783,14 +783,14 @@ class PartitionTest extends AbstractPartitionTest {
@Test
def testGetReplica(): Unit = {
assertEquals(None, partition.log)
- assertThrows[ReplicaNotAvailableException] {
+ assertThrows[NotLeaderOrFollowerException] {
partition.localLogOrException
}
}
@Test
def testAppendRecordsToFollowerWithNoReplicaThrowsException(): Unit = {
- assertThrows[ReplicaNotAvailableException] {
+ assertThrows[NotLeaderOrFollowerException] {
partition.appendRecordsToFollowerOrFutureReplica(
createRecords(List(new SimpleRecord("k1".getBytes, "v1".getBytes)), baseOffset = 0L), isFuture = false)
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 6266d7f..08a868c 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -994,7 +994,7 @@ class GroupMetadataManagerTest {
assertStoreGroupErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
assertStoreGroupErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
- assertStoreGroupErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
+ assertStoreGroupErrorMapping(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NOT_COORDINATOR)
assertStoreGroupErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR)
assertStoreGroupErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.UNKNOWN_SERVER_ERROR)
assertStoreGroupErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.UNKNOWN_SERVER_ERROR)
@@ -1272,7 +1272,7 @@ class GroupMetadataManagerTest {
assertCommitOffsetErrorMapping(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.COORDINATOR_NOT_AVAILABLE)
assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS, Errors.COORDINATOR_NOT_AVAILABLE)
assertCommitOffsetErrorMapping(Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND, Errors.COORDINATOR_NOT_AVAILABLE)
- assertCommitOffsetErrorMapping(Errors.NOT_LEADER_FOR_PARTITION, Errors.NOT_COORDINATOR)
+ assertCommitOffsetErrorMapping(Errors.NOT_LEADER_OR_FOLLOWER, Errors.NOT_COORDINATOR)
assertCommitOffsetErrorMapping(Errors.MESSAGE_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
assertCommitOffsetErrorMapping(Errors.RECORD_LIST_TOO_LARGE, Errors.INVALID_COMMIT_OFFSET_SIZE)
assertCommitOffsetErrorMapping(Errors.INVALID_FETCH_SIZE, Errors.INVALID_COMMIT_OFFSET_SIZE)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 5ae961b..07b2fa8 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -177,8 +177,8 @@ class TransactionMarkerRequestCompletionHandlerTest {
}
@Test
- def shouldRetryPartitionWhenNotLeaderForPartitionError(): Unit = {
- verifyRetriesPartitionOnError(Errors.NOT_LEADER_FOR_PARTITION)
+ def shouldRetryPartitionWhenNotLeaderOrFollowerError(): Unit = {
+ verifyRetriesPartitionOnError(Errors.NOT_LEADER_OR_FOLLOWER)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 3055742..2a8d046 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -366,7 +366,7 @@ class TransactionStateManagerTest {
expectedError = Errors.NOT_COORDINATOR
var failedMetadata = txnMetadata1.prepareAddPartitions(Set[TopicPartition](new TopicPartition("topic2", 0)), time.milliseconds())
- prepareForTxnMessageAppend(Errors.NOT_LEADER_FOR_PARTITION)
+ prepareForTxnMessageAppend(Errors.NOT_LEADER_OR_FOLLOWER)
transactionManager.appendTransactionToLog(transactionalId1, coordinatorEpoch = 10, failedMetadata, assertCallback)
assertEquals(Right(Some(CoordinatorEpochAndTxnMetadata(coordinatorEpoch, txnMetadata1))), transactionManager.getTransactionState(transactionalId1))
assertTrue(txnMetadata1.pendingState.isEmpty)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index f0c4329..6561a48 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -209,7 +209,7 @@ class FetchRequestTest extends BaseRequestTest {
Seq(topicPartition))).build()
val fetchResponse = sendFetchRequest(nonReplicaId, fetchRequest)
val partitionData = fetchResponse.responseData.get(topicPartition)
- assertEquals(Errors.REPLICA_NOT_AVAILABLE, partitionData.error)
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, partitionData.error)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 9da7319..55d2b77 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -770,8 +770,8 @@ class KafkaApisTest {
}
@Test
- def testLeaderReplicaIfLocalRaisesNotLeaderForPartition(): Unit = {
- testListOffsetFailedGetLeaderReplica(Errors.NOT_LEADER_FOR_PARTITION)
+ def testLeaderReplicaIfLocalRaisesNotLeaderOrFollower(): Unit = {
+ testListOffsetFailedGetLeaderReplica(Errors.NOT_LEADER_OR_FOLLOWER)
}
@Test
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 920176e..2600cce 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -64,14 +64,14 @@ class ListOffsetsRequestTest extends BaseRequestTest {
val nonReplica = servers.map(_.config.brokerId).find(!replicas.contains(_)).get
// Follower
- assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower, consumerRequest)
- assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower, replicaRequest)
+ assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, follower, consumerRequest)
+ assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, follower, replicaRequest)
assertResponseError(Errors.NONE, follower, debugReplicaRequest)
// Non-replica
- assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, consumerRequest)
- assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, replicaRequest)
- assertResponseError(Errors.REPLICA_NOT_AVAILABLE, nonReplica, debugReplicaRequest)
+ assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, consumerRequest)
+ assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, replicaRequest)
+ assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, debugReplicaRequest)
}
@Test
@@ -105,8 +105,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
// Check follower error codes
val followerId = TestUtils.findFollowerId(topicPartition, servers)
- assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.empty())
- assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.of(secondLeaderEpoch))
+ assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.empty())
+ assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.of(secondLeaderEpoch))
assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1))
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
}
diff --git a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
index 8ac65b4..9a985af 100644
--- a/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogDirFailureTest.scala
@@ -27,7 +27,7 @@ import kafka.utils.{CoreUtils, Exit, TestUtils}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderForPartitionException}
+import org.apache.kafka.common.errors.{KafkaStorageException, NotLeaderOrFollowerException}
import org.apache.kafka.common.utils.Utils
import org.junit.Assert.{assertEquals, assertFalse, assertTrue}
import org.junit.{Before, Test}
@@ -150,16 +150,16 @@ class LogDirFailureTest extends IntegrationTestHarness {
causeLogDirFailure(failureType, leaderServer, partition)
- // send() should fail due to either KafkaStorageException or NotLeaderForPartitionException
+ // send() should fail due to either KafkaStorageException or NotLeaderOrFollowerException
try {
producer.send(record).get(6000, TimeUnit.MILLISECONDS)
- fail("send() should fail with either KafkaStorageException or NotLeaderForPartitionException")
+ fail("send() should fail with either KafkaStorageException or NotLeaderOrFollowerException")
} catch {
case e: ExecutionException =>
e.getCause match {
case t: KafkaStorageException =>
- case t: NotLeaderForPartitionException => // This may happen if ProduceRequest version <= 3
- case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderForPartitionException instead of ${t.toString}")
+ case t: NotLeaderOrFollowerException => // This may happen if ProduceRequest version <= 3
+ case t: Throwable => fail(s"send() should fail with either KafkaStorageException or NotLeaderOrFollowerException instead of ${t.toString}")
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
index 8fc9cdb..7c008fc 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetsForLeaderEpochRequestTest.scala
@@ -48,8 +48,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
val follower = replicas.find(_ != leader).get
val nonReplica = servers.map(_.config.brokerId).find(!replicas.contains(_)).get
- assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, follower, request)
- assertResponseError(Errors.NOT_LEADER_FOR_PARTITION, nonReplica, request)
+ assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, follower, request)
+ assertResponseError(Errors.NOT_LEADER_OR_FOLLOWER, nonReplica, request)
}
@Test
@@ -81,8 +81,8 @@ class OffsetsForLeaderEpochRequestTest extends BaseRequestTest {
// Check follower error codes
val followerId = TestUtils.findFollowerId(topicPartition, servers)
- assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.empty())
- assertResponseErrorForEpoch(Errors.NOT_LEADER_FOR_PARTITION, followerId, Optional.of(secondLeaderEpoch))
+ assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.empty())
+ assertResponseErrorForEpoch(Errors.NOT_LEADER_OR_FOLLOWER, followerId, Optional.of(secondLeaderEpoch))
assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch + 1))
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, Optional.of(secondLeaderEpoch - 1))
}
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index d3a871a..4368442 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -125,7 +125,7 @@ class ProduceRequestTest extends BaseRequestTest {
val produceResponse = sendProduceRequest(nonReplicaId, produceRequest)
assertEquals(1, produceResponse.responses.size)
- assertEquals(Errors.NOT_LEADER_FOR_PARTITION, produceResponse.responses.asScala.head._2.error)
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResponse.responses.asScala.head._2.error)
}
/* returns a pair of partition id and leader id */
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index c2f9033..4713138 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -92,7 +92,7 @@ class ReplicaAlterLogDirsThreadTest {
when(replicaManager.futureLocalLogOrException(t1p0)).thenReturn(futureLog)
when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
when(replicaManager.nonOfflinePartition(t1p0)).thenReturn(Some(partition))
- when(replicaManager.getPartitionOrException(t1p0, expectLeader = false)).thenReturn(partition)
+ when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
when(quotaManager.isQuotaExceeded).thenReturn(false)
@@ -181,7 +181,7 @@ class ReplicaAlterLogDirsThreadTest {
when(replicaManager.futureLocalLogOrException(t1p0)).thenReturn(futureLog)
when(replicaManager.futureLogExists(t1p0)).thenReturn(true)
when(replicaManager.nonOfflinePartition(t1p0)).thenReturn(Some(partition))
- when(replicaManager.getPartitionOrException(t1p0, expectLeader = false)).thenReturn(partition)
+ when(replicaManager.getPartitionOrException(t1p0)).thenReturn(partition)
when(quotaManager.isQuotaExceeded).thenReturn(false)
@@ -266,13 +266,13 @@ class ReplicaAlterLogDirsThreadTest {
val leoT1p1 = 232
//Stubs
- expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p0))
.andStubReturn(partitionT1p0)
expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p0, fetchOnlyFromLeader = false))
.andReturn(new EpochEndOffset(leaderEpochT1p0, leoT1p0))
.anyTimes()
- expect(replicaManager.getPartitionOrException(t1p1, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p1))
.andStubReturn(partitionT1p1)
expect(partitionT1p1.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpochT1p1, fetchOnlyFromLeader = false))
.andReturn(new EpochEndOffset(leaderEpochT1p1, leoT1p1))
@@ -315,13 +315,13 @@ class ReplicaAlterLogDirsThreadTest {
val leo = 13
//Stubs
- expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p0))
.andStubReturn(partitionT1p0)
expect(partitionT1p0.lastOffsetForLeaderEpoch(Optional.empty(), leaderEpoch, fetchOnlyFromLeader = false))
.andReturn(new EpochEndOffset(leaderEpoch, leo))
.anyTimes()
- expect(replicaManager.getPartitionOrException(t1p1, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p1))
.andThrow(new KafkaStorageException).once()
replay(partitionT1p0, replicaManager)
@@ -375,9 +375,9 @@ class ReplicaAlterLogDirsThreadTest {
val replicaT1p1LEO = 192
//Stubs
- expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p0))
.andStubReturn(partitionT1p0)
- expect(replicaManager.getPartitionOrException(t1p1, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p1))
.andStubReturn(partitionT1p1)
expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLogT1p0)
expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
@@ -452,7 +452,7 @@ class ReplicaAlterLogDirsThreadTest {
val futureReplicaEpochEndOffset = 191
//Stubs
- expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p0))
.andStubReturn(partition)
expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLog)
expect(replicaManager.futureLogExists(t1p0)).andStubReturn(true)
@@ -522,7 +522,7 @@ class ReplicaAlterLogDirsThreadTest {
val initialFetchOffset = 100
//Stubs
- expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p0))
.andStubReturn(partition)
expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).anyTimes()
expect(replicaManager.futureLocalLogOrException(t1p0)).andStubReturn(futureLog)
@@ -577,7 +577,7 @@ class ReplicaAlterLogDirsThreadTest {
val replicaLEO = 300
//Stubs
- expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p0))
.andStubReturn(partition)
expect(partition.truncateTo(capture(truncated), isFuture = EasyMock.eq(true))).once()
@@ -655,7 +655,7 @@ class ReplicaAlterLogDirsThreadTest {
val futureReplicaLEO = 190
val replicaLEO = 213
- expect(replicaManager.getPartitionOrException(t1p0, expectLeader = false))
+ expect(replicaManager.getPartitionOrException(t1p0))
.andStubReturn(partition)
expect(partition.lastOffsetForLeaderEpoch(Optional.of(1), leaderEpoch, fetchOnlyFromLeader = false))
.andReturn(new EpochEndOffset(leaderEpoch, replicaLEO))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index 1ab2054..cb6b68f 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -600,7 +600,7 @@ class ReplicaFetcherThreadTest {
//Define the offsets for the OffsetsForLeaderEpochResponse, these are used for truncation
val offsetsReply = mutable.Map(
- t1p0 -> new EpochEndOffset(NOT_LEADER_FOR_PARTITION, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
+ t1p0 -> new EpochEndOffset(NOT_LEADER_OR_FOLLOWER, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET),
t1p1 -> new EpochEndOffset(UNKNOWN_SERVER_ERROR, EpochEndOffset.UNDEFINED_EPOCH, EpochEndOffset.UNDEFINED_EPOCH_OFFSET)
).asJava
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 771bc7d..fe4b2ca 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -165,8 +165,7 @@ class ReplicaManagerQuotasTest {
.andReturn(offsetSnapshot)
val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
- EasyMock.expect(replicaManager.getPartitionOrException(
- EasyMock.anyObject[TopicPartition], EasyMock.anyBoolean()))
+ EasyMock.expect(replicaManager.getPartitionOrException(EasyMock.anyObject[TopicPartition]))
.andReturn(partition).anyTimes()
EasyMock.expect(replicaManager.shouldLeaderThrottle(EasyMock.anyObject[ReplicaQuota], EasyMock.anyObject[Partition], EasyMock.anyObject[Int]))
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 09e4f14..43f3a9a 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -23,10 +23,9 @@ import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.{Optional, Properties}
-import kafka.api.LeaderAndIsr
-import kafka.api.Request
+import kafka.api._
import kafka.log.{AppendOrigin, Log, LogConfig, LogManager, ProducerStateManager}
-import kafka.cluster.BrokerEndPoint
+import kafka.cluster.{BrokerEndPoint, Partition}
import kafka.server.QuotaFactory.UnboundedQuota
import kafka.server.checkpoints.LazyOffsetCheckpoints
import kafka.server.epoch.util.ReplicaFetcherMockBlockingSend
@@ -184,12 +183,12 @@ class ReplicaManagerTest {
.setIsNew(false)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
- rm.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+ rm.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
val records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("first message".getBytes()))
val appendResult = appendRecords(rm, new TopicPartition(topic, 0), records).onFire { response =>
- assertEquals(Errors.NOT_LEADER_FOR_PARTITION, response.error)
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, response.error)
}
// Make this replica the follower
@@ -243,7 +242,7 @@ class ReplicaManagerTest {
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest(0), (_, _) => ())
- val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+ val partition = replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
assertEquals(1, replicaManager.logManager.liveLogDirs.filterNot(_ == partition.log.get.dir.getParentFile).size)
val previousReplicaFolder = partition.log.get.dir.getParentFile
@@ -301,7 +300,7 @@ class ReplicaManagerTest {
.setIsNew(true)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
- replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+ replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
val producerId = 234L
@@ -361,7 +360,7 @@ class ReplicaManagerTest {
.setIsNew(true)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
- replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+ replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
val producerId = 234L
@@ -467,7 +466,7 @@ class ReplicaManagerTest {
.setIsNew(true)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1)).asJava).build()
replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
- replicaManager.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+ replicaManager.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
val producerId = 234L
@@ -543,7 +542,7 @@ class ReplicaManagerTest {
.setIsNew(false)).asJava,
Set(new Node(0, "host1", 0), new Node(1, "host2", 1), new Node(2, "host2", 2)).asJava).build()
rm.becomeLeaderOrFollower(0, leaderAndIsrRequest1, (_, _) => ())
- rm.getPartitionOrException(new TopicPartition(topic, 0), expectLeader = true)
+ rm.getPartitionOrException(new TopicPartition(topic, 0))
.localLogOrException
// Append a couple of messages.
@@ -1082,7 +1081,7 @@ class ReplicaManagerTest {
Optional.of(0))
fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None)
assertNotNull(fetchResult.get)
- assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
}
@Test
@@ -1175,7 +1174,7 @@ class ReplicaManagerTest {
replicaManager.becomeLeaderOrFollower(0, becomeFollowerRequest, (_, _) => ())
assertNotNull(fetchResult.get)
- assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
}
@Test
@@ -1304,7 +1303,7 @@ class ReplicaManagerTest {
replicaManager.stopReplica(tp0, deletePartition = true)
assertNotNull(fetchResult.get)
- assertEquals(Errors.NOT_LEADER_FOR_PARTITION, fetchResult.get.error)
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, fetchResult.get.error)
}
@Test
@@ -1338,7 +1337,7 @@ class ReplicaManagerTest {
replicaManager.stopReplica(tp0, deletePartition = true)
assertNotNull(produceResult.get)
- assertEquals(Errors.NOT_LEADER_FOR_PARTITION, produceResult.get.error)
+ assertEquals(Errors.NOT_LEADER_OR_FOLLOWER, produceResult.get.error)
}
private def sendProducerAppend(replicaManager: ReplicaManager,
@@ -2086,4 +2085,31 @@ class ReplicaManagerTest {
assertEquals(HostedPartition.None, replicaManager.getPartition(tp0))
}
}
+
+ @Test
+ def testReplicaNotAvailable(): Unit = {
+
+ def createReplicaManager(): ReplicaManager = {
+ val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+ val config = KafkaConfig.fromProps(props)
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+ new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
+ new AtomicBoolean(false), QuotaFactory.instantiate(config, metrics, time, ""), new BrokerTopicStats,
+ new MetadataCache(config.brokerId), new LogDirFailureChannel(config.logDirs.size)) {
+ override def getPartitionOrException(topicPartition: TopicPartition): Partition = {
+ throw Errors.NOT_LEADER_OR_FOLLOWER.exception()
+ }
+ }
+ }
+
+ val replicaManager = createReplicaManager()
+ try {
+ val tp = new TopicPartition(topic, 0)
+ val dir = replicaManager.logManager.liveLogDirs.head.getAbsolutePath
+ val errors = replicaManager.alterReplicaLogDirs(Map(tp -> dir))
+ assertEquals(Errors.REPLICA_NOT_AVAILABLE, errors(tp))
+ } finally {
+ replicaManager.shutdown(false)
+ }
+ }
}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
index 8b35149..e349f32 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochIntegrationTest.scala
@@ -128,7 +128,7 @@ class LeaderEpochIntegrationTest extends ZooKeeperTestHarness with Logging {
//And should get no leader for partition error from t1p1 (as it's not on broker 0)
assertTrue(offsetsForEpochs(t1p1).hasError)
- assertEquals(NOT_LEADER_FOR_PARTITION, offsetsForEpochs(t1p1).error)
+ assertEquals(NOT_LEADER_OR_FOLLOWER, offsetsForEpochs(t1p1).error)
assertEquals(UNDEFINED_EPOCH_OFFSET, offsetsForEpochs(t1p1).endOffset)
//Repointing to broker 1 we should get the correct offset for t1p1
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 08be8a2..fd4e8d9 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -87,7 +87,7 @@ class OffsetsForLeaderEpochTest {
val response = replicaManager.lastOffsetForLeaderEpoch(request)
//Then
- assertEquals(new EpochEndOffset(Errors.NOT_LEADER_FOR_PARTITION, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
+ assertEquals(new EpochEndOffset(Errors.NOT_LEADER_OR_FOLLOWER, UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
}
@Test
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 2983cf7..676bc69 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -35,6 +35,10 @@
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-602%3A+Change+default+value+for+client.dns.lookup">KIP-602</a>
for more details.
</li>
+ <li><code>NotLeaderForPartitionException</code> has been deprecated and replaced with <code>NotLeaderOrFollowerException</code>.
+ Fetch requests and other requests intended only for the leader or follower return NOT_LEADER_OR_FOLLOWER(6) instead of REPLICA_NOT_AVAILABLE(9)
+ if the broker is not a replica, ensuring that this transient error during reassignments is handled by all clients as a retriable exception.
+ </li>
</ul>
<h5><a id="upgrade_250_notable" href="#upgrade_250_notable">Notable changes in 2.5.0</a></h5>