You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/10 20:51:22 UTC
[kafka] branch 2.8 updated: MINOR: Rename DecommissionBrokers to
UnregisterBrokers (#10084)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 88265b3 MINOR: Rename DecommissionBrokers to UnregisterBrokers (#10084)
88265b3 is described below
commit 88265b37cb68ae8f262542195927577d9ce7e814
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Wed Feb 10 12:44:47 2021 -0800
MINOR: Rename DecommissionBrokers to UnregisterBrokers (#10084)
Rename DecommissionBrokers to UnregisterBrokers. Fix an incorrect JavaDoc comment
for the Admin API. Make sure that UNREGISTER_BROKER is marked as forwardable and
not as a controller-only API (since it can be received by brokers).
Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 31 +++++----
.../kafka/clients/admin/KafkaAdminClient.java | 27 ++++----
...erOptions.java => UnregisterBrokerOptions.java} | 4 +-
...okerResult.java => UnregisterBrokerResult.java} | 6 +-
.../org/apache/kafka/common/protocol/ApiKeys.java | 5 +-
.../org/apache/kafka/common/protocol/Errors.java | 2 +-
.../kafka/common/requests/AbstractRequest.java | 4 +-
.../kafka/common/requests/AbstractResponse.java | 4 +-
...erRequest.java => UnregisterBrokerRequest.java} | 34 +++++-----
...Response.java => UnregisterBrokerResponse.java} | 16 ++---
...erRequest.json => UnregisterBrokerRequest.json} | 4 +-
...Response.json => UnregisterBrokerResponse.json} | 2 +-
.../kafka/clients/admin/KafkaAdminClientTest.java | 74 ++++++++++------------
.../kafka/clients/admin/MockAdminClient.java | 6 +-
.../kafka/common/requests/RequestResponseTest.java | 24 +++----
.../scala/kafka/network/RequestConvertToJson.scala | 4 +-
core/src/main/scala/kafka/server/KafkaApis.scala | 10 ++-
core/src/main/scala/kafka/tools/ClusterTool.scala | 23 ++++---
.../scala/unit/kafka/server/RequestQuotaTest.scala | 4 +-
.../scala/unit/kafka/tools/ClusterToolTest.scala | 12 ++--
20 files changed, 151 insertions(+), 145 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index c39462a..d732cd2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1486,41 +1486,46 @@ public interface Admin extends AutoCloseable {
UpdateFeaturesResult updateFeatures(Map<String, FeatureUpdate> featureUpdates, UpdateFeaturesOptions options);
/**
- * Permanently remove a broker and reassign any partitions on the broker.
+ * Unregister a broker.
* <p>
- * This operation is supported only on self-managed Kafka clusters (i.e. brokers which do not rely on Zookeeper).
+ * This operation does not have any effect on partition assignments. It is supported
+ * only on Kafka clusters which use Raft to store metadata, rather than ZooKeeper.
*
- * @param brokerId the broker id to unregister.
+ * This is a convenience method for {@link #unregisterBroker(int, UnregisterBrokerOptions)}
*
- * <p>This is a convenience method for {@link #decommissionBroker(int, DecommissionBrokerOptions)}
+ * @param brokerId the broker id to unregister.
*
- * @return the {@link DecommissionBrokerResult} containing the result
+ * @return the {@link UnregisterBrokerResult} containing the result
*/
- default DecommissionBrokerResult decommissionBroker(int brokerId) {
- return decommissionBroker(brokerId, new DecommissionBrokerOptions());
+ @InterfaceStability.Unstable
+ default UnregisterBrokerResult unregisterBroker(int brokerId) {
+ return unregisterBroker(brokerId, new UnregisterBrokerOptions());
}
/**
- * Permanently remove a broker and reassign any partitions on the broker.
+ * Unregister a broker.
* <p>
- * This operation is supported only on self-managed Kafka clusters (i.e. brokers which do not rely on Zookeeper).
+ * This operation does not have any effect on partition assignments. It is supported
+ * only on Kafka clusters which use Raft to store metadata, rather than ZooKeeper.
*
* The following exceptions can be anticipated when calling {@code get()} on the future from the
- * returned {@link DescribeFeaturesResult}:
+ * returned {@link UnregisterBrokerResult}:
* <ul>
* <li>{@link org.apache.kafka.common.errors.TimeoutException}
* If the request timed out before the describe operation could finish.</li>
* <li>{@link org.apache.kafka.common.errors.UnsupportedVersionException}
- * If the software is too old to support decommissioning.
+ * If the software is too old to support the unregistration API, or if the
+ * cluster is not using Raft to store metadata.
* </ul>
* <p>
*
* @param brokerId the broker id to unregister.
* @param options the options to use.
*
- * @return the {@link DecommissionBrokerResult} containing the result
+ * @return the {@link UnregisterBrokerResult} containing the result
*/
- DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options);
+ @InterfaceStability.Unstable
+ UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options);
/**
* Get the metrics kept by the adminClient
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 54603a5..ab40470 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -98,7 +98,6 @@ import org.apache.kafka.common.message.CreateTopicsRequestData;
import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
-import org.apache.kafka.common.message.DecommissionBrokerRequestData;
import org.apache.kafka.common.message.DeleteAclsRequestData;
import org.apache.kafka.common.message.DeleteAclsRequestData.DeleteAclsFilter;
import org.apache.kafka.common.message.DeleteAclsResponseData;
@@ -147,6 +146,7 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteReque
import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopic;
import org.apache.kafka.common.message.OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection;
import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.UnregisterBrokerRequestData;
import org.apache.kafka.common.message.UpdateFeaturesRequestData;
import org.apache.kafka.common.message.UpdateFeaturesResponseData.UpdatableFeatureResult;
import org.apache.kafka.common.metrics.JmxReporter;
@@ -185,8 +185,6 @@ import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
-import org.apache.kafka.common.requests.DecommissionBrokerRequest;
-import org.apache.kafka.common.requests.DecommissionBrokerResponse;
import org.apache.kafka.common.requests.DeleteAclsRequest;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteGroupsRequest;
@@ -238,6 +236,8 @@ import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RenewDelegationTokenRequest;
import org.apache.kafka.common.requests.RenewDelegationTokenResponse;
+import org.apache.kafka.common.requests.UnregisterBrokerRequest;
+import org.apache.kafka.common.requests.UnregisterBrokerResponse;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.requests.UpdateFeaturesResponse;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
@@ -4608,23 +4608,23 @@ public class KafkaAdminClient extends AdminClient {
}
@Override
- public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
+ public UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options) {
final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
- final Call call = new Call("decommissionBroker", calcDeadlineMs(now, options.timeoutMs()),
+ final Call call = new Call("unregisterBroker", calcDeadlineMs(now, options.timeoutMs()),
new LeastLoadedNodeProvider()) {
@Override
- DecommissionBrokerRequest.Builder createRequest(int timeoutMs) {
- DecommissionBrokerRequestData data =
- new DecommissionBrokerRequestData().setBrokerId(brokerId);
- return new DecommissionBrokerRequest.Builder(data);
+ UnregisterBrokerRequest.Builder createRequest(int timeoutMs) {
+ UnregisterBrokerRequestData data =
+ new UnregisterBrokerRequestData().setBrokerId(brokerId);
+ return new UnregisterBrokerRequest.Builder(data);
}
@Override
void handleResponse(AbstractResponse abstractResponse) {
- final DecommissionBrokerResponse response =
- (DecommissionBrokerResponse) abstractResponse;
+ final UnregisterBrokerResponse response =
+ (UnregisterBrokerResponse) abstractResponse;
Errors error = Errors.forCode(response.data().errorCode());
switch (error) {
case NONE:
@@ -4633,7 +4633,8 @@ public class KafkaAdminClient extends AdminClient {
case REQUEST_TIMED_OUT:
throw error.exception();
default:
- log.error("Decommission Broker request for broker ID {} failed: {}", brokerId, error.message());
+ log.error("Unregister broker request for broker ID {} failed: {}",
+ brokerId, error.message());
future.completeExceptionally(error.exception());
break;
}
@@ -4645,7 +4646,7 @@ public class KafkaAdminClient extends AdminClient {
}
};
runnable.call(call, now);
- return new DecommissionBrokerResult(future);
+ return new UnregisterBrokerResult(future);
}
/**
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerOptions.java
similarity index 85%
rename from clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerOptions.java
rename to clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerOptions.java
index ed0d698..1935b79 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerOptions.java
@@ -20,10 +20,10 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.annotation.InterfaceStability;
/**
- * Options for {@link Admin#decommissionBroker(int, DecommissionBrokerOptions)}.
+ * Options for {@link Admin#unregisterBroker(int, UnregisterBrokerOptions)}.
*
* The API of this class is evolving. See {@link Admin} for details.
*/
@InterfaceStability.Evolving
-public class DecommissionBrokerOptions extends AbstractOptions<UpdateFeaturesOptions> {
+public class UnregisterBrokerOptions extends AbstractOptions<UpdateFeaturesOptions> {
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerResult.java b/clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerResult.java
similarity index 86%
rename from clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerResult.java
rename to clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerResult.java
index b5e2f52..b44c7e0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DecommissionBrokerResult.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/UnregisterBrokerResult.java
@@ -20,14 +20,14 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.KafkaFuture;
/**
- * The result of the {@link Admin#decommissionBroker(int, DecommissionBrokerOptions)} call.
+ * The result of the {@link Admin#unregisterBroker(int, UnregisterBrokerOptions)} call.
*
* The API of this class is evolving, see {@link Admin} for details.
*/
-public class DecommissionBrokerResult {
+public class UnregisterBrokerResult {
private final KafkaFuture<Void> future;
- DecommissionBrokerResult(final KafkaFuture<Void> future) {
+ UnregisterBrokerResult(final KafkaFuture<Void> future) {
this.future = future;
}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index 9b8ef7e..49a6130 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -102,10 +102,7 @@ public enum ApiKeys {
DESCRIBE_PRODUCERS(ApiMessageType.DESCRIBE_PRODUCERS),
BROKER_REGISTRATION(ApiMessageType.BROKER_REGISTRATION, true, RecordBatch.MAGIC_VALUE_V0, false, true),
BROKER_HEARTBEAT(ApiMessageType.BROKER_HEARTBEAT, true, RecordBatch.MAGIC_VALUE_V0, false, true),
-
- // Once we have the controller integration for supporting broker decommissioning, we will support forwarding from the broker
- // This is a short-term workaround to avoid advertizing the API on Zookeeper-based brokers
- DECOMMISSION_BROKER(ApiMessageType.DECOMMISSION_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, true);
+ UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true, false);
// The generator ensures every `ApiMessageType` has a unique id
private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 00a3c65..03c1248 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -354,7 +354,7 @@ public enum Errors {
"Requested position is not greater than or equal to zero, and less than the size of the snapshot.",
PositionOutOfRangeException::new),
UNKNOWN_TOPIC_ID(100, "This server does not host this topic ID.", UnknownTopicIdException::new),
- DUPLICATE_BROKER_REGISTRATION_EXCEPTION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);
+ DUPLICATE_BROKER_REGISTRATION(101, "This broker ID is already in use.", DuplicateBrokerRegistrationException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
index 2e4cf30..64befeb 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
@@ -282,8 +282,8 @@ public abstract class AbstractRequest implements AbstractRequestResponse {
return BrokerRegistrationRequest.parse(buffer, apiVersion);
case BROKER_HEARTBEAT:
return BrokerHeartbeatRequest.parse(buffer, apiVersion);
- case DECOMMISSION_BROKER:
- return DecommissionBrokerRequest.parse(buffer, apiVersion);
+ case UNREGISTER_BROKER:
+ return UnregisterBrokerRequest.parse(buffer, apiVersion);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseRequest`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
index 410d1cd..c4dd7d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java
@@ -239,8 +239,8 @@ public abstract class AbstractResponse implements AbstractRequestResponse {
return BrokerRegistrationResponse.parse(responseBuffer, version);
case BROKER_HEARTBEAT:
return BrokerHeartbeatResponse.parse(responseBuffer, version);
- case DECOMMISSION_BROKER:
- return DecommissionBrokerResponse.parse(responseBuffer, version);
+ case UNREGISTER_BROKER:
+ return UnregisterBrokerResponse.parse(responseBuffer, version);
default:
throw new AssertionError(String.format("ApiKey %s is not currently handled in `parseResponse`, the " +
"code should be updated to do so.", apiKey));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
similarity index 55%
rename from clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerRequest.java
rename to clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
index cf4bb4d..253499f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerRequest.java
@@ -16,52 +16,52 @@
*/
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.message.DecommissionBrokerRequestData;
-import org.apache.kafka.common.message.DecommissionBrokerResponseData;
+import org.apache.kafka.common.message.UnregisterBrokerRequestData;
+import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
import java.nio.ByteBuffer;
-public class DecommissionBrokerRequest extends AbstractRequest {
+public class UnregisterBrokerRequest extends AbstractRequest {
- public static class Builder extends AbstractRequest.Builder<DecommissionBrokerRequest> {
- private final DecommissionBrokerRequestData data;
+ public static class Builder extends AbstractRequest.Builder<UnregisterBrokerRequest> {
+ private final UnregisterBrokerRequestData data;
- public Builder(DecommissionBrokerRequestData data) {
- super(ApiKeys.DECOMMISSION_BROKER);
+ public Builder(UnregisterBrokerRequestData data) {
+ super(ApiKeys.UNREGISTER_BROKER);
this.data = data;
}
@Override
- public DecommissionBrokerRequest build(short version) {
- return new DecommissionBrokerRequest(data, version);
+ public UnregisterBrokerRequest build(short version) {
+ return new UnregisterBrokerRequest(data, version);
}
}
- private final DecommissionBrokerRequestData data;
+ private final UnregisterBrokerRequestData data;
- public DecommissionBrokerRequest(DecommissionBrokerRequestData data, short version) {
- super(ApiKeys.DECOMMISSION_BROKER, version);
+ public UnregisterBrokerRequest(UnregisterBrokerRequestData data, short version) {
+ super(ApiKeys.UNREGISTER_BROKER, version);
this.data = data;
}
@Override
- public DecommissionBrokerRequestData data() {
+ public UnregisterBrokerRequestData data() {
return data;
}
@Override
- public DecommissionBrokerResponse getErrorResponse(int throttleTimeMs, Throwable e) {
+ public UnregisterBrokerResponse getErrorResponse(int throttleTimeMs, Throwable e) {
Errors error = Errors.forException(e);
- return new DecommissionBrokerResponse(new DecommissionBrokerResponseData()
+ return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
.setThrottleTimeMs(throttleTimeMs)
.setErrorCode(error.code()));
}
- public static DecommissionBrokerRequest parse(ByteBuffer buffer, short version) {
- return new DecommissionBrokerRequest(new DecommissionBrokerRequestData(new ByteBufferAccessor(buffer), version),
+ public static UnregisterBrokerRequest parse(ByteBuffer buffer, short version) {
+ return new UnregisterBrokerRequest(new UnregisterBrokerRequestData(new ByteBufferAccessor(buffer), version),
version);
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
similarity index 73%
rename from clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerResponse.java
rename to clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
index 3e08cbd..b508ac3 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DecommissionBrokerResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/UnregisterBrokerResponse.java
@@ -17,7 +17,7 @@
package org.apache.kafka.common.requests;
-import org.apache.kafka.common.message.DecommissionBrokerResponseData;
+import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ByteBufferAccessor;
import org.apache.kafka.common.protocol.Errors;
@@ -26,16 +26,16 @@ import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
-public class DecommissionBrokerResponse extends AbstractResponse {
- private final DecommissionBrokerResponseData data;
+public class UnregisterBrokerResponse extends AbstractResponse {
+ private final UnregisterBrokerResponseData data;
- public DecommissionBrokerResponse(DecommissionBrokerResponseData data) {
- super(ApiKeys.DECOMMISSION_BROKER);
+ public UnregisterBrokerResponse(UnregisterBrokerResponseData data) {
+ super(ApiKeys.UNREGISTER_BROKER);
this.data = data;
}
@Override
- public DecommissionBrokerResponseData data() {
+ public UnregisterBrokerResponseData data() {
return data;
}
@@ -53,8 +53,8 @@ public class DecommissionBrokerResponse extends AbstractResponse {
return errorCounts;
}
- public static DecommissionBrokerResponse parse(ByteBuffer buffer, short version) {
- return new DecommissionBrokerResponse(new DecommissionBrokerResponseData(new ByteBufferAccessor(buffer), version));
+ public static UnregisterBrokerResponse parse(ByteBuffer buffer, short version) {
+ return new UnregisterBrokerResponse(new UnregisterBrokerResponseData(new ByteBufferAccessor(buffer), version));
}
@Override
diff --git a/clients/src/main/resources/common/message/DecommissionBrokerRequest.json b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
similarity index 91%
rename from clients/src/main/resources/common/message/DecommissionBrokerRequest.json
rename to clients/src/main/resources/common/message/UnregisterBrokerRequest.json
index fcf9848..3c43b16 100644
--- a/clients/src/main/resources/common/message/DecommissionBrokerRequest.json
+++ b/clients/src/main/resources/common/message/UnregisterBrokerRequest.json
@@ -16,11 +16,11 @@
{
"apiKey": 64,
"type": "request",
- "name": "DecommissionBrokerRequest",
+ "name": "UnregisterBrokerRequest",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
{ "name": "BrokerId", "type": "int32", "versions": "0+",
- "about": "The broker ID to decommission" }
+ "about": "The broker ID to unregister." }
]
}
diff --git a/clients/src/main/resources/common/message/DecommissionBrokerResponse.json b/clients/src/main/resources/common/message/UnregisterBrokerResponse.json
similarity index 97%
rename from clients/src/main/resources/common/message/DecommissionBrokerResponse.json
rename to clients/src/main/resources/common/message/UnregisterBrokerResponse.json
index 82afa2e..3a11c1a 100644
--- a/clients/src/main/resources/common/message/DecommissionBrokerResponse.json
+++ b/clients/src/main/resources/common/message/UnregisterBrokerResponse.json
@@ -16,7 +16,7 @@
{
"apiKey": 64,
"type": "response",
- "name": "DecommissionBrokerResponse",
+ "name": "UnregisterBrokerResponse",
"validVersions": "0",
"flexibleVersions": "0+",
"fields": [
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 7db77b1..a5296da 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -82,7 +82,6 @@ import org.apache.kafka.common.message.CreatePartitionsResponseData.CreatePartit
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResultCollection;
-import org.apache.kafka.common.message.DecommissionBrokerResponseData;
import org.apache.kafka.common.message.DeleteAclsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData.DeletableGroupResult;
@@ -119,6 +118,7 @@ import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResp
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopic;
import org.apache.kafka.common.message.OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection;
+import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
@@ -139,7 +139,6 @@ import org.apache.kafka.common.requests.CreatePartitionsRequest;
import org.apache.kafka.common.requests.CreatePartitionsResponse;
import org.apache.kafka.common.requests.CreateTopicsRequest;
import org.apache.kafka.common.requests.CreateTopicsResponse;
-import org.apache.kafka.common.requests.DecommissionBrokerResponse;
import org.apache.kafka.common.requests.DeleteAclsResponse;
import org.apache.kafka.common.requests.DeleteGroupsResponse;
import org.apache.kafka.common.requests.DeleteRecordsResponse;
@@ -168,6 +167,7 @@ import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetDeleteResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
+import org.apache.kafka.common.requests.UnregisterBrokerResponse;
import org.apache.kafka.common.requests.UpdateFeaturesRequest;
import org.apache.kafka.common.requests.UpdateFeaturesResponse;
import org.apache.kafka.common.resource.PatternType;
@@ -5172,15 +5172,13 @@ public class KafkaAdminClientTest {
}
@Test
- public void testDecommissionBrokerSuccess() throws InterruptedException, ExecutionException {
- int decommissionedBrokerNode = 1;
+ public void testUnregisterBrokerSuccess() throws InterruptedException, ExecutionException {
+ int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(
- NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
- env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.NONE, 0));
-
- DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
-
+ NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+ env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.NONE, 0));
+ UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
// Validate response
assertNotNull(result.all());
result.all().get();
@@ -5188,15 +5186,13 @@ public class KafkaAdminClientTest {
}
@Test
- public void testDecommissionBrokerFailure() {
- int decommissionedBrokerNode = 1;
+ public void testUnregisterBrokerFailure() {
+ int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(
- NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
- env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.UNKNOWN_SERVER_ERROR, 0));
-
- DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
-
+ NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+ env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.UNKNOWN_SERVER_ERROR, 0));
+ UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
// Validate response
assertNotNull(result.all());
TestUtils.assertFutureThrows(result.all(), Errors.UNKNOWN_SERVER_ERROR.exception().getClass());
@@ -5204,15 +5200,15 @@ public class KafkaAdminClientTest {
}
@Test
- public void testDecommissionBrokerTimeoutAndSuccessRetry() throws ExecutionException, InterruptedException {
- int decommissionedBrokerNode = 1;
+ public void testUnregisterBrokerTimeoutAndSuccessRetry() throws ExecutionException, InterruptedException {
+ int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(
- NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
- env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
- env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.NONE, 0));
+ NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+ env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
+ env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.NONE, 0));
- DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
+ UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
// Validate response
assertNotNull(result.all());
@@ -5221,15 +5217,15 @@ public class KafkaAdminClientTest {
}
@Test
- public void testDecommissionBrokerTimeoutAndFailureRetry() {
- int decommissionedBrokerNode = 1;
+ public void testUnregisterBrokerTimeoutAndFailureRetry() {
+ int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(
- NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
- env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
- env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.UNKNOWN_SERVER_ERROR, 0));
+ NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+ env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
+ env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.UNKNOWN_SERVER_ERROR, 0));
- DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
+ UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
// Validate response
assertNotNull(result.all());
@@ -5239,14 +5235,14 @@ public class KafkaAdminClientTest {
@Test
public void testDecommissionBrokerTimeoutMaxRetry() {
- int decommissionedBrokerNode = 1;
+ int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv(Time.SYSTEM, AdminClientConfig.RETRIES_CONFIG, "1")) {
env.kafkaClient().setNodeApiVersions(
- NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
- env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
- env.kafkaClient().prepareResponse(prepareDecommissionBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
+ NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
+ env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
+ env.kafkaClient().prepareResponse(prepareUnregisterBrokerResponse(Errors.REQUEST_TIMED_OUT, 0));
- DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode);
+ UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId);
// Validate response
assertNotNull(result.all());
@@ -5256,14 +5252,14 @@ public class KafkaAdminClientTest {
@Test
public void testDecommissionBrokerTimeoutMaxWait() {
- int decommissionedBrokerNode = 1;
+ int nodeId = 1;
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
env.kafkaClient().setNodeApiVersions(
- NodeApiVersions.create(ApiKeys.DECOMMISSION_BROKER.id, (short) 0, (short) 0));
+ NodeApiVersions.create(ApiKeys.UNREGISTER_BROKER.id, (short) 0, (short) 0));
- DecommissionBrokerOptions options = new DecommissionBrokerOptions();
+ UnregisterBrokerOptions options = new UnregisterBrokerOptions();
options.timeoutMs = 10;
- DecommissionBrokerResult result = env.adminClient().decommissionBroker(decommissionedBrokerNode, options);
+ UnregisterBrokerResult result = env.adminClient().unregisterBroker(nodeId, options);
// Validate response
assertNotNull(result.all());
@@ -5271,8 +5267,8 @@ public class KafkaAdminClientTest {
}
}
- private DecommissionBrokerResponse prepareDecommissionBrokerResponse(Errors error, int throttleTimeMs) {
- return new DecommissionBrokerResponse(new DecommissionBrokerResponseData()
+ private UnregisterBrokerResponse prepareUnregisterBrokerResponse(Errors error, int throttleTimeMs) {
+ return new UnregisterBrokerResponse(new UnregisterBrokerResponseData()
.setErrorCode(error.code())
.setErrorMessage(error.message())
.setThrottleTimeMs(throttleTimeMs));
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
index c647e9f..c2b9cff 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java
@@ -899,13 +899,13 @@ public class MockAdminClient extends AdminClient {
}
@Override
- public DecommissionBrokerResult decommissionBroker(int brokerId, DecommissionBrokerOptions options) {
+ public UnregisterBrokerResult unregisterBroker(int brokerId, UnregisterBrokerOptions options) {
if (usingRaftController) {
- return new DecommissionBrokerResult(KafkaFuture.completedFuture(null));
+ return new UnregisterBrokerResult(KafkaFuture.completedFuture(null));
} else {
KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
future.completeExceptionally(new UnsupportedVersionException(""));
- return new DecommissionBrokerResult(future);
+ return new UnregisterBrokerResult(future);
}
}
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 26399c4..f87d9f9 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -74,8 +74,6 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicCo
import org.apache.kafka.common.message.CreateTopicsResponseData;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicConfigs;
import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
-import org.apache.kafka.common.message.DecommissionBrokerRequestData;
-import org.apache.kafka.common.message.DecommissionBrokerResponseData;
import org.apache.kafka.common.message.DeleteAclsRequestData;
import org.apache.kafka.common.message.DeleteAclsResponseData;
import org.apache.kafka.common.message.DeleteGroupsRequestData;
@@ -166,6 +164,8 @@ import org.apache.kafka.common.message.StopReplicaResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupRequestData.SyncGroupRequestAssignment;
import org.apache.kafka.common.message.SyncGroupResponseData;
+import org.apache.kafka.common.message.UnregisterBrokerRequestData;
+import org.apache.kafka.common.message.UnregisterBrokerResponseData;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataEndpoint;
import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState;
@@ -558,11 +558,11 @@ public class RequestResponseTest {
}
@Test
- public void testDecommissionBrokerSerialization() {
- for (short v = ApiKeys.DECOMMISSION_BROKER.oldestVersion(); v <= ApiKeys.DECOMMISSION_BROKER.latestVersion(); v++) {
- checkRequest(createDecommissionBrokerRequest(v), true);
- checkErrorResponse(createDecommissionBrokerRequest(v), unknownServerException, true);
- checkResponse(createDecommissionBrokerResponse(), v, true);
+ public void testUnregisterBrokerSerialization() {
+ for (short v = ApiKeys.UNREGISTER_BROKER.oldestVersion(); v <= ApiKeys.UNREGISTER_BROKER.latestVersion(); v++) {
+ checkRequest(createUnregisterBrokerRequest(v), true);
+ checkErrorResponse(createUnregisterBrokerRequest(v), unknownServerException, true);
+ checkResponse(createUnregisterBrokerResponse(), v, true);
}
}
@@ -2680,13 +2680,13 @@ public class RequestResponseTest {
return new BrokerRegistrationResponse(data);
}
- private DecommissionBrokerRequest createDecommissionBrokerRequest(short version) {
- DecommissionBrokerRequestData data = new DecommissionBrokerRequestData().setBrokerId(1);
- return new DecommissionBrokerRequest.Builder(data).build(version);
+ private UnregisterBrokerRequest createUnregisterBrokerRequest(short version) {
+ UnregisterBrokerRequestData data = new UnregisterBrokerRequestData().setBrokerId(1);
+ return new UnregisterBrokerRequest.Builder(data).build(version);
}
- private DecommissionBrokerResponse createDecommissionBrokerResponse() {
- return new DecommissionBrokerResponse(new DecommissionBrokerResponseData());
+ private UnregisterBrokerResponse createUnregisterBrokerResponse() {
+ return new UnregisterBrokerResponse(new UnregisterBrokerResponseData());
}
/**
diff --git a/core/src/main/scala/kafka/network/RequestConvertToJson.scala b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
index 814d7ee..66ece99 100644
--- a/core/src/main/scala/kafka/network/RequestConvertToJson.scala
+++ b/core/src/main/scala/kafka/network/RequestConvertToJson.scala
@@ -44,7 +44,6 @@ object RequestConvertToJson {
case req: CreateDelegationTokenRequest => CreateDelegationTokenRequestDataJsonConverter.write(req.data, request.version)
case req: CreatePartitionsRequest => CreatePartitionsRequestDataJsonConverter.write(req.data, request.version)
case req: CreateTopicsRequest => CreateTopicsRequestDataJsonConverter.write(req.data, request.version)
- case req: DecommissionBrokerRequest => DecommissionBrokerRequestDataJsonConverter.write(req.data, request.version)
case req: DeleteAclsRequest => DeleteAclsRequestDataJsonConverter.write(req.data, request.version)
case req: DeleteGroupsRequest => DeleteGroupsRequestDataJsonConverter.write(req.data, request.version)
case req: DeleteRecordsRequest => DeleteRecordsRequestDataJsonConverter.write(req.data, request.version)
@@ -85,6 +84,7 @@ object RequestConvertToJson {
case req: StopReplicaRequest => StopReplicaRequestDataJsonConverter.write(req.data, request.version)
case req: SyncGroupRequest => SyncGroupRequestDataJsonConverter.write(req.data, request.version)
case req: TxnOffsetCommitRequest => TxnOffsetCommitRequestDataJsonConverter.write(req.data, request.version)
+ case req: UnregisterBrokerRequest => UnregisterBrokerRequestDataJsonConverter.write(req.data, request.version)
case req: UpdateFeaturesRequest => UpdateFeaturesRequestDataJsonConverter.write(req.data, request.version)
case req: UpdateMetadataRequest => UpdateMetadataRequestDataJsonConverter.write(req.data, request.version)
case req: VoteRequest => VoteRequestDataJsonConverter.write(req.data, request.version)
@@ -116,7 +116,6 @@ object RequestConvertToJson {
case res: CreateDelegationTokenResponse => CreateDelegationTokenResponseDataJsonConverter.write(res.data, version)
case res: CreatePartitionsResponse => CreatePartitionsResponseDataJsonConverter.write(res.data, version)
case res: CreateTopicsResponse => CreateTopicsResponseDataJsonConverter.write(res.data, version)
- case res: DecommissionBrokerResponse => DecommissionBrokerResponseDataJsonConverter.write(res.data, version)
case res: DeleteAclsResponse => DeleteAclsResponseDataJsonConverter.write(res.data, version)
case res: DeleteGroupsResponse => DeleteGroupsResponseDataJsonConverter.write(res.data, version)
case res: DeleteRecordsResponse => DeleteRecordsResponseDataJsonConverter.write(res.data, version)
@@ -157,6 +156,7 @@ object RequestConvertToJson {
case res: StopReplicaResponse => StopReplicaResponseDataJsonConverter.write(res.data, version)
case res: SyncGroupResponse => SyncGroupResponseDataJsonConverter.write(res.data, version)
case res: TxnOffsetCommitResponse => TxnOffsetCommitResponseDataJsonConverter.write(res.data, version)
+ case res: UnregisterBrokerResponse => UnregisterBrokerResponseDataJsonConverter.write(res.data, version)
case res: UpdateFeaturesResponse => UpdateFeaturesResponseDataJsonConverter.write(res.data, version)
case res: UpdateMetadataResponse => UpdateMetadataResponseDataJsonConverter.write(res.data, version)
case res: WriteTxnMarkersResponse => WriteTxnMarkersResponseDataJsonConverter.write(res.data, version)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 938c401..c3195be 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -217,6 +217,7 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.ENVELOPE => handleEnvelope(request)
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
+ case ApiKeys.UNREGISTER_BROKER => maybeForwardToController(request, handleUnregisterBrokerRequest)
// Handle requests that should have been sent to the KIP-500 controller.
// Until we are ready to integrate the Raft layer, these APIs are treated as
@@ -228,7 +229,6 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.FETCH_SNAPSHOT => requestHelper.closeConnection(request, util.Collections.emptyMap())
case ApiKeys.BROKER_REGISTRATION => requestHelper.closeConnection(request, util.Collections.emptyMap())
case ApiKeys.BROKER_HEARTBEAT => requestHelper.closeConnection(request, util.Collections.emptyMap())
- case ApiKeys.DECOMMISSION_BROKER => requestHelper.closeConnection(request, util.Collections.emptyMap())
}
} catch {
case e: FatalExitError => throw e
@@ -3390,6 +3390,14 @@ class KafkaApis(val requestChannel: RequestChannel,
new DescribeProducersResponse(response.setThrottleTimeMs(requestThrottleMs)))
}
+ def handleUnregisterBrokerRequest(request: RequestChannel.Request): Unit = {
+ // This function will not be called when in self-managed quorum mode, since the
+ // UNREGISTER_BROKER API is marked as forwardable and we will always have a forwarding
+ // manager.
+ throw new UnsupportedVersionException("The broker unregistration API is not available when using " +
+ "Apache ZooKeeper mode.")
+ }
+
private def updateRecordConversionStats(request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordConversionStats): Unit = {
diff --git a/core/src/main/scala/kafka/tools/ClusterTool.scala b/core/src/main/scala/kafka/tools/ClusterTool.scala
index f0d3d90..a5e2a82 100644
--- a/core/src/main/scala/kafka/tools/ClusterTool.scala
+++ b/core/src/main/scala/kafka/tools/ClusterTool.scala
@@ -39,9 +39,9 @@ object ClusterTool extends Logging {
val clusterIdParser = subparsers.addParser("cluster-id").
help("Get information about the ID of a cluster.")
- val decommissionParser = subparsers.addParser("decommission").
- help("Decommission a broker..")
- List(clusterIdParser, decommissionParser).foreach(parser => {
+ val unregisterParser = subparsers.addParser("unregister").
+ help("Unregister a broker..")
+ List(clusterIdParser, unregisterParser).foreach(parser => {
parser.addArgument("--bootstrap-server", "-b").
action(store()).
help("A list of host/port pairs to use for establishing the connection to the kafka cluster.")
@@ -49,10 +49,10 @@ object ClusterTool extends Logging {
action(store()).
help("A property file containing configs to passed to AdminClient.")
})
- decommissionParser.addArgument("--id", "-i").
+ unregisterParser.addArgument("--id", "-i").
`type`(classOf[Integer]).
action(store()).
- help("The ID of the broker to decommission.")
+ help("The ID of the broker to unregister.")
val namespace = parser.parseArgsOrFail(args)
val command = namespace.getString("command")
@@ -77,10 +77,10 @@ object ClusterTool extends Logging {
adminClient.close()
}
Exit.exit(0)
- case "decommission" =>
+ case "unregister" =>
val adminClient = Admin.create(properties)
try {
- decommissionCommand(System.out, adminClient, namespace.getInt("id"))
+ unregisterCommand(System.out, adminClient, namespace.getInt("id"))
} finally {
adminClient.close()
}
@@ -104,18 +104,17 @@ object ClusterTool extends Logging {
}
}
- def decommissionCommand(stream: PrintStream,
+ def unregisterCommand(stream: PrintStream,
adminClient: Admin,
id: Int): Unit = {
try {
- Option(adminClient.decommissionBroker(id).all().get())
- stream.println(s"Broker ${id} is no longer registered. Note that if the broker " +
- "is still running, or is restarted, it will re-register.")
+ Option(adminClient.unregisterBroker(id).all().get())
+ stream.println(s"Broker ${id} is no longer registered.")
} catch {
case e: ExecutionException => {
val cause = e.getCause()
if (cause.isInstanceOf[UnsupportedVersionException]) {
- stream.println(s"The target cluster does not support broker decommissioning.")
+ stream.println(s"The target cluster does not support the broker unregistration API.")
} else {
throw e
}
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index d326dba..5b1363f 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -632,8 +632,8 @@ class RequestQuotaTest extends BaseRequestTest {
case ApiKeys.BROKER_HEARTBEAT =>
new BrokerHeartbeatRequest.Builder(new BrokerHeartbeatRequestData())
- case ApiKeys.DECOMMISSION_BROKER =>
- new DecommissionBrokerRequest.Builder(new DecommissionBrokerRequestData())
+ case ApiKeys.UNREGISTER_BROKER =>
+ new UnregisterBrokerRequest.Builder(new UnregisterBrokerRequestData())
case _ =>
throw new IllegalArgumentException("Unsupported API key " + apiKey)
diff --git a/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
index 0ce100c..b98cd8e 100644
--- a/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ClusterToolTest.scala
@@ -49,26 +49,26 @@ class ClusterToolTest {
}
@Test
- def testDecommissionBroker(): Unit = {
+ def testUnregisterBroker(): Unit = {
val adminClient = new MockAdminClient.Builder().numBrokers(3).
usingRaftController(true).
build()
val stream = new ByteArrayOutputStream()
- ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
+ ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0)
assertEquals(
- s"""Broker 0 is no longer registered. Note that if the broker is still running, or is restarted, it will re-register.
+ s"""Broker 0 is no longer registered.
""", stream.toString())
}
@Test
- def testLegacyModeClusterCannotDecommissionBroker(): Unit = {
+ def testLegacyModeClusterCannotUnregisterBroker(): Unit = {
val adminClient = new MockAdminClient.Builder().numBrokers(3).
usingRaftController(false).
build()
val stream = new ByteArrayOutputStream()
- ClusterTool.decommissionCommand(new PrintStream(stream), adminClient, 0)
+ ClusterTool.unregisterCommand(new PrintStream(stream), adminClient, 0)
assertEquals(
- s"""The target cluster does not support broker decommissioning.
+ s"""The target cluster does not support the broker unregistration API.
""", stream.toString())
}
}