You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2021/09/15 15:54:39 UTC
[kafka] branch trunk updated: KAFKA-13162: Ensure ElectLeaders is
properly handled in KRaft (#11186)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7de8a93 KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft (#11186)
7de8a93 is described below
commit 7de8a93c7e1036553ab6e738cf91bc39e154719e
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed Sep 15 08:52:45 2021 -0700
KAFKA-13162: Ensure ElectLeaders is properly handled in KRaft (#11186)
This patch fixes several problems with the `ElectLeaders` API in KRaft:
- `KafkaApis` did not properly forward this request type to the controller.
- `ControllerApis` did not handle the request type.
- `ElectLeadersRequest.getErrorResponse` may raise an NPE when `TopicPartitions` is null.
- Controller should not do preferred election if `ElectLeaders` specifies `UNCLEAN` election.
- Controller should not do unclean election if `ElectLeaders` specifies `PREFERRED` election.
- Controller should use proper error codes to handle cases when the desired leader is unavailable or when no election is needed because the desired leader is already elected.
- When an election for all partitions is requested (indicated by a null `TopicPartitions` field), the response should not return partitions for which no election was necessary.
In addition to extending the unit test coverage in `ReplicationControlManagerTest`, I have also converted `LeaderElectionCommandTest` to use KRaft.
Reviewers: dengziming <sw...@163.com>, José Armando García Sancio <js...@users.noreply.github.com>, David Arthur <mu...@gmail.com>
---
.../org/apache/kafka/common/protocol/ApiKeys.java | 2 +-
.../kafka/common/requests/ElectLeadersRequest.java | 24 +-
.../kafka/server/BrokerLifecycleManager.scala | 8 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 17 +-
.../main/scala/kafka/server/ControllerApis.scala | 24 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 9 +-
.../server/metadata/BrokerMetadataPublisher.scala | 2 +
core/src/test/java/kafka/test/ClusterInstance.java | 6 +
.../test/junit/RaftClusterInvocationContext.java | 63 ++-
.../test/junit/ZkClusterInvocationContext.java | 43 +-
.../kafka/server/IntegrationTestUtils.scala | 38 +-
.../admin/LeaderElectionCommandErrorTest.scala | 97 ++++
.../kafka/admin/LeaderElectionCommandTest.scala | 371 ++++++--------
.../unit/kafka/server/ControllerApisTest.scala | 105 +++-
.../scala/unit/kafka/server/KafkaApisTest.scala | 92 ++--
.../test/scala/unit/kafka/utils/TestUtils.scala | 77 ++-
.../scala/unit/kafka/zk/ZooKeeperTestHarness.scala | 37 +-
.../unit/kafka/zookeeper/ZooKeeperClientTest.scala | 7 +-
.../kafka/controller/ClusterControlManager.java | 6 +-
.../kafka/controller/PartitionChangeBuilder.java | 23 +
.../controller/ReplicationControlManager.java | 59 ++-
.../org/apache/kafka/image/ClientQuotasDelta.java | 7 +
.../java/org/apache/kafka/image/ClusterDelta.java | 7 +
.../org/apache/kafka/image/ConfigurationDelta.java | 9 +
.../apache/kafka/image/ConfigurationsDelta.java | 7 +
.../java/org/apache/kafka/image/FeaturesDelta.java | 7 +
.../java/org/apache/kafka/image/MetadataDelta.java | 11 +
.../java/org/apache/kafka/image/TopicDelta.java | 7 +
.../java/org/apache/kafka/image/TopicsDelta.java | 8 +
.../controller/PartitionChangeBuilderTest.java | 8 +
.../controller/ReplicationControlManagerTest.java | 560 ++++++++++++++++-----
31 files changed, 1210 insertions(+), 531 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 423093a..428e4a8 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -84,7 +84,7 @@ public enum ApiKeys {
EXPIRE_DELEGATION_TOKEN(ApiMessageType.EXPIRE_DELEGATION_TOKEN, false, true),
DESCRIBE_DELEGATION_TOKEN(ApiMessageType.DESCRIBE_DELEGATION_TOKEN),
DELETE_GROUPS(ApiMessageType.DELETE_GROUPS),
- ELECT_LEADERS(ApiMessageType.ELECT_LEADERS),
+ ELECT_LEADERS(ApiMessageType.ELECT_LEADERS, false, true),
INCREMENTAL_ALTER_CONFIGS(ApiMessageType.INCREMENTAL_ALTER_CONFIGS, false, true),
ALTER_PARTITION_REASSIGNMENTS(ApiMessageType.ALTER_PARTITION_REASSIGNMENTS, false, true),
LIST_PARTITION_REASSIGNMENTS(ApiMessageType.LIST_PARTITION_REASSIGNMENTS, false, true),
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
index 1bc888a..febb030 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ElectLeadersRequest.java
@@ -103,20 +103,22 @@ public class ElectLeadersRequest extends AbstractRequest {
ApiError apiError = ApiError.fromThrowable(e);
List<ReplicaElectionResult> electionResults = new ArrayList<>();
- for (TopicPartitions topic : data.topicPartitions()) {
- ReplicaElectionResult electionResult = new ReplicaElectionResult();
+ if (data.topicPartitions() != null) {
+ for (TopicPartitions topic : data.topicPartitions()) {
+ ReplicaElectionResult electionResult = new ReplicaElectionResult();
- electionResult.setTopic(topic.topic());
- for (Integer partitionId : topic.partitions()) {
- PartitionResult partitionResult = new PartitionResult();
- partitionResult.setPartitionId(partitionId);
- partitionResult.setErrorCode(apiError.error().code());
- partitionResult.setErrorMessage(apiError.message());
+ electionResult.setTopic(topic.topic());
+ for (Integer partitionId : topic.partitions()) {
+ PartitionResult partitionResult = new PartitionResult();
+ partitionResult.setPartitionId(partitionId);
+ partitionResult.setErrorCode(apiError.error().code());
+ partitionResult.setErrorMessage(apiError.message());
- electionResult.partitionResult().add(partitionResult);
- }
+ electionResult.partitionResult().add(partitionResult);
+ }
- electionResults.add(electionResult);
+ electionResults.add(electionResult);
+ }
}
return new ElectLeadersResponse(throttleTimeMs, apiError.error().code(), electionResults, version());
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index e15a3e6..394c353 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -208,7 +208,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
_state = BrokerState.PENDING_CONTROLLED_SHUTDOWN
// Send the next heartbeat immediately in order to let the controller
// begin processing the controlled shutdown as soon as possible.
- scheduleNextCommunication(0)
+ scheduleNextCommunicationImmediately()
case _ =>
info(s"Skipping controlled shutdown because we are in state ${_state}.")
@@ -284,8 +284,8 @@ class BrokerLifecycleManager(val config: KafkaConfig,
setIncarnationId(incarnationId).
setListeners(_advertisedListeners).
setRack(rack.orNull)
- if (isTraceEnabled) {
- trace(s"Sending broker registration ${data}")
+ if (isDebugEnabled) {
+ debug(s"Sending broker registration ${data}")
}
_channelManager.sendRequest(new BrokerRegistrationRequest.Builder(data),
new BrokerRegistrationResponseHandler())
@@ -406,7 +406,7 @@ class BrokerLifecycleManager(val config: KafkaConfig,
scheduleNextCommunicationAfterSuccess()
}
} else {
- info(s"The controlled has asked us to exit controlled shutdown.")
+ info(s"The controller has asked us to exit controlled shutdown.")
beginShutdown()
}
gotControlledShutdownResponse = true
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index d2079c4..533036f 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -17,11 +17,11 @@
package kafka.server
+import java.net.InetAddress
import java.util
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
-import java.net.InetAddress
import kafka.cluster.Broker.ServerInfo
import kafka.coordinator.group.GroupCoordinator
@@ -34,7 +34,6 @@ import kafka.security.CredentialProvider
import kafka.server.KafkaRaftServer.ControllerRole
import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder}
import kafka.utils.{CoreUtils, KafkaScheduler}
-import org.apache.kafka.snapshot.SnapshotWriter
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
import org.apache.kafka.common.metrics.Metrics
@@ -45,14 +44,15 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.raft.{RaftClient, RaftConfig}
import org.apache.kafka.raft.RaftConfig.AddressSpec
+import org.apache.kafka.raft.{RaftClient, RaftConfig}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.SnapshotWriter
import scala.collection.{Map, Seq}
-import scala.jdk.CollectionConverters._
import scala.compat.java8.OptionConverters._
+import scala.jdk.CollectionConverters._
class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
@@ -92,8 +92,7 @@ class BrokerServer(
this.logIdent = logContext.logPrefix
- val lifecycleManager: BrokerLifecycleManager =
- new BrokerLifecycleManager(config, time, threadNamePrefix)
+ @volatile private var lifecycleManager: BrokerLifecycleManager = null
private val isShuttingDown = new AtomicBoolean(false)
@@ -105,7 +104,7 @@ class BrokerServer(
var controlPlaneRequestProcessor: KafkaApis = null
var authorizer: Option[Authorizer] = None
- var socketServer: SocketServer = null
+ @volatile var socketServer: SocketServer = null
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
var logDirFailureChannel: LogDirFailureChannel = null
@@ -162,6 +161,8 @@ class BrokerServer(
lock.lock()
try {
if (status != from) return false
+ info(s"Transition from $status to $to")
+
status = to
if (to == SHUTTING_DOWN) {
isShuttingDown.set(true)
@@ -182,6 +183,8 @@ class BrokerServer(
try {
info("Starting broker")
+ lifecycleManager = new BrokerLifecycleManager(config, time, threadNamePrefix)
+
/* start scheduler */
kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
kafkaScheduler.startup()
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
index 2f86e1f..ed9b55a 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -20,8 +20,8 @@ package kafka.server
import java.util
import java.util.Collections
import java.util.Map.Entry
-import java.util.concurrent.{CompletableFuture, ExecutionException}
import java.util.concurrent.TimeUnit.{MILLISECONDS, NANOSECONDS}
+import java.util.concurrent.{CompletableFuture, ExecutionException}
import kafka.network.RequestChannel
import kafka.raft.RaftManager
@@ -36,11 +36,10 @@ import org.apache.kafka.common.internals.FatalExitError
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
-import org.apache.kafka.common.message.CreateTopicsRequestData
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultCollection}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
-import org.apache.kafka.common.message._
+import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
@@ -108,6 +107,7 @@ class ControllerApis(val requestChannel: RequestChannel,
case ApiKeys.DESCRIBE_ACLS => aclApis.handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => aclApis.handleCreateAcls(request)
case ApiKeys.DELETE_ACLS => aclApis.handleDeleteAcls(request)
+ case ApiKeys.ELECT_LEADERS => handleElectLeaders(request)
case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}")
}
} catch {
@@ -488,6 +488,24 @@ class ControllerApis(val requestChannel: RequestChannel,
handleRaftRequest(request, response => new DescribeQuorumResponse(response.asInstanceOf[DescribeQuorumResponseData]))
}
+ def handleElectLeaders(request: RequestChannel.Request): Unit = {
+ authHelper.authorizeClusterOperation(request, ALTER)
+
+ val electLeadersRequest = request.body[ElectLeadersRequest]
+ val future = controller.electLeaders(electLeadersRequest.data)
+ future.whenComplete { (responseData, exception) =>
+ if (exception != null) {
+ requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
+ electLeadersRequest.getErrorResponse(throttleMs, exception)
+ })
+ } else {
+ requestHelper.sendResponseMaybeThrottle(request, throttleMs => {
+ new ElectLeadersResponse(responseData.setThrottleTimeMs(throttleMs))
+ })
+ }
+ }
+ }
+
def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
val alterIsrRequest = request.body[AlterIsrRequest]
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c2e0c15..e20a36a 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -209,7 +209,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
- case ApiKeys.ELECT_LEADERS => handleElectReplicaLeader(request)
+ case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
@@ -2993,9 +2993,8 @@ class KafkaApis(val requestChannel: RequestChannel,
true
}
- def handleElectReplicaLeader(request: RequestChannel.Request): Unit = {
- val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.notYetSupported(request))
-
+ def handleElectLeaders(request: RequestChannel.Request): Unit = {
+ val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val electionRequest = request.body[ElectLeadersRequest]
def sendResponseCallback(
@@ -3006,7 +3005,7 @@ class KafkaApis(val requestChannel: RequestChannel,
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
val adjustedResults = if (electionRequest.data.topicPartitions == null) {
/* When performing elections across all of the partitions we should only return
- * partitions for which there was an eleciton or resulted in an error. In other
+ * partitions for which there was an election or resulted in an error. In other
* words, partitions that didn't need election because they ready have the correct
* leader are not returned to the client.
*/
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 4fe4938..cedef47 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -117,6 +117,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
delta: MetadataDelta,
newImage: MetadataImage): Unit = {
try {
+ trace(s"Publishing delta $delta with highest offset $newHighestMetadataOffset")
+
// Publish the new metadata image to the metadata cache.
metadataCache.setImage(newImage)
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 021db5a..23b417e 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -95,5 +95,11 @@ public interface ClusterInstance {
void stop();
+ void shutdownBroker(int brokerId);
+
+ void startBroker(int brokerId);
+
void rollingBrokerRestart();
+
+ void waitForReadyBrokers() throws InterruptedException;
}
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index c60e0ec..599bdf0 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -27,6 +27,7 @@ import kafka.testkit.TestKitNodes;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metadata.BrokerState;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
@@ -36,10 +37,14 @@ import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Optional;
import java.util.Properties;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
@@ -73,6 +78,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public List<Extension> getAdditionalExtensions() {
+ RaftClusterInstance clusterInstance = new RaftClusterInstance(clusterReference, clusterConfig);
return Arrays.asList(
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
@@ -97,8 +103,8 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
100L);
},
- (AfterTestExecutionCallback) context -> clusterReference.get().close(),
- new ClusterInstanceParameterResolver(new RaftClusterInstance(clusterReference, clusterConfig)),
+ (AfterTestExecutionCallback) context -> clusterInstance.stop(),
+ new ClusterInstanceParameterResolver(clusterInstance),
new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
);
}
@@ -109,6 +115,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
private final ClusterConfig clusterConfig;
final AtomicBoolean started = new AtomicBoolean(false);
final AtomicBoolean stopped = new AtomicBoolean(false);
+ private final ConcurrentLinkedQueue<Admin> admins = new ConcurrentLinkedQueue<>();
RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) {
this.clusterReference = clusterReference;
@@ -122,7 +129,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public Collection<SocketServer> brokerSocketServers() {
- return clusterReference.get().brokers().values().stream()
+ return brokers()
.map(BrokerServer::socketServer)
.collect(Collectors.toList());
}
@@ -134,14 +141,14 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public Collection<SocketServer> controllerSocketServers() {
- return clusterReference.get().controllers().values().stream()
+ return controllers()
.map(ControllerServer::socketServer)
.collect(Collectors.toList());
}
@Override
public SocketServer anyBrokerSocketServer() {
- return clusterReference.get().brokers().values().stream()
+ return brokers()
.map(BrokerServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
@@ -149,7 +156,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public SocketServer anyControllerSocketServer() {
- return clusterReference.get().controllers().values().stream()
+ return controllers()
.map(ControllerServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
@@ -172,7 +179,9 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public Admin createAdminClient(Properties configOverrides) {
- return Admin.create(clusterReference.get().clientProperties());
+ Admin admin = Admin.create(clusterReference.get().clientProperties());
+ admins.add(admin);
+ return admin;
}
@Override
@@ -189,11 +198,27 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
@Override
public void stop() {
if (stopped.compareAndSet(false, true)) {
- try {
- clusterReference.get().close();
- } catch (Exception e) {
- throw new RuntimeException("Failed to stop Raft server", e);
- }
+ admins.forEach(admin -> Utils.closeQuietly(admin, "admin"));
+ Utils.closeQuietly(clusterReference.get(), "cluster");
+ }
+ }
+
+ @Override
+ public void shutdownBroker(int brokerId) {
+ findBrokerOrThrow(brokerId).shutdown();
+ }
+
+ @Override
+ public void startBroker(int brokerId) {
+ findBrokerOrThrow(brokerId).startup();
+ }
+
+ @Override
+ public void waitForReadyBrokers() throws InterruptedException {
+ try {
+ clusterReference.get().waitForReadyBrokers();
+ } catch (ExecutionException e) {
+ throw new AssertionError("Failed while waiting for brokers to become ready", e);
}
}
@@ -201,5 +226,19 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
public void rollingBrokerRestart() {
throw new UnsupportedOperationException("Restarting Raft servers is not yet supported.");
}
+
+ private BrokerServer findBrokerOrThrow(int brokerId) {
+ return Optional.ofNullable(clusterReference.get().brokers().get(brokerId))
+ .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
+ }
+
+ private Stream<BrokerServer> brokers() {
+ return clusterReference.get().brokers().values().stream();
+ }
+
+ private Stream<ControllerServer> controllers() {
+ return clusterReference.get().controllers().values().stream();
+ }
+
}
}
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index cd8cdc1..4d4323f 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -21,12 +21,12 @@ import kafka.api.IntegrationTestHarness;
import kafka.network.SocketServer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
-import kafka.test.ClusterConfig;
-import kafka.test.ClusterInstance;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
@@ -44,6 +44,7 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
/**
* Wraps a {@link IntegrationTestHarness} inside lifecycle methods for a test invocation. Each instance of this
@@ -193,7 +194,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public Collection<SocketServer> brokerSocketServers() {
- return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+ return servers()
.map(KafkaServer::socketServer)
.collect(Collectors.toList());
}
@@ -205,7 +206,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public Collection<SocketServer> controllerSocketServers() {
- return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+ return servers()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
.collect(Collectors.toList());
@@ -213,7 +214,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public SocketServer anyBrokerSocketServer() {
- return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+ return servers()
.map(KafkaServer::socketServer)
.findFirst()
.orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
@@ -221,7 +222,7 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
@Override
public SocketServer anyControllerSocketServer() {
- return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream()
+ return servers()
.filter(broker -> broker.kafkaController().isActive())
.map(KafkaServer::socketServer)
.findFirst()
@@ -263,6 +264,16 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
}
@Override
+ public void shutdownBroker(int brokerId) {
+ findBrokerOrThrow(brokerId).shutdown();
+ }
+
+ @Override
+ public void startBroker(int brokerId) {
+ findBrokerOrThrow(brokerId).startup();
+ }
+
+ @Override
public void rollingBrokerRestart() {
if (!started.get()) {
throw new IllegalStateException("Tried to restart brokers but the cluster has not been started!");
@@ -272,5 +283,25 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
}
clusterReference.get().restartDeadBrokers(true);
}
+
+ @Override
+ public void waitForReadyBrokers() throws InterruptedException {
+ org.apache.kafka.test.TestUtils.waitForCondition(() -> {
+ int numRegisteredBrokers = clusterReference.get().zkClient().getAllBrokersInCluster().size();
+ return numRegisteredBrokers == config.numBrokers();
+ }, "Timed out while waiting for brokers to become ready");
+ }
+
+ private KafkaServer findBrokerOrThrow(int brokerId) {
+ return servers()
+ .filter(server -> server.config().brokerId() == brokerId)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("Unknown brokerId " + brokerId));
+ }
+
+ private Stream<KafkaServer> servers() {
+ return JavaConverters.asJavaCollection(clusterReference.get().servers()).stream();
+ }
+
}
}
diff --git a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
index 9a21e7f..203e181 100644
--- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
@@ -17,19 +17,23 @@
package kafka.server
+import java.io.{DataInputStream, DataOutputStream}
+import java.net.Socket
+import java.nio.ByteBuffer
+import java.util.{Collections, Properties}
+
import kafka.network.SocketServer
+import kafka.utils.Implicits._
import kafka.utils.{NotNothing, TestUtils}
+import org.apache.kafka.clients.admin.{Admin, NewTopic}
import org.apache.kafka.common.network.{ListenerName, Mode}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, RequestHeader, ResponseHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
-import java.io.{DataInputStream, DataOutputStream}
-import java.net.Socket
-import java.nio.ByteBuffer
-import java.util.Properties
import scala.annotation.nowarn
+import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag
object IntegrationTestUtils {
@@ -101,6 +105,32 @@ object IntegrationTestUtils {
finally socket.close()
}
+ def createTopic(
+ admin: Admin,
+ topic: String,
+ numPartitions: Int,
+ replicationFactor: Short
+ ): Unit = {
+ val newTopics = Collections.singletonList(new NewTopic(topic, numPartitions, replicationFactor))
+ val createTopicResult = admin.createTopics(newTopics)
+ createTopicResult.all().get()
+ }
+
+ def createTopic(
+ admin: Admin,
+ topic: String,
+ replicaAssignment: Map[Int, Seq[Int]]
+ ): Unit = {
+ val javaAssignment = new java.util.HashMap[Integer, java.util.List[Integer]]()
+ replicaAssignment.forKeyValue { (partitionId, assignment) =>
+ javaAssignment.put(partitionId, assignment.map(Int.box).asJava)
+ }
+ val newTopic = new NewTopic(topic, javaAssignment)
+ val newTopics = Collections.singletonList(newTopic)
+ val createTopicResult = admin.createTopics(newTopics)
+ createTopicResult.all().get()
+ }
+
protected def securityProtocol: SecurityProtocol = SecurityProtocol.PLAINTEXT
private var correlationId = 0
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
new file mode 100644
index 0000000..eaef936
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandErrorTest.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin
+
+import kafka.common.AdminCommandFailedException
+import org.apache.kafka.common.errors.TimeoutException
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+
+import scala.concurrent.duration._
+
+/**
+ * For some error cases, we can save a little build time by avoiding the overhead for
+ * cluster creation and cleanup because the command is expected to fail immediately.
+ */
+class LeaderElectionCommandErrorTest { // Error-path tests for LeaderElectionCommand; none of them starts a cluster.
+
+ @Test
+ def testTopicWithoutPartition(): Unit = { // --topic supplied without --partition
+ val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+ Array(
+ "--bootstrap-server", "nohost:9092", // NOTE(review): placeholder address, presumably never contacted because the command fails first
+ "--election-type", "unclean",
+ "--topic", "some-topic"
+ )
+ ))
+ assertTrue(e.getMessage.startsWith("Missing required option(s)"))
+ assertTrue(e.getMessage.contains(" partition"))
+ }
+
+ @Test
+ def testPartitionWithoutTopic(): Unit = { // --partition combined with --all-topic-partitions instead of --topic
+ val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+ Array(
+ "--bootstrap-server", "nohost:9092",
+ "--election-type", "unclean",
+ "--all-topic-partitions",
+ "--partition", "0"
+ )
+ ))
+ assertEquals("Option partition is only allowed if topic is used", e.getMessage)
+ }
+
+ @Test
+ def testMissingElectionType(): Unit = { // --election-type omitted
+ val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+ Array(
+ "--bootstrap-server", "nohost:9092",
+ "--topic", "some-topic",
+ "--partition", "0"
+ )
+ ))
+ assertTrue(e.getMessage.startsWith("Missing required option(s)"))
+ assertTrue(e.getMessage.contains(" election-type"))
+ }
+
+ @Test
+ def testMissingTopicPartitionSelection(): Unit = { // none of --all-topic-partitions, --topic, --path-to-json-file supplied
+ val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+ Array(
+ "--bootstrap-server", "nohost:9092",
+ "--election-type", "preferred"
+ )
+ ))
+ assertTrue(e.getMessage.startsWith("One and only one of the following options is required: "))
+ assertTrue(e.getMessage.contains(" all-topic-partitions")) // the message must name each of the three alternatives
+ assertTrue(e.getMessage.contains(" topic"))
+ assertTrue(e.getMessage.contains(" path-to-json-file"))
+ }
+
+ @Test
+ def testInvalidBroker(): Unit = { // valid arguments, but the bootstrap address is not a broker
+ val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run( // calls run, not main, so a timeout can be passed
+ Array(
+ "--bootstrap-server", "example.com:1234",
+ "--election-type", "unclean",
+ "--all-topic-partitions"
+ ),
+ 1.seconds // NOTE(review): presumably the admin request timeout, kept short so the test fails fast; confirm against LeaderElectionCommand.run
+ ))
+ assertTrue(e.getCause.isInstanceOf[TimeoutException]) // the timeout must be the wrapped cause of the command failure
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index a7b91ea..b942f6f 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -18,214 +18,176 @@ package kafka.admin
import java.io.File
import java.nio.charset.StandardCharsets
-import java.nio.file.Files
-import java.nio.file.Path
+import java.nio.file.{Files, Path}
import kafka.common.AdminCommandFailedException
-import kafka.server.KafkaConfig
-import kafka.server.KafkaServer
+import kafka.server.IntegrationTestUtils.createTopic
+import kafka.server.{KafkaConfig, KafkaServer}
+import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, Type}
+import kafka.test.junit.ClusterTestExtensions
+import kafka.test.{ClusterConfig, ClusterInstance}
import kafka.utils.TestUtils
-import kafka.zk.ZooKeeperTestHarness
-import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
+import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.TimeoutException
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException
import org.apache.kafka.common.network.ListenerName
-import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.BeforeEach
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.extension.ExtendWith
+import org.junit.jupiter.api.{BeforeEach, Tag}
-import scala.jdk.CollectionConverters._
-import scala.collection.Seq
-import scala.concurrent.duration._
-
-final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
+@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
+@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3)
+@Tag("integration")
+final class LeaderElectionCommandTest(cluster: ClusterInstance) {
import LeaderElectionCommandTest._
- var servers = Seq.empty[KafkaServer]
val broker1 = 0
val broker2 = 1
val broker3 = 2
@BeforeEach
- override def setUp(): Unit = {
- super.setUp()
-
- val brokerConfigs = TestUtils.createBrokerConfigs(3, zkConnect, false)
- servers = brokerConfigs.map { config =>
- config.setProperty("auto.leader.rebalance.enable", "false")
- config.setProperty("controlled.shutdown.enable", "true")
- config.setProperty("controlled.shutdown.max.retries", "1")
- config.setProperty("controlled.shutdown.retry.backoff.ms", "1000")
- TestUtils.createServer(KafkaConfig.fromProps(config))
- }
+ def setup(clusterConfig: ClusterConfig): Unit = {
+ TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
+ clusterConfig.serverProperties().put(KafkaConfig.AutoLeaderRebalanceEnableProp, "false")
+ clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownEnableProp, "true")
+ clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownMaxRetriesProp, "1")
+ clusterConfig.serverProperties().put(KafkaConfig.ControlledShutdownRetryBackoffMsProp, "1000")
+ clusterConfig.serverProperties().put(KafkaConfig.OffsetsTopicReplicationFactorProp, "2")
}
- @AfterEach
- override def tearDown(): Unit = {
- TestUtils.shutdownServers(servers)
-
- super.tearDown()
- }
-
- @Test
+ @ClusterTest
def testAllTopicPartition(): Unit = {
- TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
- val topic = "unclean-topic"
- val partition = 0
- val assignment = Seq(broker2, broker3)
-
- TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+ val client = cluster.createAdminClient()
+ val topic = "unclean-topic"
+ val partition = 0
+ val assignment = Seq(broker2, broker3)
- val topicPartition = new TopicPartition(topic, partition)
+ cluster.waitForReadyBrokers()
+ createTopic(client, topic, Map(partition -> assignment))
- TestUtils.assertLeader(client, topicPartition, broker2)
+ val topicPartition = new TopicPartition(topic, partition)
- servers(broker3).shutdown()
- TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
- servers(broker2).shutdown()
- TestUtils.assertNoLeader(client, topicPartition)
- servers(broker3).startup()
+ TestUtils.assertLeader(client, topicPartition, broker2)
+ cluster.shutdownBroker(broker3)
+ TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+ cluster.shutdownBroker(broker2)
+ TestUtils.assertNoLeader(client, topicPartition)
+ cluster.startBroker(broker3)
+ TestUtils.waitForOnlineBroker(client, broker3)
- LeaderElectionCommand.main(
- Array(
- "--bootstrap-server", bootstrapServers(servers),
- "--election-type", "unclean",
- "--all-topic-partitions"
- )
+ LeaderElectionCommand.main(
+ Array(
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--election-type", "unclean",
+ "--all-topic-partitions"
)
+ )
- TestUtils.assertLeader(client, topicPartition, broker3)
- }
+ TestUtils.assertLeader(client, topicPartition, broker3)
}
- @Test
+ @ClusterTest
def testTopicPartition(): Unit = {
- TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
- val topic = "unclean-topic"
- val partition = 0
- val assignment = Seq(broker2, broker3)
+ val client = cluster.createAdminClient()
+ val topic = "unclean-topic"
+ val partition = 0
+ val assignment = Seq(broker2, broker3)
- TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+ cluster.waitForReadyBrokers()
+ createTopic(client, topic, Map(partition -> assignment))
- val topicPartition = new TopicPartition(topic, partition)
+ val topicPartition = new TopicPartition(topic, partition)
- TestUtils.assertLeader(client, topicPartition, broker2)
+ TestUtils.assertLeader(client, topicPartition, broker2)
- servers(broker3).shutdown()
- TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
- servers(broker2).shutdown()
- TestUtils.assertNoLeader(client, topicPartition)
- servers(broker3).startup()
+ cluster.shutdownBroker(broker3)
+ TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+ cluster.shutdownBroker(broker2)
+ TestUtils.assertNoLeader(client, topicPartition)
+ cluster.startBroker(broker3)
+ TestUtils.waitForOnlineBroker(client, broker3)
- LeaderElectionCommand.main(
- Array(
- "--bootstrap-server", bootstrapServers(servers),
- "--election-type", "unclean",
- "--topic", topic,
- "--partition", partition.toString
- )
+ LeaderElectionCommand.main(
+ Array(
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--election-type", "unclean",
+ "--topic", topic,
+ "--partition", partition.toString
)
+ )
- TestUtils.assertLeader(client, topicPartition, broker3)
- }
+ TestUtils.assertLeader(client, topicPartition, broker3)
}
- @Test
+ @ClusterTest
def testPathToJsonFile(): Unit = {
- TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
- val topic = "unclean-topic"
- val partition = 0
- val assignment = Seq(broker2, broker3)
+ val client = cluster.createAdminClient()
+ val topic = "unclean-topic"
+ val partition = 0
+ val assignment = Seq(broker2, broker3)
- TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+ cluster.waitForReadyBrokers()
+ createTopic(client, topic, Map(partition -> assignment))
- val topicPartition = new TopicPartition(topic, partition)
+ val topicPartition = new TopicPartition(topic, partition)
- TestUtils.assertLeader(client, topicPartition, broker2)
+ TestUtils.assertLeader(client, topicPartition, broker2)
- servers(broker3).shutdown()
- TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
- servers(broker2).shutdown()
- TestUtils.assertNoLeader(client, topicPartition)
- servers(broker3).startup()
+ cluster.shutdownBroker(broker3)
+ TestUtils.waitForBrokersOutOfIsr(client, Set(topicPartition), Set(broker3))
+ cluster.shutdownBroker(broker2)
+ TestUtils.assertNoLeader(client, topicPartition)
+ cluster.startBroker(broker3)
+ TestUtils.waitForOnlineBroker(client, broker3)
- val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
+ val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition))
- LeaderElectionCommand.main(
- Array(
- "--bootstrap-server", bootstrapServers(servers),
- "--election-type", "unclean",
- "--path-to-json-file", topicPartitionPath.toString
- )
+ LeaderElectionCommand.main(
+ Array(
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--election-type", "unclean",
+ "--path-to-json-file", topicPartitionPath.toString
)
+ )
- TestUtils.assertLeader(client, topicPartition, broker3)
- }
+ TestUtils.assertLeader(client, topicPartition, broker3)
}
- @Test
+ @ClusterTest
def testPreferredReplicaElection(): Unit = {
- TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
- val topic = "unclean-topic"
- val partition = 0
- val assignment = Seq(broker2, broker3)
+ val client = cluster.createAdminClient()
+ val topic = "preferred-topic"
+ val partition = 0
+ val assignment = Seq(broker2, broker3)
- TestUtils.createTopic(zkClient, topic, Map(partition -> assignment), servers)
+ cluster.waitForReadyBrokers()
+ createTopic(client, topic, Map(partition -> assignment))
- val topicPartition = new TopicPartition(topic, partition)
+ val topicPartition = new TopicPartition(topic, partition)
- TestUtils.assertLeader(client, topicPartition, broker2)
+ TestUtils.assertLeader(client, topicPartition, broker2)
- servers(broker2).shutdown()
- TestUtils.assertLeader(client, topicPartition, broker3)
- servers(broker2).startup()
- TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
+ cluster.shutdownBroker(broker2)
+ TestUtils.assertLeader(client, topicPartition, broker3)
+ cluster.startBroker(broker2)
+ TestUtils.waitForBrokersInIsr(client, topicPartition, Set(broker2))
- LeaderElectionCommand.main(
- Array(
- "--bootstrap-server", bootstrapServers(servers),
- "--election-type", "preferred",
- "--all-topic-partitions"
- )
- )
-
- TestUtils.assertLeader(client, topicPartition, broker2)
- }
- }
-
- @Test
- def testTopicWithoutPartition(): Unit = {
- val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
+ LeaderElectionCommand.main(
Array(
- "--bootstrap-server", bootstrapServers(servers),
- "--election-type", "unclean",
- "--topic", "some-topic"
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--election-type", "preferred",
+ "--all-topic-partitions"
)
- ))
- assertTrue(e.getMessage.startsWith("Missing required option(s)"))
- assertTrue(e.getMessage.contains(" partition"))
- }
+ )
- @Test
- def testPartitionWithoutTopic(): Unit = {
- val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
- Array(
- "--bootstrap-server", bootstrapServers(servers),
- "--election-type", "unclean",
- "--all-topic-partitions",
- "--partition", "0"
- )
- ))
- assertEquals("Option partition is only allowed if topic is used", e.getMessage)
+ TestUtils.assertLeader(client, topicPartition, broker2)
}
- @Test
+ @ClusterTest
def testTopicDoesNotExist(): Unit = {
val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.main(
Array(
- "--bootstrap-server", bootstrapServers(servers),
+ "--bootstrap-server", cluster.bootstrapServers(),
"--election-type", "preferred",
"--topic", "unknown-topic-name",
"--partition", "0"
@@ -234,86 +196,55 @@ final class LeaderElectionCommandTest extends ZooKeeperTestHarness {
assertTrue(e.getSuppressed()(0).isInstanceOf[UnknownTopicOrPartitionException])
}
- @Test
- def testMissingElectionType(): Unit = {
- val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
- Array(
- "--bootstrap-server", bootstrapServers(servers),
- "--topic", "some-topic",
- "--partition", "0"
- )
+ @ClusterTest
+ def testElectionResultOutput(): Unit = {
+ val client = cluster.createAdminClient()
+ val topic = "non-preferred-topic"
+ val partition0 = 0
+ val partition1 = 1
+ val assignment0 = Seq(broker2, broker3)
+ val assignment1 = Seq(broker3, broker2)
+
+ cluster.waitForReadyBrokers()
+ createTopic(client, topic, Map(
+ partition0 -> assignment0,
+ partition1 -> assignment1
))
- assertTrue(e.getMessage.startsWith("Missing required option(s)"))
- assertTrue(e.getMessage.contains(" election-type"))
- }
- @Test
- def testMissingTopicPartitionSelection(): Unit = {
- val e = assertThrows(classOf[Throwable], () => LeaderElectionCommand.main(
- Array(
- "--bootstrap-server", bootstrapServers(servers),
- "--election-type", "preferred"
- )
- ))
- assertTrue(e.getMessage.startsWith("One and only one of the following options is required: "))
- assertTrue(e.getMessage.contains(" all-topic-partitions"))
- assertTrue(e.getMessage.contains(" topic"))
- assertTrue(e.getMessage.contains(" path-to-json-file"))
- }
+ val topicPartition0 = new TopicPartition(topic, partition0)
+ val topicPartition1 = new TopicPartition(topic, partition1)
- @Test
- def testInvalidBroker(): Unit = {
- val e = assertThrows(classOf[AdminCommandFailedException], () => LeaderElectionCommand.run(
- Array(
- "--bootstrap-server", "example.com:1234",
- "--election-type", "unclean",
- "--all-topic-partitions"
- ),
- 1.seconds
- ))
- assertTrue(e.getCause.isInstanceOf[TimeoutException])
- }
+ TestUtils.assertLeader(client, topicPartition0, broker2)
+ TestUtils.assertLeader(client, topicPartition1, broker3)
- @Test
- def testElectionResultOutput(): Unit = {
- TestUtils.resource(Admin.create(createConfig(servers).asJava)) { client =>
- val topic = "non-preferred-topic"
- val partition0 = 0
- val partition1 = 1
- val assignment0 = Seq(broker2, broker3)
- val assignment1 = Seq(broker3, broker2)
-
- TestUtils.createTopic(zkClient, topic, Map(partition0 -> assignment0, partition1 -> assignment1), servers)
-
- val topicPartition0 = new TopicPartition(topic, partition0)
- val topicPartition1 = new TopicPartition(topic, partition1)
-
- TestUtils.assertLeader(client, topicPartition0, broker2)
- TestUtils.assertLeader(client, topicPartition1, broker3)
-
- servers(broker2).shutdown()
- TestUtils.assertLeader(client, topicPartition0, broker3)
- servers(broker2).startup()
- TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
- TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
-
- val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1))
- val output = TestUtils.grabConsoleOutput(
- LeaderElectionCommand.main(
- Array(
- "--bootstrap-server", bootstrapServers(servers),
- "--election-type", "preferred",
- "--path-to-json-file", topicPartitionPath.toString
- )
+ cluster.shutdownBroker(broker2)
+ TestUtils.assertLeader(client, topicPartition0, broker3)
+ cluster.startBroker(broker2)
+ TestUtils.waitForBrokersInIsr(client, topicPartition0, Set(broker2))
+ TestUtils.waitForBrokersInIsr(client, topicPartition1, Set(broker2))
+
+ val topicPartitionPath = tempTopicPartitionFile(Set(topicPartition0, topicPartition1))
+ val output = TestUtils.grabConsoleOutput(
+ LeaderElectionCommand.main(
+ Array(
+ "--bootstrap-server", cluster.bootstrapServers(),
+ "--election-type", "preferred",
+ "--path-to-json-file", topicPartitionPath.toString
)
)
+ )
+
+ val electionResultOutputIter = output.split("\n").iterator
+
+ assertTrue(electionResultOutputIter.hasNext)
+ val firstLine = electionResultOutputIter.next()
+ assertTrue(firstLine.contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"),
+ s"Unexpected output: $firstLine")
- val electionResultOutputIter = output.split("\n").iterator
- assertTrue(electionResultOutputIter.hasNext)
- assertTrue(electionResultOutputIter.next().contains(s"Successfully completed leader election (PREFERRED) for partitions $topicPartition0"))
- assertTrue(electionResultOutputIter.hasNext)
- assertTrue(electionResultOutputIter.next().contains(s"Valid replica already elected for partitions $topicPartition1"))
- }
+ assertTrue(electionResultOutputIter.hasNext)
+ val secondLine = electionResultOutputIter.next()
+ assertTrue(secondLine.contains(s"Valid replica already elected for partitions $topicPartition1"),
+ s"Unexpected output: $secondLine")
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
index 768c06a..318e17e 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -19,43 +19,40 @@ package kafka.server
import java.net.InetAddress
import java.util
+import java.util.Collections.singletonList
import java.util.Properties
-import java.util.concurrent.CompletableFuture
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{CompletableFuture, ExecutionException}
+
import kafka.network.RequestChannel
import kafka.raft.RaftManager
import kafka.server.QuotaFactory.QuotaManagers
import kafka.test.MockController
-import kafka.utils.MockTime
+import kafka.utils.{MockTime, NotNothing}
import org.apache.kafka.clients.admin.AlterConfigOp
-import org.apache.kafka.common.Uuid
import org.apache.kafka.common.Uuid.ZERO_UUID
+import org.apache.kafka.common.acl.AclOperation
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.errors._
import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource, AlterConfigsResourceCollection => OldAlterConfigsResourceCollection, AlterableConfig => OldAlterableConfig, AlterableConfigCollection => OldAlterableConfigCollection}
+import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult
-import org.apache.kafka.common.message.CreateTopicsRequestData
-import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic
-import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
+import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicCollection}
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
import org.apache.kafka.common.message.DeleteTopicsRequestData.DeleteTopicState
import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult
-import org.apache.kafka.common.message._
import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.{AlterConfigsResource, AlterConfigsResourceCollection, AlterableConfig, AlterableConfigCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResourceCollection => OldAlterConfigsResourceCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterConfigsResource => OldAlterConfigsResource}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfigCollection => OldAlterableConfigCollection}
-import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig => OldAlterableConfig}
-import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
+import org.apache.kafka.common.message.{CreateTopicsRequestData, _}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
-import org.apache.kafka.common.protocol.ApiKeys
-import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.protocol.Errors._
+import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
import org.apache.kafka.common.requests._
+import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.common.{ElectionType, Uuid}
import org.apache.kafka.controller.Controller
import org.apache.kafka.server.authorizer.{Action, AuthorizableRequestContext, AuthorizationResult, Authorizer}
import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -65,7 +62,9 @@ import org.mockito.ArgumentMatchers._
import org.mockito.Mockito._
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
+import scala.reflect.ClassTag
class ControllerApisTest {
private val nodeId = 1
@@ -136,12 +135,11 @@ class ControllerApisTest {
def createDenyAllAuthorizer(): Authorizer = {
val authorizer = mock(classOf[Authorizer])
- mock(classOf[Authorizer])
when(authorizer.authorize(
any(classOf[AuthorizableRequestContext]),
any(classOf[java.util.List[Action]])
)).thenReturn(
- java.util.Collections.singletonList(AuthorizationResult.DENIED)
+ singletonList(AuthorizationResult.DENIED)
)
authorizer
}
@@ -732,6 +730,79 @@ class ControllerApisTest {
controllerApis.createPartitions(request, false, _ => Set("foo", "bar")).get().asScala.toSet)
}
+ @Test
+ def testElectLeadersAuthorization(): Unit = { // ElectLeaders must fail with CLUSTER_AUTHORIZATION_FAILED when ALTER on the cluster is denied
+ val authorizer = mock(classOf[Authorizer])
+ val controller = mock(classOf[Controller])
+ val controllerApis = createControllerApis(Some(authorizer), controller)
+
+ val request = new ElectLeadersRequest.Builder(
+ ElectionType.PREFERRED,
+ null, // null TopicPartitions requests election for all partitions
+ 30000 // NOTE(review): presumably the request timeout in milliseconds; confirm against ElectLeadersRequest.Builder
+ ).build()
+
+ val resource = new ResourcePattern(ResourceType.CLUSTER, Resource.CLUSTER_NAME, PatternType.LITERAL)
+ val actions = singletonList(new Action(AclOperation.ALTER, resource, 1, true, true))
+
+ when(authorizer.authorize( // stub matches only the exact CLUSTER/ALTER action list built above
+ any[RequestContext],
+ ArgumentMatchers.eq(actions)
+ )).thenReturn(singletonList(AuthorizationResult.DENIED))
+
+ val response = handleRequest[ElectLeadersResponse](request, controllerApis)
+ assertEquals(Errors.CLUSTER_AUTHORIZATION_FAILED, Errors.forCode(response.data.errorCode))
+ }
+
+ @Test
+ def testElectLeadersHandledByController(): Unit = { // the response produced by Controller.electLeaders must be what the client receives
+ val controller = mock(classOf[Controller])
+ val controllerApis = createControllerApis(None, controller) // None: no authorizer is supplied for this test
+
+ val request = new ElectLeadersRequest.Builder(
+ ElectionType.PREFERRED,
+ null, // null TopicPartitions requests election for all partitions
+ 30000 // NOTE(review): presumably the request timeout in milliseconds; confirm against ElectLeadersRequest.Builder
+ ).build()
+
+ val responseData = new ElectLeadersResponseData()
+ .setErrorCode(Errors.NOT_CONTROLLER.code) // sentinel error returned by the mocked controller and asserted below
+
+ when(controller.electLeaders(
+ request.data
+ )).thenReturn(CompletableFuture.completedFuture(responseData))
+
+ val response = handleRequest[ElectLeadersResponse](request, controllerApis)
+ assertEquals(Errors.NOT_CONTROLLER, Errors.forCode(response.data.errorCode))
+ }
+
+ private def handleRequest[T <: AbstractResponse]( // Runs `request` through controllerApis.handle and returns the response sent to requestChannel, typed as T.
+ request: AbstractRequest,
+ controllerApis: ControllerApis
+ )(
+ implicit classTag: ClassTag[T], // enables the runtime type test in `case response: T` below and names the expected class in the error
+ @nowarn("cat=unused") nn: NotNothing[T] // NOTE(review): presumably makes callers state T explicitly instead of inferring Nothing; confirm against kafka.utils.NotNothing
+ ): T = {
+ val req = buildRequest(request)
+
+ controllerApis.handle(req, RequestLocal.NoCaching)
+
+ val capturedResponse: ArgumentCaptor[AbstractResponse] =
+ ArgumentCaptor.forClass(classOf[AbstractResponse])
+ verify(requestChannel).sendResponse( // verifies a response was sent for this request and captures it
+ ArgumentMatchers.eq(req),
+ capturedResponse.capture(),
+ ArgumentMatchers.eq(None)
+ )
+
+ capturedResponse.getValue match {
+ case response: T => response
+ case response => // any other response class is reported with both the expected and the actual type
+ throw new ClassCastException(s"Expected response with type ${classTag.runtimeClass}, " +
+ s"but found ${response.getClass}")
+ }
+ }
+
@AfterEach
def tearDown(): Unit = {
quotas.shutdown()
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 1ddd63a..8528511 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -23,6 +23,7 @@ import java.util
import java.util.Arrays.asList
import java.util.concurrent.TimeUnit
import java.util.{Collections, Optional, Properties, Random}
+
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, Partition}
import kafka.controller.{ControllerContext, KafkaController}
@@ -68,7 +69,7 @@ import org.apache.kafka.common.requests.{FetchMetadata => JFetchMetadata, _}
import org.apache.kafka.common.resource.{PatternType, Resource, ResourcePattern, ResourceType}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, KafkaPrincipalSerde, SecurityProtocol}
import org.apache.kafka.common.utils.{ProducerIdAndEpoch, SecurityUtils, Utils}
-import org.apache.kafka.common.{IsolationLevel, Node, TopicPartition, Uuid}
+import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicPartition, Uuid}
import org.apache.kafka.server.authorizer.{Action, AuthorizationResult, Authorizer}
import org.easymock.EasyMock._
import org.easymock.{Capture, EasyMock, IAnswer}
@@ -469,6 +470,12 @@ class KafkaApisTest {
}
@Test
+ def testElectLeadersForwarding(): Unit = { // ELECT_LEADERS must go through the forwarding path when KafkaApis runs with KRaft support
+ val requestBuilder = new ElectLeadersRequest.Builder(ElectionType.PREFERRED, null, 30000) // null TopicPartitions requests election for all partitions
+ testKraftForwarding(ApiKeys.ELECT_LEADERS, requestBuilder)
+ }
+
+ @Test
def testDescribeQuorumNotAllowedForZkClusters(): Unit = {
val requestData = DescribeQuorumRequest.singletonRequest(KafkaRaftServer.MetadataPartition)
val requestBuilder = new DescribeQuorumRequest.Builder(requestData)
@@ -495,6 +502,18 @@ class KafkaApisTest {
)
}
+ private def testKraftForwarding( // Runs the shared forwardable-API check against a KafkaApis built with raftSupport = true.
+ apiKey: ApiKeys,
+ requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]
+ ): Unit = {
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId) // replace the test's metadataCache with a KRaft one before building KafkaApis
+ testForwardableApi(
+ createKafkaApis(enableForwarding = true, raftSupport = true),
+ apiKey,
+ requestBuilder
+ )
+ }
+
private def testForwardableApi(apiKey: ApiKeys, requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): Unit = {
testForwardableApi(
createKafkaApis(enableForwarding = true),
@@ -511,7 +530,8 @@ class KafkaApisTest {
val topicHeader = new RequestHeader(apiKey, apiKey.latestVersion,
clientId, 0)
- val request = buildRequest(requestBuilder.build(topicHeader.apiVersion))
+ val apiRequest = requestBuilder.build(topicHeader.apiVersion)
+ val request = buildRequest(apiRequest)
if (kafkaApis.metadataSupport.isInstanceOf[ZkSupport]) {
// The controller check only makes sense for ZK clusters. For KRaft,
@@ -520,18 +540,28 @@ class KafkaApisTest {
EasyMock.expect(controller.isActive).andReturn(false)
}
- expectNoThrottling(request)
+ val capturedResponse = expectNoThrottling(request)
+ val forwardCallback: Capture[Option[AbstractResponse] => Unit] = EasyMock.newCapture()
EasyMock.expect(forwardingManager.forwardRequest(
EasyMock.eq(request),
- anyObject[Option[AbstractResponse] => Unit]()
+ EasyMock.capture(forwardCallback)
)).once()
EasyMock.replay(replicaManager, clientRequestQuotaManager, requestChannel, controller, forwardingManager)
kafkaApis.handle(request, RequestLocal.withThreadConfinedCaching)
+ assertNotNull(request.buffer, "The buffer was unexpectedly deallocated after " +
+ s"`handle` returned (is $apiKey marked as forwardable in `ApiKeys`?)")
+
+ val expectedResponse = apiRequest.getErrorResponse(Errors.NOT_CONTROLLER.exception)
+ assertTrue(forwardCallback.hasCaptured)
+ forwardCallback.getValue.apply(Some(expectedResponse))
- EasyMock.verify(controller, forwardingManager)
+ assertTrue(capturedResponse.hasCaptured)
+ assertEquals(expectedResponse, capturedResponse.getValue)
+
+ EasyMock.verify(controller, requestChannel, forwardingManager)
}
private def authorizeResource(authorizer: Authorizer,
@@ -3980,13 +4010,13 @@ class KafkaApisTest {
request
}
- private def verifyShouldNeverHandle(handler: RequestChannel.Request => Unit): Unit = {
+ private def verifyShouldNeverHandleErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
val request = createMockRequest()
val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
assertEquals(KafkaApis.shouldNeverReceive(request).getMessage, e.getMessage)
}
- private def verifyShouldAlwaysForward(handler: RequestChannel.Request => Unit): Unit = {
+ private def verifyShouldAlwaysForwardErrorMessage(handler: RequestChannel.Request => Unit): Unit = {
val request = createMockRequest()
val e = assertThrows(classOf[UnsupportedVersionException], () => handler(request))
assertEquals(KafkaApis.shouldAlwaysForward(request).getMessage, e.getMessage)
@@ -3995,126 +4025,132 @@ class KafkaApisTest {
@Test
def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
+ verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
}
@Test
def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
+ verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
}
@Test
def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
+ verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest(_, RequestLocal.withThreadConfinedCaching))
}
@Test
def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
+ verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
}
@Test
def testRaftShouldNeverHandleAlterIsrRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
+ verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
}
@Test
def testRaftShouldNeverHandleEnvelope(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
+ verifyShouldNeverHandleErrorMessage(createKafkaApis(raftSupport = true).handleEnvelope(_, RequestLocal.withThreadConfinedCaching))
}
@Test
def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
}
@Test
def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateAcls)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateAcls)
}
@Test
def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteAcls)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleDeleteAcls)
}
@Test
def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
}
@Test
def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
}
@Test
def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
}
@Test
def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
}
@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleUpdateFeatures)
+ }
+
+ @Test
+ def testRaftShouldAlwaysForwardElectLeaders(): Unit = {
+ metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleElectLeaders)
}
@Test
def testRaftShouldAlwaysForwardListPartitionReassignments(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
- verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
+ verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleListPartitionReassignmentsRequest)
}
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 64003b7..06c7569 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -17,6 +17,7 @@
package kafka.utils
import java.io._
+import java.net.InetAddress
import java.nio._
import java.nio.channels._
import java.nio.charset.{Charset, StandardCharsets}
@@ -26,12 +27,12 @@ import java.time.Duration
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import java.util.concurrent.{Callable, ExecutionException, Executors, TimeUnit}
import java.util.{Arrays, Collections, Optional, Properties}
-import com.yammer.metrics.core.Meter
+import com.yammer.metrics.core.Meter
import javax.net.ssl.X509TrustManager
import kafka.api._
import kafka.cluster.{Broker, EndPoint, IsrChangeListener}
-import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.controller.{ControllerEventManager, LeaderIsrAndControllerEpoch}
import kafka.log._
import kafka.metrics.KafkaYammerMetrics
import kafka.network.RequestChannel
@@ -44,6 +45,7 @@ import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.clients.admin.AlterConfigOp.OpType
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter}
import org.apache.kafka.common.config.ConfigResource
@@ -53,9 +55,9 @@ import org.apache.kafka.common.header.Header
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ListenerName, Mode}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.{AbstractRequest, EnvelopeRequest, RequestContext, RequestHeader}
@@ -73,14 +75,13 @@ import org.apache.zookeeper.data.ACL
import org.junit.jupiter.api.Assertions._
import org.mockito.Mockito
-import java.net.InetAddress
-
import scala.annotation.nowarn
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.{Map, Seq, mutable}
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.jdk.CollectionConverters._
+import scala.util.{Failure, Success, Try}
/**
* Utility functions to help with testing
@@ -1625,19 +1626,37 @@ object TestUtils extends Logging {
waitForLeaderToBecome(client, topicPartition, None)
}
- def waitForLeaderToBecome(client: Admin, topicPartition: TopicPartition, leader: Option[Int]): Unit = {
+ def waitForOnlineBroker(client: Admin, brokerId: Int): Unit = {
+ waitUntilTrue(() => {
+ val nodes = client.describeCluster().nodes().get()
+ nodes.asScala.exists(_.id == brokerId)
+ }, s"Timed out waiting for brokerId $brokerId to come online")
+ }
+
+ def waitForLeaderToBecome(
+ client: Admin,
+ topicPartition: TopicPartition,
+ expectedLeaderOpt: Option[Int]
+ ): Unit = {
val topic = topicPartition.topic
- val partition = topicPartition.partition
+ val partitionId = topicPartition.partition
+
+ def currentLeader: Try[Option[Int]] = Try {
+ val topicDescription = client.describeTopics(List(topic).asJava).allTopicNames.get.get(topic)
+ topicDescription.partitions.asScala
+ .find(_.partition == partitionId)
+ .flatMap(partitionState => Option(partitionState.leader))
+ .map(_.id)
+ }
- waitUntilTrue(() => {
- try {
- val topicResult = client.describeTopics(Arrays.asList(topic)).allTopicNames.get.get(topic)
- val partitionResult = topicResult.partitions.get(partition)
- Option(partitionResult.leader).map(_.id) == leader
- } catch {
- case e: ExecutionException if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
- }
- }, "Timed out waiting for leader metadata")
+ val (lastLeaderCheck, isLeaderElected) = computeUntilTrue(currentLeader) {
+ case Success(leaderOpt) => leaderOpt == expectedLeaderOpt
+ case Failure(e: ExecutionException) if e.getCause.isInstanceOf[UnknownTopicOrPartitionException] => false
+ case Failure(e) => throw e
+ }
+
+ assertTrue(isLeaderElected, s"Timed out waiting for leader to become $expectedLeaderOpt. " +
+ s"Last metadata lookup returned leader = ${lastLeaderCheck.getOrElse("unknown")}")
}
def waitForBrokersOutOfIsr(client: Admin, partition: Set[TopicPartition], brokerIds: Set[Int]): Unit = {
@@ -1652,7 +1671,7 @@ object TestUtils extends Logging {
brokerIds.intersect(isr).isEmpty
},
- s"Expected brokers $brokerIds to no longer in the ISR for $partition"
+ s"Expected brokers $brokerIds to no longer be in the ISR for $partition"
)
}
@@ -1917,4 +1936,28 @@ object TestUtils extends Logging {
)
}
+ def verifyNoUnexpectedThreads(context: String): Unit = {
+ // Threads which may cause transient failures in subsequent tests if not shutdown.
+ // These include threads which make connections to brokers and may cause issues
+ // when broker ports are reused (e.g. auto-create topics) as well as threads
+ // which reset static JAAS configuration.
+ val unexpectedThreadNames = Set(
+ ControllerEventManager.ControllerEventThreadName,
+ KafkaProducer.NETWORK_THREAD_PREFIX,
+ AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
+ AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
+ ZooKeeperTestHarness.ZkClientEventThreadSuffix
+ )
+
+ def unexpectedThreads: Set[String] = {
+ val allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
+ allThreads.filter(t => unexpectedThreadNames.exists(s => t.contains(s))).toSet
+ }
+
+ val (unexpected, _) = TestUtils.computeUntilTrue(unexpectedThreads)(_.isEmpty)
+ assertTrue(unexpected.isEmpty,
+ s"Found ${unexpected.size} unexpected threads during $context: " +
+ s"${unexpected.mkString("`", ",", "`")}")
+ }
+
}
diff --git a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
index 8b61c0e..a81cbb9 100755
--- a/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/zk/ZooKeeperTestHarness.scala
@@ -19,19 +19,12 @@ package kafka.zk
import javax.security.auth.login.Configuration
import kafka.utils.{CoreUtils, Logging, TestUtils}
-import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
-import org.junit.jupiter.api.Assertions._
import org.apache.kafka.common.security.JaasUtils
-
-import scala.collection.Set
-import scala.jdk.CollectionConverters._
-import org.apache.kafka.clients.producer.KafkaProducer
-import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
-import kafka.controller.ControllerEventManager
-import org.apache.kafka.clients.admin.AdminClientUnitTestEnv
import org.apache.kafka.common.utils.Time
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterAll, AfterEach, BeforeAll, BeforeEach, Tag}
@Tag("integration")
abstract class ZooKeeperTestHarness extends Logging {
@@ -87,16 +80,6 @@ abstract class ZooKeeperTestHarness extends Logging {
object ZooKeeperTestHarness {
val ZkClientEventThreadSuffix = "-EventThread"
- // Threads which may cause transient failures in subsequent tests if not shutdown.
- // These include threads which make connections to brokers and may cause issues
- // when broker ports are reused (e.g. auto-create topics) as well as threads
- // which reset static JAAS configuration.
- val unexpectedThreadNames = Set(ControllerEventManager.ControllerEventThreadName,
- KafkaProducer.NETWORK_THREAD_PREFIX,
- AdminClientUnitTestEnv.kafkaAdminClientNetworkThreadPrefix(),
- AbstractCoordinator.HEARTBEAT_THREAD_PREFIX,
- ZkClientEventThreadSuffix)
-
/**
* Verify that a previous test that doesn't use ZooKeeperTestHarness hasn't left behind an unexpected thread.
* This assumes that brokers, ZooKeeper clients, producers and consumers are not created in another @BeforeClass,
@@ -104,7 +87,7 @@ object ZooKeeperTestHarness {
*/
@BeforeAll
def setUpClass(): Unit = {
- verifyNoUnexpectedThreads("@BeforeClass")
+ TestUtils.verifyNoUnexpectedThreads("@BeforeAll")
}
/**
@@ -112,19 +95,7 @@ object ZooKeeperTestHarness {
*/
@AfterAll
def tearDownClass(): Unit = {
- verifyNoUnexpectedThreads("@AfterClass")
+ TestUtils.verifyNoUnexpectedThreads("@AfterAll")
}
- /**
- * Verifies that threads which are known to cause transient failures in subsequent tests
- * have been shutdown.
- */
- def verifyNoUnexpectedThreads(context: String): Unit = {
- def allThreads = Thread.getAllStackTraces.keySet.asScala.map(thread => thread.getName)
- val (threads, noUnexpected) = TestUtils.computeUntilTrue(allThreads) { threads =>
- threads.forall(t => unexpectedThreadNames.forall(s => !t.contains(s)))
- }
- assertTrue(noUnexpected, s"Found unexpected threads during $context, allThreads=$threads, " +
- s"unexpected=${threads.filterNot(t => unexpectedThreadNames.forall(s => !t.contains(s)))}")
- }
}
diff --git a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
index a0eb1ea..5cfb9ee 100644
--- a/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zookeeper/ZooKeeperClientTest.scala
@@ -25,6 +25,7 @@ import scala.collection.Seq
import com.yammer.metrics.core.{Gauge, Meter, MetricName}
import kafka.server.KafkaConfig
import kafka.metrics.KafkaYammerMetrics
+import kafka.utils.TestUtils
import kafka.zk.ZooKeeperTestHarness
import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.utils.Time
@@ -33,7 +34,7 @@ import org.apache.zookeeper.Watcher.Event.{EventType, KeeperState}
import org.apache.zookeeper.ZooKeeper.States
import org.apache.zookeeper.client.ZKClientConfig
import org.apache.zookeeper.{CreateMode, WatchedEvent, ZooDefs}
-import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertTrue, assertThrows, fail}
+import org.junit.jupiter.api.Assertions.{assertArrayEquals, assertEquals, assertFalse, assertThrows, assertTrue, fail}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import scala.jdk.CollectionConverters._
@@ -46,7 +47,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
@BeforeEach
override def setUp(): Unit = {
- ZooKeeperTestHarness.verifyNoUnexpectedThreads("@BeforeEach")
+ TestUtils.verifyNoUnexpectedThreads("@BeforeEach")
cleanMetricsRegistry()
super.setUp()
zooKeeperClient = newZooKeeperClient()
@@ -58,7 +59,7 @@ class ZooKeeperClientTest extends ZooKeeperTestHarness {
zooKeeperClient.close()
super.tearDown()
System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
- ZooKeeperTestHarness.verifyNoUnexpectedThreads("@AfterEach")
+ TestUtils.verifyNoUnexpectedThreads("@AfterEach")
}
@Test
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index c04a8fb..5a63094 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -17,8 +17,6 @@
package org.apache.kafka.controller;
-import java.util.Set;
-import java.util.stream.Collectors;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
@@ -35,11 +33,11 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.BrokerRegistrationReply;
import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.VersionRange;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
@@ -52,7 +50,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
index 5092ba1..cf0f6bf 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java
@@ -22,6 +22,8 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
@@ -38,6 +40,8 @@ import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
* PartitionChangeBuilder handles changing partition registrations.
*/
public class PartitionChangeBuilder {
+ private static final Logger log = LoggerFactory.getLogger(PartitionChangeBuilder.class);
+
public static boolean changeRecordIsNoOp(PartitionChangeRecord record) {
if (record.isr() != null) return false;
if (record.leader() != NO_LEADER_CHANGE) return false;
@@ -141,12 +145,15 @@ public class PartitionChangeBuilder {
private void tryElection(PartitionChangeRecord record) {
BestLeader bestLeader = new BestLeader();
if (bestLeader.node != partition.leader) {
+ log.debug("Setting new leader for topicId {}, partition {} to {}", topicId, partitionId, bestLeader.node);
record.setLeader(bestLeader.node);
if (bestLeader.unclean) {
// If the election was unclean, we have to forcibly set the ISR to just the
// new leader. This can result in data loss!
record.setIsr(Collections.singletonList(bestLeader.node));
}
+ } else {
+ log.debug("Failed to find a new leader with current state: {}", this);
}
}
@@ -240,4 +247,20 @@ public class PartitionChangeBuilder {
PARTITION_CHANGE_RECORD.highestSupportedVersion()));
}
}
+
+ @Override
+ public String toString() {
+ return "PartitionChangeBuilder(" +
+ "partition=" + partition +
+ ", topicId=" + topicId +
+ ", partitionId=" + partitionId +
+ ", isAcceptableLeader=" + isAcceptableLeader +
+ ", uncleanElectionOk=" + uncleanElectionOk +
+ ", targetIsr=" + targetIsr +
+ ", targetReplicas=" + targetReplicas +
+ ", targetRemoving=" + targetRemoving +
+ ", targetAdding=" + targetAdding +
+ ", alwaysElectPreferredIfPossible=" + alwaysElectPreferredIfPossible +
+ ')';
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 3066f4b..4450e25 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -132,6 +132,14 @@ public class ReplicationControlManager {
this.id = id;
this.parts = new TimelineHashMap<>(snapshotRegistry, 0);
}
+
+ public String name() {
+ return name;
+ }
+
+ public Uuid topicId() {
+ return id;
+ }
}
private final SnapshotRegistry snapshotRegistry;
@@ -587,6 +595,11 @@ public class ReplicationControlManager {
}
// VisibleForTesting
+ TopicControlInfo getTopic(Uuid topicId) {
+ return topics.get(topicId);
+ }
+
+ // VisibleForTesting
BrokersToIsrs brokersToIsrs() {
return brokersToIsrs;
}
@@ -787,7 +800,7 @@ public class ReplicationControlManager {
}
ControllerResult<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request) {
- boolean uncleanOk = electionTypeIsUnclean(request.electionType());
+ ElectionType electionType = electionType(request.electionType());
List<ApiMessageAndVersion> records = new ArrayList<>();
ElectLeadersResponseData response = new ElectLeadersResponseData();
if (request.topicPartitions() == null) {
@@ -804,11 +817,16 @@ public class ReplicationControlManager {
TopicControlInfo topic = topics.get(topicEntry.getValue());
if (topic != null) {
for (int partitionId : topic.parts.keySet()) {
- ApiError error = electLeader(topicName, partitionId, uncleanOk, records);
- topicResults.partitionResult().add(new PartitionResult().
- setPartitionId(partitionId).
- setErrorCode(error.error().code()).
- setErrorMessage(error.message()));
+ ApiError error = electLeader(topicName, partitionId, electionType, records);
+
+ // When electing leaders for all partitions, we do not return
+ // partitions which already have the desired leader.
+ if (error.error() != Errors.ELECTION_NOT_NEEDED) {
+ topicResults.partitionResult().add(new PartitionResult().
+ setPartitionId(partitionId).
+ setErrorCode(error.error().code()).
+ setErrorMessage(error.message()));
+ }
}
}
}
@@ -818,7 +836,7 @@ public class ReplicationControlManager {
new ReplicaElectionResult().setTopic(topic.topic());
response.replicaElectionResults().add(topicResults);
for (int partitionId : topic.partitions()) {
- ApiError error = electLeader(topic.topic(), partitionId, uncleanOk, records);
+ ApiError error = electLeader(topic.topic(), partitionId, electionType, records);
topicResults.partitionResult().add(new PartitionResult().
setPartitionId(partitionId).
setErrorCode(error.error().code()).
@@ -829,17 +847,15 @@ public class ReplicationControlManager {
return ControllerResult.of(records, response);
}
- static boolean electionTypeIsUnclean(byte electionType) {
- ElectionType type;
+ private static ElectionType electionType(byte electionType) {
try {
- type = ElectionType.valueOf(electionType);
+ return ElectionType.valueOf(electionType);
} catch (IllegalArgumentException e) {
throw new InvalidRequestException("Unknown election type " + (int) electionType);
}
- return type == ElectionType.UNCLEAN;
}
- ApiError electLeader(String topic, int partitionId, boolean uncleanOk,
+ ApiError electLeader(String topic, int partitionId, ElectionType electionType,
List<ApiMessageAndVersion> records) {
Uuid topicId = topicsByName.get(topic);
if (topicId == null) {
@@ -856,21 +872,24 @@ public class ReplicationControlManager {
return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
"No such partition as " + topic + "-" + partitionId);
}
+ if ((electionType == ElectionType.PREFERRED && partition.hasPreferredLeader())
+ || (electionType == ElectionType.UNCLEAN && partition.hasLeader())) {
+ return new ApiError(Errors.ELECTION_NOT_NEEDED);
+ }
+
PartitionChangeBuilder builder = new PartitionChangeBuilder(partition,
topicId,
partitionId,
r -> clusterControl.unfenced(r),
- () -> uncleanOk || configurationControl.uncleanLeaderElectionEnabledForTopic(topic));
- builder.setAlwaysElectPreferredIfPossible(true);
+ () -> electionType == ElectionType.UNCLEAN);
+
+ builder.setAlwaysElectPreferredIfPossible(electionType == ElectionType.PREFERRED);
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
- if (partition.leader == NO_LEADER) {
- // If we can't find any leader for the partition, return an error.
- return new ApiError(Errors.LEADER_NOT_AVAILABLE,
- "Unable to find any leader for the partition.");
+ if (electionType == ElectionType.PREFERRED) {
+ return new ApiError(Errors.PREFERRED_LEADER_NOT_AVAILABLE);
} else {
- // There is nothing to do.
- return ApiError.NONE;
+ return new ApiError(Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE);
}
}
records.add(record.get());
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
index e159732..4b574b3 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
@@ -81,4 +81,11 @@ public final class ClientQuotasDelta {
}
return new ClientQuotasImage(newEntities);
}
+
+ @Override
+ public String toString() {
+ return "ClientQuotasDelta(" +
+ "changes=" + changes +
+ ')';
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
index 90f61d7..6c48b8e 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
@@ -126,4 +126,11 @@ public final class ClusterDelta {
}
return new ClusterImage(newBrokers);
}
+
+ @Override
+ public String toString() {
+ return "ClusterDelta(" +
+ "changedBrokers=" + changedBrokers +
+ ')';
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
index 4b3aa0b..677f764 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationDelta.java
@@ -74,4 +74,13 @@ public final class ConfigurationDelta {
}
return new ConfigurationImage(newData);
}
+
+ @Override
+ public String toString() {
+ // Values are intentionally left out of this so that sensitive configs
+ // do not end up in logging by mistake.
+ return "ConfigurationDelta(" +
+ "changedKeys=" + changes.keySet() +
+ ')';
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
index df3b0ff..d0f5848 100644
--- a/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
@@ -96,4 +96,11 @@ public final class ConfigurationsDelta {
}
return new ConfigurationsImage(newData);
}
+
+ @Override
+ public String toString() {
+ return "ConfigurationsDelta(" +
+ "changes=" + changes +
+ ')';
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
index 940697e..781c496 100644
--- a/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/FeaturesDelta.java
@@ -83,4 +83,11 @@ public final class FeaturesDelta {
}
return new FeaturesImage(newFinalizedVersions);
}
+
+ @Override
+ public String toString() {
+ return "FeaturesDelta(" +
+ "changes=" + changes +
+ ')';
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 6be0dd6..4b9451b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -256,4 +256,15 @@ public final class MetadataDelta {
return new MetadataImage(newFeatures, newCluster, newTopics, newConfigs,
newClientQuotas);
}
+
+ @Override
+ public String toString() {
+ return "MetadataDelta(" +
+ "featuresDelta=" + featuresDelta +
+ ", clusterDelta=" + clusterDelta +
+ ", topicsDelta=" + topicsDelta +
+ ", configsDelta=" + configsDelta +
+ ", clientQuotasDelta=" + clientQuotasDelta +
+ ')';
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
index 88fef30..3214a0c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicDelta.java
@@ -138,4 +138,11 @@ public final class TopicDelta {
return new LocalReplicaChanges(deletes, leaders, followers);
}
+
+ @Override
+ public String toString() {
+ return "TopicDelta(" +
+ "partitionChanges=" + partitionChanges +
+ ')';
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
index a146bba..f9d8087 100644
--- a/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/TopicsDelta.java
@@ -201,4 +201,12 @@ public final class TopicsDelta {
return new LocalReplicaChanges(deletes, leaders, followers);
}
+
+ @Override
+ public String toString() {
+ return "TopicsDelta(" +
+ "changedTopics=" + changedTopics +
+ ", deletedTopicIds=" + deletedTopicIds +
+ ')';
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
index 1b013f4..f935a80 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/PartitionChangeBuilderTest.java
@@ -107,6 +107,14 @@ public class PartitionChangeBuilderTest {
shouldTryElection());
assertFalse(createFooBuilder(false).setTargetIsr(Arrays.asList(2, 1)).
shouldTryElection());
+
+ assertTrue(createFooBuilder(true)
+ .setTargetIsr(Arrays.asList(3))
+ .shouldTryElection());
+ assertTrue(createFooBuilder(true)
+ .setTargetIsr(Arrays.asList(4))
+ .setTargetReplicas(Arrays.asList(2, 1, 3, 4))
+ .shouldTryElection());
}
private static void testTriggerLeaderEpochBumpIfNeededLeader(PartitionChangeBuilder builder,
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 09449d3..78b0995 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -18,6 +18,7 @@
package org.apache.kafka.controller;
import java.util.Optional;
+import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigResource;
@@ -37,12 +38,12 @@ import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsAssignment;
import org.apache.kafka.common.message.CreatePartitionsRequestData.CreatePartitionsTopic;
import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartitionsTopicResult;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableReplicaAssignment;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
-import org.apache.kafka.common.message.CreateTopicsRequestData;
-import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.ElectLeadersRequestData;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitions;
import org.apache.kafka.common.message.ElectLeadersRequestData.TopicPartitionsCollection;
@@ -50,9 +51,10 @@ import org.apache.kafka.common.message.ElectLeadersResponseData;
import org.apache.kafka.common.message.ElectLeadersResponseData.PartitionResult;
import org.apache.kafka.common.message.ElectLeadersResponseData.ReplicaElectionResult;
import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData.ListPartitionReassignmentsTopics;
+import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingPartitionReassignment;
import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.OngoingTopicReassignment;
-import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData;
+import org.apache.kafka.common.metadata.ConfigRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -63,20 +65,21 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.metadata.BrokerHeartbeatReply;
-import org.apache.kafka.metadata.BrokerRegistration;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -84,18 +87,26 @@ import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.protocol.Errors.ELECTION_NOT_NEEDED;
+import static org.apache.kafka.common.protocol.Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.FENCED_LEADER_EPOCH;
import static org.apache.kafka.common.protocol.Errors.INVALID_PARTITIONS;
import static org.apache.kafka.common.protocol.Errors.INVALID_REPLICA_ASSIGNMENT;
import static org.apache.kafka.common.protocol.Errors.INVALID_TOPIC_EXCEPTION;
import static org.apache.kafka.common.protocol.Errors.NONE;
import static org.apache.kafka.common.protocol.Errors.NO_REASSIGNMENT_IN_PROGRESS;
+import static org.apache.kafka.common.protocol.Errors.PREFERRED_LEADER_NOT_AVAILABLE;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_ID;
import static org.apache.kafka.common.protocol.Errors.UNKNOWN_TOPIC_OR_PARTITION;
import static org.apache.kafka.controller.BrokersToIsrs.TopicIdPartition;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -117,7 +128,7 @@ public class ReplicationControlManagerTest {
final MockTime time = new MockTime();
final MockRandom random = new MockRandom();
final ClusterControlManager clusterControl = new ClusterControlManager(
- logContext, time, snapshotRegistry, BROKER_SESSION_TIMEOUT_MS,
+ logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
new StripedReplicaPlacer(random));
final ControllerMetrics metrics = new MockControllerMetrics();
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
@@ -200,7 +211,45 @@ public class ReplicationControlManagerTest {
}
}
- void unfenceBrokers(Integer... brokerIds) throws Exception {
+ void alterIsr( // Test helper: sends an AlterIsr request as broker `leaderId` and replays the resulting records.
+ TopicIdPartition topicIdPartition, // partition whose ISR is to be changed
+ int leaderId, // broker that sends the request; asserted below to be the current leader
+ List<Integer> isr // the new ISR to propose
+ ) throws Exception {
+ BrokerRegistration registration = clusterControl.brokerRegistrations().get(leaderId);
+ assertFalse(registration.fenced()); // precondition: the sending broker must not be fenced
+
+ PartitionRegistration partition = replicationControl.getPartition(
+ topicIdPartition.topicId(),
+ topicIdPartition.partitionId()
+ );
+ assertNotNull(partition); // precondition: the partition must exist
+ assertEquals(leaderId, partition.leader); // precondition: leaderId is the partition's current leader
+
+ PartitionData partitionData = new PartitionData()
+ .setPartitionIndex(topicIdPartition.partitionId())
+ .setCurrentIsrVersion(partition.partitionEpoch) // ISR version is taken from the current partition epoch
+ .setLeaderEpoch(partition.leaderEpoch) // leader epoch is taken from the current registration
+ .setNewIsr(isr);
+
+ String topicName = replicationControl.getTopic(topicIdPartition.topicId()).name(); // the request addresses the topic by name
+ TopicData topicData = new TopicData()
+ .setName(topicName)
+ .setPartitions(singletonList(partitionData));
+
+ ControllerResult<AlterIsrResponseData> alterIsr = replicationControl.alterIsr(
+ new AlterIsrRequestData()
+ .setBrokerId(leaderId)
+ .setBrokerEpoch(registration.epoch()) // broker epoch is taken from the broker's current registration
+ .setTopics(singletonList(topicData)));
+ replay(alterIsr.records()); // NOTE(review): only the records are applied; the response's error codes are not checked here
+ }
+
+ void unfenceBrokers(Integer... brokerIds) throws Exception { // Varargs convenience overload.
+ unfenceBrokers(Utils.mkSet(brokerIds)); // delegates to the Set<Integer> overload
+ }
+
+ void unfenceBrokers(Set<Integer> brokerIds) throws Exception {
for (int brokerId : brokerIds) {
ControllerResult<BrokerHeartbeatReply> result = replicationControl.
processBrokerHeartbeat(new BrokerHeartbeatRequestData().
@@ -213,6 +262,19 @@ public class ReplicationControlManagerTest {
}
}
+ void alterTopicConfig( // Test helper: sets one topic-level config by replaying a ConfigRecord directly.
+ String topic, // used as the record's resource name
+ String configKey, // config name
+ String configValue // config value
+ ) throws Exception {
+ ConfigRecord configRecord = new ConfigRecord()
+ .setResourceType(ConfigResource.Type.TOPIC.id()) // the record targets a TOPIC resource
+ .setResourceName(topic)
+ .setName(configKey)
+ .setValue(configValue);
+ replay(singletonList(new ApiMessageAndVersion(configRecord, (short) 0))); // the record is wrapped at version 0 and passed straight to replay
+ }
+
void fenceBrokers(Set<Integer> brokerIds) throws Exception {
time.sleep(BROKER_SESSION_TIMEOUT_MS);
@@ -284,10 +346,10 @@ public class ReplicationControlManagerTest {
setErrorMessage(Errors.TOPIC_ALREADY_EXISTS.exception().getMessage()));
assertEquals(expectedResponse3, result3.response());
Uuid fooId = result2.response().topics().find("foo").topicId();
- RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
- Arrays.asList(new ApiMessageAndVersion(new PartitionRecord().
+ RecordTestUtils.assertBatchIteratorContains(asList(
+ asList(new ApiMessageAndVersion(new PartitionRecord().
setPartitionId(0).setTopicId(fooId).
- setReplicas(Arrays.asList(1, 2, 0)).setIsr(Arrays.asList(1, 2, 0)).
+ setReplicas(asList(1, 2, 0)).setIsr(asList(1, 2, 0)).
setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).
setLeaderEpoch(0).setPartitionEpoch(0), (short) 0),
new ApiMessageAndVersion(new TopicRecord().
@@ -450,7 +512,7 @@ public class ReplicationControlManagerTest {
assertEquals(OptionalInt.of(0), ctx.currentLeader(topicIdPartition));
long brokerEpoch = ctx.currentBrokerEpoch(0);
PartitionData shrinkIsrRequest = newAlterIsrPartition(
- replicationControl, topicIdPartition, Arrays.asList(0, 1));
+ replicationControl, topicIdPartition, asList(0, 1));
ControllerResult<AlterIsrResponseData> shrinkIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", shrinkIsrRequest);
AlterIsrResponseData.PartitionData shrinkIsrResponse = assertAlterIsrResponse(
@@ -458,7 +520,7 @@ public class ReplicationControlManagerTest {
assertConsistentAlterIsrResponse(replicationControl, topicIdPartition, shrinkIsrResponse);
PartitionData expandIsrRequest = newAlterIsrPartition(
- replicationControl, topicIdPartition, Arrays.asList(0, 1, 2));
+ replicationControl, topicIdPartition, asList(0, 1, 2));
ControllerResult<AlterIsrResponseData> expandIsrResult = sendAlterIsr(
replicationControl, 0, brokerEpoch, "foo", expandIsrRequest);
AlterIsrResponseData.PartitionData expandIsrResponse = assertAlterIsrResponse(
@@ -482,7 +544,7 @@ public class ReplicationControlManagerTest {
// Invalid leader
PartitionData invalidLeaderRequest = newAlterIsrPartition(
- replicationControl, topicIdPartition, Arrays.asList(0, 1));
+ replicationControl, topicIdPartition, asList(0, 1));
ControllerResult<AlterIsrResponseData> invalidLeaderResult = sendAlterIsr(
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidLeaderRequest);
@@ -490,13 +552,13 @@ public class ReplicationControlManagerTest {
// Stale broker epoch
PartitionData invalidBrokerEpochRequest = newAlterIsrPartition(
- replicationControl, topicIdPartition, Arrays.asList(0, 1));
+ replicationControl, topicIdPartition, asList(0, 1));
assertThrows(StaleBrokerEpochException.class, () -> sendAlterIsr(
replicationControl, 0, brokerEpoch - 1, "foo", invalidBrokerEpochRequest));
// Invalid leader epoch
PartitionData invalidLeaderEpochRequest = newAlterIsrPartition(
- replicationControl, topicIdPartition, Arrays.asList(0, 1));
+ replicationControl, topicIdPartition, asList(0, 1));
invalidLeaderEpochRequest.setLeaderEpoch(500);
ControllerResult<AlterIsrResponseData> invalidLeaderEpochResult = sendAlterIsr(
replicationControl, 1, ctx.currentBrokerEpoch(1),
@@ -505,8 +567,8 @@ public class ReplicationControlManagerTest {
// Invalid ISR (3 is not a valid replica)
PartitionData invalidIsrRequest1 = newAlterIsrPartition(
- replicationControl, topicIdPartition, Arrays.asList(0, 1));
- invalidIsrRequest1.setNewIsr(Arrays.asList(0, 1, 3));
+ replicationControl, topicIdPartition, asList(0, 1));
+ invalidIsrRequest1.setNewIsr(asList(0, 1, 3));
ControllerResult<AlterIsrResponseData> invalidIsrResult1 = sendAlterIsr(
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidIsrRequest1);
@@ -514,8 +576,8 @@ public class ReplicationControlManagerTest {
// Invalid ISR (does not include leader 0)
PartitionData invalidIsrRequest2 = newAlterIsrPartition(
- replicationControl, topicIdPartition, Arrays.asList(0, 1));
- invalidIsrRequest2.setNewIsr(Arrays.asList(1, 2));
+ replicationControl, topicIdPartition, asList(0, 1));
+ invalidIsrRequest2.setNewIsr(asList(1, 2));
ControllerResult<AlterIsrResponseData> invalidIsrResult2 = sendAlterIsr(
replicationControl, 1, ctx.currentBrokerEpoch(1),
"foo", invalidIsrRequest2);
@@ -647,28 +709,28 @@ public class ReplicationControlManagerTest {
assertNull(replicationControl.getPartition(topicId, 3));
assertCreatedTopicConfigs(ctx, "foo", requestConfigs);
- assertEquals(Collections.singletonMap(topicId, new ResultOrError<>("foo")),
+ assertEquals(singletonMap(topicId, new ResultOrError<>("foo")),
replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(topicId)));
- assertEquals(Collections.singletonMap("foo", new ResultOrError<>(topicId)),
+ assertEquals(singletonMap("foo", new ResultOrError<>(topicId)),
replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("foo")));
Uuid invalidId = new Uuid(topicId.getMostSignificantBits() + 1,
topicId.getLeastSignificantBits());
- assertEquals(Collections.singletonMap(invalidId,
+ assertEquals(singletonMap(invalidId,
new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_ID))),
replicationControl.findTopicNames(Long.MAX_VALUE, Collections.singleton(invalidId)));
- assertEquals(Collections.singletonMap("bar",
+ assertEquals(singletonMap("bar",
new ResultOrError<>(new ApiError(UNKNOWN_TOPIC_OR_PARTITION))),
replicationControl.findTopicIds(Long.MAX_VALUE, Collections.singleton("bar")));
ControllerResult<Map<Uuid, ApiError>> invalidDeleteResult = replicationControl.
deleteTopics(Collections.singletonList(invalidId));
assertEquals(0, invalidDeleteResult.records().size());
- assertEquals(Collections.singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
+ assertEquals(singletonMap(invalidId, new ApiError(UNKNOWN_TOPIC_ID, null)),
invalidDeleteResult.response());
ControllerResult<Map<Uuid, ApiError>> deleteResult = replicationControl.
deleteTopics(Collections.singletonList(topicId));
assertTrue(deleteResult.isAtomic());
- assertEquals(Collections.singletonMap(topicId, new ApiError(NONE, null)),
+ assertEquals(singletonMap(topicId, new ApiError(NONE, null)),
deleteResult.response());
assertEquals(1, deleteResult.records().size());
ctx.replay(deleteResult.records());
@@ -676,10 +738,10 @@ public class ReplicationControlManagerTest {
assertNull(replicationControl.getPartition(topicId, 1));
assertNull(replicationControl.getPartition(topicId, 2));
assertNull(replicationControl.getPartition(topicId, 3));
- assertEquals(Collections.singletonMap(topicId, new ResultOrError<>(
+ assertEquals(singletonMap(topicId, new ResultOrError<>(
new ApiError(UNKNOWN_TOPIC_ID))), replicationControl.findTopicNames(
Long.MAX_VALUE, Collections.singleton(topicId)));
- assertEquals(Collections.singletonMap("foo", new ResultOrError<>(
+ assertEquals(singletonMap("foo", new ResultOrError<>(
new ApiError(UNKNOWN_TOPIC_OR_PARTITION))), replicationControl.findTopicIds(
Long.MAX_VALUE, Collections.singleton("foo")));
assertEmptyTopicConfigs(ctx, "foo");
@@ -715,7 +777,7 @@ public class ReplicationControlManagerTest {
setName("quux").setCount(2).setAssignments(null));
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult =
replicationControl.createPartitions(topics);
- assertEquals(Arrays.asList(new CreatePartitionsTopicResult().
+ assertEquals(asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
setErrorMessage(null),
@@ -735,20 +797,20 @@ public class ReplicationControlManagerTest {
ctx.replay(createPartitionsResult.records());
List<CreatePartitionsTopic> topics2 = new ArrayList<>();
topics2.add(new CreatePartitionsTopic().
- setName("foo").setCount(6).setAssignments(Arrays.asList(
- new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
+ setName("foo").setCount(6).setAssignments(asList(
+ new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
topics2.add(new CreatePartitionsTopic().
- setName("bar").setCount(5).setAssignments(Arrays.asList(
- new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1)))));
+ setName("bar").setCount(5).setAssignments(asList(
+ new CreatePartitionsAssignment().setBrokerIds(asList(1)))));
topics2.add(new CreatePartitionsTopic().
- setName("quux").setCount(4).setAssignments(Arrays.asList(
- new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(1, 0)))));
+ setName("quux").setCount(4).setAssignments(asList(
+ new CreatePartitionsAssignment().setBrokerIds(asList(1, 0)))));
topics2.add(new CreatePartitionsTopic().
- setName("foo2").setCount(3).setAssignments(Arrays.asList(
- new CreatePartitionsAssignment().setBrokerIds(Arrays.asList(2, 0)))));
+ setName("foo2").setCount(3).setAssignments(asList(
+ new CreatePartitionsAssignment().setBrokerIds(asList(2, 0)))));
ControllerResult<List<CreatePartitionsTopicResult>> createPartitionsResult2 =
replicationControl.createPartitions(topics2);
- assertEquals(Arrays.asList(new CreatePartitionsTopicResult().
+ assertEquals(asList(new CreatePartitionsTopicResult().
setName("foo").
setErrorCode(NONE.code()).
setErrorMessage(null),
@@ -775,13 +837,13 @@ public class ReplicationControlManagerTest {
public void testValidateGoodManualPartitionAssignments() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ctx.registerBrokers(1, 2, 3);
- ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
+ ctx.replicationControl.validateManualPartitionAssignment(asList(1),
OptionalInt.of(1));
- ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1),
+ ctx.replicationControl.validateManualPartitionAssignment(asList(1),
OptionalInt.empty());
- ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
+ ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
OptionalInt.of(3));
- ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
+ ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
OptionalInt.empty());
}
@@ -791,20 +853,20 @@ public class ReplicationControlManagerTest {
ctx.registerBrokers(1, 2);
assertEquals("The manual partition assignment includes an empty replica list.",
assertThrows(InvalidReplicaAssignmentException.class, () ->
- ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(),
+ ctx.replicationControl.validateManualPartitionAssignment(asList(),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes broker 3, but no such " +
"broker is registered.", assertThrows(InvalidReplicaAssignmentException.class, () ->
- ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 3),
+ ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 3),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes the broker 2 more than " +
"once.", assertThrows(InvalidReplicaAssignmentException.class, () ->
- ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2, 2),
+ ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2, 2),
OptionalInt.empty())).getMessage());
assertEquals("The manual partition assignment includes a partition with 2 " +
"replica(s), but this is not consistent with previous partitions, which have " +
"3 replica(s).", assertThrows(InvalidReplicaAssignmentException.class, () ->
- ctx.replicationControl.validateManualPartitionAssignment(Arrays.asList(1, 2),
+ ctx.replicationControl.validateManualPartitionAssignment(asList(1, 2),
OptionalInt.of(3))).getMessage());
}
@@ -824,18 +886,18 @@ public class ReplicationControlManagerTest {
assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null));
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
- new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
- new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+ new AlterPartitionReassignmentsRequestData().setTopics(asList(
+ new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
- setReplicas(Arrays.asList(3, 2, 1)),
+ setReplicas(asList(3, 2, 1)),
new ReassignablePartition().setPartitionIndex(1).
- setReplicas(Arrays.asList(0, 2, 1)),
+ setReplicas(asList(0, 2, 1)),
new ReassignablePartition().setPartitionIndex(2).
- setReplicas(Arrays.asList(0, 2, 1)))),
+ setReplicas(asList(0, 2, 1)))),
new ReassignableTopic().setName("bar"))));
assertEquals(new AlterPartitionReassignmentsResponseData().
- setErrorMessage(null).setResponses(Arrays.asList(
- new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+ setErrorMessage(null).setResponses(asList(
+ new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
@@ -849,41 +911,41 @@ public class ReplicationControlManagerTest {
ctx.replay(alterResult.records());
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
- setTopics(Arrays.asList(new OngoingTopicReassignment().
- setName("foo").setPartitions(Arrays.asList(
+ setTopics(asList(new OngoingTopicReassignment().
+ setName("foo").setPartitions(asList(
new OngoingPartitionReassignment().setPartitionIndex(1).
- setRemovingReplicas(Arrays.asList(3)).
- setAddingReplicas(Arrays.asList(0)).
- setReplicas(Arrays.asList(0, 2, 1, 3))))));
+ setRemovingReplicas(asList(3)).
+ setAddingReplicas(asList(0)).
+ setReplicas(asList(0, 2, 1, 3))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
- assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
+ assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("bar").
- setPartitionIndexes(Arrays.asList(0, 1, 2)))));
- assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
+ setPartitionIndexes(asList(0, 1, 2)))));
+ assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("foo").
- setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+ setPartitionIndexes(asList(0, 1, 2)))));
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
replication.alterPartitionReassignments(
- new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
- new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+ new AlterPartitionReassignmentsRequestData().setTopics(asList(
+ new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null),
new ReassignablePartition().setPartitionIndex(1).
setReplicas(null),
new ReassignablePartition().setPartitionIndex(2).
setReplicas(null))),
- new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+ new ReassignableTopic().setName("bar").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null))))));
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(fooId).
setPartitionId(1).
- setReplicas(Arrays.asList(2, 1, 3)).
+ setReplicas(asList(2, 1, 3)).
setLeader(3).
setRemovingReplicas(Collections.emptyList()).
setAddingReplicas(Collections.emptyList()), (short) 0)),
- new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
- new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+ new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
+ new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
@@ -891,7 +953,7 @@ public class ReplicationControlManagerTest {
new ReassignablePartitionResponse().setPartitionIndex(2).
setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
setErrorMessage("Unable to find partition foo:2."))),
- new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+ new ReassignableTopicResponse().setName("bar").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).
setErrorMessage(null)))))),
@@ -899,11 +961,11 @@ public class ReplicationControlManagerTest {
log.info("running final alterIsr...");
ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
new AlterIsrRequestData().setBrokerId(3).setBrokerEpoch(103).
- setTopics(Arrays.asList(new TopicData().setName("foo").setPartitions(Arrays.asList(
+ setTopics(asList(new TopicData().setName("foo").setPartitions(asList(
new PartitionData().setPartitionIndex(1).setCurrentIsrVersion(1).
- setLeaderEpoch(0).setNewIsr(Arrays.asList(3, 0, 2, 1)))))));
- assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
- new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+ setLeaderEpoch(0).setNewIsr(asList(3, 0, 2, 1)))))));
+ assertEquals(new AlterIsrResponseData().setTopics(asList(
+ new AlterIsrResponseData.TopicData().setName("foo").setPartitions(asList(
new AlterIsrResponseData.PartitionData().
setPartitionIndex(1).
setErrorCode(FENCED_LEADER_EPOCH.code()))))),
@@ -931,22 +993,22 @@ public class ReplicationControlManagerTest {
new int[] {}, new int[] {}, 1, 1, 1), replication.getPartition(fooId, 0));
ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
replication.alterPartitionReassignments(
- new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
- new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+ new AlterPartitionReassignmentsRequestData().setTopics(asList(
+ new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
- setReplicas(Arrays.asList(1, 2, 3)),
+ setReplicas(asList(1, 2, 3)),
new ReassignablePartition().setPartitionIndex(1).
- setReplicas(Arrays.asList(1, 2, 3, 0)),
+ setReplicas(asList(1, 2, 3, 0)),
new ReassignablePartition().setPartitionIndex(2).
- setReplicas(Arrays.asList(5, 6, 7)),
+ setReplicas(asList(5, 6, 7)),
new ReassignablePartition().setPartitionIndex(3).
- setReplicas(Arrays.asList()))),
- new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+ setReplicas(asList()))),
+ new ReassignableTopic().setName("bar").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
- setReplicas(Arrays.asList(1, 2, 3, 4, 0)))))));
+ setReplicas(asList(1, 2, 3, 4, 0)))))));
assertEquals(new AlterPartitionReassignmentsResponseData().
- setErrorMessage(null).setResponses(Arrays.asList(
- new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+ setErrorMessage(null).setResponses(asList(
+ new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null),
new ReassignablePartitionResponse().setPartitionIndex(1).
@@ -959,7 +1021,7 @@ public class ReplicationControlManagerTest {
setErrorCode(INVALID_REPLICA_ASSIGNMENT.code()).
setErrorMessage("The manual partition assignment includes an empty " +
"replica list."))),
- new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+ new ReassignableTopicResponse().setName("bar").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null))))),
alterResult.response());
@@ -972,55 +1034,55 @@ public class ReplicationControlManagerTest {
new int[] {}, new int[] {0, 1}, 4, 1, 2), replication.getPartition(barId, 0));
ListPartitionReassignmentsResponseData currentReassigning =
new ListPartitionReassignmentsResponseData().setErrorMessage(null).
- setTopics(Arrays.asList(new OngoingTopicReassignment().
- setName("bar").setPartitions(Arrays.asList(
+ setTopics(asList(new OngoingTopicReassignment().
+ setName("bar").setPartitions(asList(
new OngoingPartitionReassignment().setPartitionIndex(0).
setRemovingReplicas(Collections.emptyList()).
- setAddingReplicas(Arrays.asList(0, 1)).
- setReplicas(Arrays.asList(1, 2, 3, 4, 0))))));
+ setAddingReplicas(asList(0, 1)).
+ setReplicas(asList(1, 2, 3, 4, 0))))));
assertEquals(currentReassigning, replication.listPartitionReassignments(null));
- assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(Arrays.asList(
+ assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("foo").
- setPartitionIndexes(Arrays.asList(0, 1, 2)))));
- assertEquals(currentReassigning, replication.listPartitionReassignments(Arrays.asList(
+ setPartitionIndexes(asList(0, 1, 2)))));
+ assertEquals(currentReassigning, replication.listPartitionReassignments(asList(
new ListPartitionReassignmentsTopics().setName("bar").
- setPartitionIndexes(Arrays.asList(0, 1, 2)))));
+ setPartitionIndexes(asList(0, 1, 2)))));
ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
new AlterIsrRequestData().setBrokerId(4).setBrokerEpoch(104).
- setTopics(Arrays.asList(new TopicData().setName("bar").setPartitions(Arrays.asList(
+ setTopics(asList(new TopicData().setName("bar").setPartitions(asList(
new PartitionData().setPartitionIndex(0).setCurrentIsrVersion(2).
- setLeaderEpoch(1).setNewIsr(Arrays.asList(4, 1, 2, 3, 0)))))));
- assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
- new AlterIsrResponseData.TopicData().setName("bar").setPartitions(Arrays.asList(
+ setLeaderEpoch(1).setNewIsr(asList(4, 1, 2, 3, 0)))))));
+ assertEquals(new AlterIsrResponseData().setTopics(asList(
+ new AlterIsrResponseData.TopicData().setName("bar").setPartitions(asList(
new AlterIsrResponseData.PartitionData().
setPartitionIndex(0).
setLeaderId(4).
setLeaderEpoch(1).
- setIsr(Arrays.asList(4, 1, 2, 3, 0)).
+ setIsr(asList(4, 1, 2, 3, 0)).
setCurrentIsrVersion(3).
setErrorCode(NONE.code()))))),
alterIsrResult.response());
ControllerResult<AlterPartitionReassignmentsResponseData> cancelResult =
replication.alterPartitionReassignments(
- new AlterPartitionReassignmentsRequestData().setTopics(Arrays.asList(
- new ReassignableTopic().setName("foo").setPartitions(Arrays.asList(
+ new AlterPartitionReassignmentsRequestData().setTopics(asList(
+ new ReassignableTopic().setName("foo").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null))),
- new ReassignableTopic().setName("bar").setPartitions(Arrays.asList(
+ new ReassignableTopic().setName("bar").setPartitions(asList(
new ReassignablePartition().setPartitionIndex(0).
setReplicas(null))))));
assertEquals(ControllerResult.atomicOf(Collections.singletonList(new ApiMessageAndVersion(
new PartitionChangeRecord().setTopicId(barId).
setPartitionId(0).
setLeader(4).
- setReplicas(Arrays.asList(2, 3, 4)).
+ setReplicas(asList(2, 3, 4)).
setRemovingReplicas(null).
setAddingReplicas(Collections.emptyList()), (short) 0)),
- new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(Arrays.asList(
- new ReassignableTopicResponse().setName("foo").setPartitions(Arrays.asList(
+ new AlterPartitionReassignmentsResponseData().setErrorMessage(null).setResponses(asList(
+ new ReassignableTopicResponse().setName("foo").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorCode(NO_REASSIGNMENT_IN_PROGRESS.code()).setErrorMessage(null))),
- new ReassignableTopicResponse().setName("bar").setPartitions(Arrays.asList(
+ new ReassignableTopicResponse().setName("bar").setPartitions(asList(
new ReassignablePartitionResponse().setPartitionIndex(0).
setErrorMessage(null)))))),
cancelResult);
@@ -1052,6 +1114,160 @@ public class ReplicationControlManagerTest {
ctx.replicationControl.getPartition(fooId, 1));
}
+    /**
+     * Verifies the leader and ISR currently registered for a single partition.
+     *
+     * @param replication      manager whose partition registration is inspected
+     * @param topicIdPartition topic id and partition index of the partition to look up
+     * @param leaderId         leader the registration is expected to report
+     * @param isr              ISR the registration is expected to hold, compared element by element
+     */
+    private void assertLeaderAndIsr(
+        ReplicationControlManager replication,
+        TopicIdPartition topicIdPartition,
+        int leaderId,
+        int[] isr
+    ) {
+        Uuid topicId = topicIdPartition.topicId();
+        int partitionId = topicIdPartition.partitionId();
+        PartitionRegistration partition = replication.getPartition(topicId, partitionId);
+        assertArrayEquals(isr, partition.isr);
+        assertEquals(leaderId, partition.leader);
+    }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testElectUncleanLeaders(boolean electAllPartitions) throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(0, 1, 2, 3, 4);
+ ctx.unfenceBrokers(0, 1, 2, 3, 4);
+
+ Uuid fooId = ctx.createTestTopic("foo", new int[][]{
+ new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
+
+ TopicIdPartition partition0 = new TopicIdPartition(fooId, 0);
+ TopicIdPartition partition1 = new TopicIdPartition(fooId, 1);
+ TopicIdPartition partition2 = new TopicIdPartition(fooId, 2);
+
+ ctx.fenceBrokers(Utils.mkSet(2, 3));
+ ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
+
+ assertLeaderAndIsr(replication, partition0, NO_LEADER, new int[]{1});
+ assertLeaderAndIsr(replication, partition1, 4, new int[]{4});
+ assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
+
+ ElectLeadersRequestData request = buildElectLeadersRequest(
+ ElectionType.UNCLEAN,
+ electAllPartitions ? null : singletonMap("foo", asList(0, 1, 2))
+ );
+
+ // No election can be done yet because no replicas are available for partition 0
+ ControllerResult<ElectLeadersResponseData> result1 = replication.electLeaders(request);
+ assertEquals(Collections.emptyList(), result1.records());
+
+ ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap(
+ Utils.mkEntry(
+ new TopicPartition("foo", 0),
+ new ApiError(ELIGIBLE_LEADERS_NOT_AVAILABLE)
+ ),
+ Utils.mkEntry(
+ new TopicPartition("foo", 1),
+ new ApiError(ELECTION_NOT_NEEDED)
+ ),
+ Utils.mkEntry(
+ new TopicPartition("foo", 2),
+ new ApiError(ELECTION_NOT_NEEDED)
+ )
+ ));
+ assertElectLeadersResponse(expectedResponse1, result1.response());
+
+ // Now we bring 2 back online which should allow the unclean election of partition 0
+ ctx.unfenceBrokers(Utils.mkSet(2));
+
+ // Bring 2 back into the ISR for partition 1. This allows us to verify that
+ // preferred election does not occur as a result of the unclean election request.
+ ctx.alterIsr(partition1, 4, asList(2, 4));
+
+ ControllerResult<ElectLeadersResponseData> result = replication.electLeaders(request);
+ assertEquals(1, result.records().size());
+
+ ApiMessageAndVersion record = result.records().get(0);
+ assertTrue(record.message() instanceof PartitionChangeRecord);
+
+ PartitionChangeRecord partitionChangeRecord = (PartitionChangeRecord) record.message();
+ assertEquals(0, partitionChangeRecord.partitionId());
+ assertEquals(2, partitionChangeRecord.leader());
+ assertEquals(singletonList(2), partitionChangeRecord.isr());
+ ctx.replay(result.records());
+
+ assertLeaderAndIsr(replication, partition0, 2, new int[]{2});
+ assertLeaderAndIsr(replication, partition1, 4, new int[]{2, 4});
+ assertLeaderAndIsr(replication, partition2, 0, new int[]{0});
+
+ ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, electAllPartitions, Utils.mkMap(
+ Utils.mkEntry(
+ new TopicPartition("foo", 0),
+ ApiError.NONE
+ ),
+ Utils.mkEntry(
+ new TopicPartition("foo", 1),
+ new ApiError(ELECTION_NOT_NEEDED)
+ ),
+ Utils.mkEntry(
+ new TopicPartition("foo", 2),
+ new ApiError(ELECTION_NOT_NEEDED)
+ )
+ ));
+ assertElectLeadersResponse(expectedResponse, result.response());
+ }
+
+ @Test
+ public void testPreferredElectionDoesNotTriggerUncleanElection() throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replication = ctx.replicationControl;
+ ctx.registerBrokers(1, 2, 3, 4);
+ ctx.unfenceBrokers(1, 2, 3, 4);
+
+ Uuid fooId = ctx.createTestTopic("foo", new int[][]{new int[]{1, 2, 3}}).topicId();
+ TopicIdPartition partition = new TopicIdPartition(fooId, 0);
+
+ ctx.fenceBrokers(Utils.mkSet(2, 3));
+ ctx.fenceBrokers(Utils.mkSet(1, 2, 3));
+ ctx.unfenceBrokers(Utils.mkSet(2));
+
+ assertLeaderAndIsr(replication, partition, NO_LEADER, new int[]{1});
+
+ ctx.alterTopicConfig("foo", "unclean.leader.election.enable", "true");
+
+ ElectLeadersRequestData request = buildElectLeadersRequest(
+ ElectionType.PREFERRED,
+ singletonMap("foo", singletonList(0))
+ );
+
+ // No election should be done even though unclean election is available
+ ControllerResult<ElectLeadersResponseData> result = replication.electLeaders(request);
+ assertEquals(Collections.emptyList(), result.records());
+
+ ElectLeadersResponseData expectedResponse = buildElectLeadersResponse(NONE, false, singletonMap(
+ new TopicPartition("foo", 0), new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
+ ));
+ assertEquals(expectedResponse, result.response());
+ }
+
+    /**
+     * Builds an ElectLeaders request for the given election type.
+     *
+     * @param electionType type of election to request
+     * @param partitions   partition ids to elect, keyed by topic name; {@code null} sets the
+     *                     request's topic partitions to null (election for all partitions)
+     * @return the populated request data
+     */
+    private ElectLeadersRequestData buildElectLeadersRequest(
+        ElectionType electionType,
+        Map<String, List<Integer>> partitions
+    ) {
+        ElectLeadersRequestData data = new ElectLeadersRequestData();
+        data.setElectionType(electionType.value);
+
+        if (partitions == null) {
+            data.setTopicPartitions(null);
+            return data;
+        }
+
+        for (Map.Entry<String, List<Integer>> topicEntry : partitions.entrySet()) {
+            TopicPartitions topicPartitions = new TopicPartitions();
+            topicPartitions.setTopic(topicEntry.getKey());
+            topicPartitions.setPartitions(topicEntry.getValue());
+            data.topicPartitions().add(topicPartitions);
+        }
+        return data;
+    }
+
@Test
public void testFenceMultipleBrokers() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
@@ -1083,7 +1299,7 @@ public class ReplicationControlManagerTest {
}
@Test
- public void testElectLeaders() throws Exception {
+ public void testElectPreferredLeaders() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ReplicationControlManager replication = ctx.replicationControl;
ctx.registerBrokers(0, 1, 2, 3, 4);
@@ -1091,60 +1307,130 @@ public class ReplicationControlManagerTest {
Uuid fooId = ctx.createTestTopic("foo", new int[][]{
new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
ElectLeadersRequestData request1 = new ElectLeadersRequestData().
- setElectionType((byte) 0).
- setTopicPartitions(new TopicPartitionsCollection(Arrays.asList(
+ setElectionType(ElectionType.PREFERRED.value).
+ setTopicPartitions(new TopicPartitionsCollection(asList(
new TopicPartitions().setTopic("foo").
- setPartitions(Arrays.asList(0, 1)),
+ setPartitions(asList(0, 1)),
new TopicPartitions().setTopic("bar").
- setPartitions(Arrays.asList(0, 1))).iterator()));
+ setPartitions(asList(0, 1))).iterator()));
ControllerResult<ElectLeadersResponseData> election1Result =
replication.electLeaders(request1);
- ElectLeadersResponseData expectedResponse1 = new ElectLeadersResponseData().
- setErrorCode((short) 0).
- setReplicaElectionResults(Arrays.asList(
- new ReplicaElectionResult().setTopic("foo").
- setPartitionResult(Arrays.asList(
- new PartitionResult().setPartitionId(0).
- setErrorCode(NONE.code()).
- setErrorMessage(null),
- new PartitionResult().setPartitionId(1).
- setErrorCode(NONE.code()).
- setErrorMessage(null))),
- new ReplicaElectionResult().setTopic("bar").
- setPartitionResult(Arrays.asList(
- new PartitionResult().setPartitionId(0).
- setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
- setErrorMessage("No such topic as bar"),
- new PartitionResult().setPartitionId(1).
- setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code()).
- setErrorMessage("No such topic as bar")))));
- assertEquals(expectedResponse1, election1Result.response());
+ ElectLeadersResponseData expectedResponse1 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
+ Utils.mkEntry(
+ new TopicPartition("foo", 0),
+ new ApiError(PREFERRED_LEADER_NOT_AVAILABLE)
+ ),
+ Utils.mkEntry(
+ new TopicPartition("foo", 1),
+ new ApiError(ELECTION_NOT_NEEDED)
+ ),
+ Utils.mkEntry(
+ new TopicPartition("bar", 0),
+ new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+ ),
+ Utils.mkEntry(
+ new TopicPartition("bar", 1),
+ new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+ )
+ ));
+ assertElectLeadersResponse(expectedResponse1, election1Result.response());
assertEquals(Collections.emptyList(), election1Result.records());
ctx.unfenceBrokers(0, 1);
ControllerResult<AlterIsrResponseData> alterIsrResult = replication.alterIsr(
new AlterIsrRequestData().setBrokerId(2).setBrokerEpoch(102).
- setTopics(Arrays.asList(new AlterIsrRequestData.TopicData().setName("foo").
- setPartitions(Arrays.asList(new AlterIsrRequestData.PartitionData().
+ setTopics(asList(new AlterIsrRequestData.TopicData().setName("foo").
+ setPartitions(asList(new AlterIsrRequestData.PartitionData().
setPartitionIndex(0).setCurrentIsrVersion(0).
- setLeaderEpoch(0).setNewIsr(Arrays.asList(1, 2, 3)))))));
- assertEquals(new AlterIsrResponseData().setTopics(Arrays.asList(
- new AlterIsrResponseData.TopicData().setName("foo").setPartitions(Arrays.asList(
+ setLeaderEpoch(0).setNewIsr(asList(1, 2, 3)))))));
+ assertEquals(new AlterIsrResponseData().setTopics(asList(
+ new AlterIsrResponseData.TopicData().setName("foo").setPartitions(asList(
new AlterIsrResponseData.PartitionData().
setPartitionIndex(0).
setLeaderId(2).
setLeaderEpoch(0).
- setIsr(Arrays.asList(1, 2, 3)).
+ setIsr(asList(1, 2, 3)).
setCurrentIsrVersion(1).
setErrorCode(NONE.code()))))),
alterIsrResult.response());
+
+ ElectLeadersResponseData expectedResponse2 = buildElectLeadersResponse(NONE, false, Utils.mkMap(
+ Utils.mkEntry(
+ new TopicPartition("foo", 0),
+ ApiError.NONE
+ ),
+ Utils.mkEntry(
+ new TopicPartition("foo", 1),
+ new ApiError(ELECTION_NOT_NEEDED)
+ ),
+ Utils.mkEntry(
+ new TopicPartition("bar", 0),
+ new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+ ),
+ Utils.mkEntry(
+ new TopicPartition("bar", 1),
+ new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "No such topic as bar")
+ )
+ ));
+
ctx.replay(alterIsrResult.records());
ControllerResult<ElectLeadersResponseData> election2Result =
replication.electLeaders(request1);
- assertEquals(expectedResponse1, election2Result.response());
- assertEquals(Arrays.asList(new ApiMessageAndVersion(new PartitionChangeRecord().
+ assertElectLeadersResponse(expectedResponse2, election2Result.response());
+ assertEquals(asList(new ApiMessageAndVersion(new PartitionChangeRecord().
setPartitionId(0).
setTopicId(fooId).
setLeader(1), (short) 0)), election2Result.records());
}
+
+    /**
+     * Compares two ElectLeaders responses by their top-level error and by their
+     * per-partition results. Partition results are compared as maps, so the order in which
+     * topics and partitions appear in either response does not matter.
+     *
+     * @param expected response the test expects
+     * @param actual   response returned by the code under test
+     */
+    private void assertElectLeadersResponse(
+        ElectLeadersResponseData expected,
+        ElectLeadersResponseData actual
+    ) {
+        Errors expectedTopLevelError = Errors.forCode(expected.errorCode());
+        Errors actualTopLevelError = Errors.forCode(actual.errorCode());
+        assertEquals(expectedTopLevelError, actualTopLevelError);
+
+        Map<TopicPartition, PartitionResult> expectedResults = collectElectLeadersErrors(expected);
+        Map<TopicPartition, PartitionResult> actualResults = collectElectLeadersErrors(actual);
+        assertEquals(expectedResults, actualResults);
+    }
+
+    /**
+     * Flattens an ElectLeaders response into a map from partition to its result, so that two
+     * responses can be compared without regard to topic or partition ordering.
+     *
+     * @param response response whose per-topic partition results are indexed
+     * @return one entry per (topic, partition) pair found in the response
+     * @throws AssertionError if the response reports the same partition more than once
+     */
+    private Map<TopicPartition, PartitionResult> collectElectLeadersErrors(ElectLeadersResponseData response) {
+        Map<TopicPartition, PartitionResult> res = new HashMap<>();
+        for (ReplicaElectionResult topicResult : response.replicaElectionResults()) {
+            String topic = topicResult.topic();
+            for (PartitionResult partitionResult : topicResult.partitionResult()) {
+                TopicPartition topicPartition = new TopicPartition(topic, partitionResult.partitionId());
+                // put() returns the previous mapping; a non-null value means the partition was
+                // listed twice, which a plain overwrite would hide from the comparison.
+                PartitionResult previous = res.put(topicPartition, partitionResult);
+                if (previous != null) {
+                    throw new AssertionError("Duplicate result for " + topicPartition
+                        + ": " + previous + " and " + partitionResult);
+                }
+            }
+        }
+        return res;
+    }
+
+    /**
+     * Builds the ElectLeaders response a test expects from a flat map of per-partition errors.
+     *
+     * @param topLevelError      error placed in the response's top-level error code
+     * @param electAllPartitions when true, partitions whose error is ELECTION_NOT_NEEDED are
+     *                           left out of the response; their topic entry is still added
+     * @param errors             expected error for each partition
+     * @return response with one election result per topic present in {@code errors}
+     */
+    private ElectLeadersResponseData buildElectLeadersResponse(
+        Errors topLevelError,
+        boolean electAllPartitions,
+        Map<TopicPartition, ApiError> errors
+    ) {
+        Map<String, List<Map.Entry<TopicPartition, ApiError>>> entriesByTopic = errors.entrySet().stream()
+            .collect(Collectors.groupingBy(entry -> entry.getKey().topic()));
+
+        ElectLeadersResponseData data = new ElectLeadersResponseData();
+        data.setErrorCode(topLevelError.code());
+
+        for (Map.Entry<String, List<Map.Entry<TopicPartition, ApiError>>> topicEntry : entriesByTopic.entrySet()) {
+            List<PartitionResult> partitionResults = topicEntry.getValue().stream()
+                // Drop only the "nothing to do" partitions, and only for all-partition elections.
+                .filter(entry -> !(electAllPartitions && entry.getValue().error() == ELECTION_NOT_NEEDED))
+                .map(entry -> new PartitionResult()
+                    .setPartitionId(entry.getKey().partition())
+                    .setErrorCode(entry.getValue().error().code())
+                    .setErrorMessage(entry.getValue().message()))
+                .collect(Collectors.toList());
+
+            ReplicaElectionResult topicResult = new ReplicaElectionResult();
+            topicResult.setTopic(topicEntry.getKey());
+            topicResult.setPartitionResult(partitionResults);
+            data.replicaElectionResults().add(topicResult);
+        }
+
+        return data;
+    }
+
}