You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2020/11/03 01:19:18 UTC
[kafka] branch trunk updated: MINOR: KIP-584: Remove admin client
facility to read features from controller (#9536)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b9e2a89 MINOR: KIP-584: Remove admin client facility to read features from controller (#9536)
b9e2a89 is described below
commit b9e2a89c0f48f8fa0849b173a2846d3689201dd6
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Mon Nov 2 17:16:52 2020 -0800
MINOR: KIP-584: Remove admin client facility to read features from controller (#9536)
In this PR, I have eliminated the facility in the Admin#describeFeatures API and its implementation to optionally send a describeFeatures request to the controller. This feature was not seen to be particularly useful, and it also poses a hindrance in the post-KIP-500 world, where no client would be able to access the controller directly.
Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Jun Rao <ju...@gmail.com>
---
.../java/org/apache/kafka/clients/admin/Admin.java | 8 ++----
.../clients/admin/DescribeFeaturesOptions.java | 20 --------------
.../kafka/clients/admin/KafkaAdminClient.java | 9 ++----
.../kafka/clients/admin/KafkaAdminClientTest.java | 22 +++++----------
.../main/scala/kafka/admin/FeatureCommand.scala | 32 +++++-----------------
.../unit/kafka/admin/FeatureCommandTest.scala | 2 +-
.../unit/kafka/server/UpdateFeaturesTest.scala | 5 ++--
7 files changed, 22 insertions(+), 76 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index f3bd8b6..90e9b0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1381,10 +1381,8 @@ public interface Admin extends AutoCloseable {
}
/**
- * Describes finalized as well as supported features. By default, the request is issued to any
- * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
- * parameter. This is particularly useful if the user requires strongly consistent reads of
- * finalized features.
+ * Describes finalized as well as supported features. The request is issued to any random
+ * broker.
* <p>
* The following exceptions can be anticipated when calling {@code get()} on the future from the
* returned {@link DescribeFeaturesResult}:
@@ -1435,7 +1433,7 @@ public interface Admin extends AutoCloseable {
* This means there was an unexpected error encountered when the update was applied on
* the controller. There is no guarantee on whether the update succeeded or failed. The best
* way to find out is to issue a {@link Admin#describeFeatures(DescribeFeaturesOptions)}
- * request to the controller to get the latest features.</li>
+ * request.</li>
* </ul>
* <p>
* This operation is supported by brokers with version 2.7.0 or higher.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
index 4a37956..a51ca74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
@@ -25,24 +25,4 @@ import org.apache.kafka.common.annotation.InterfaceStability;
*/
@InterfaceStability.Evolving
public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
-
- /**
- * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request must be
- * issued only to the controller.
- * - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
- * issued to any random broker.
- */
- private boolean sendRequestToController = false;
-
- /**
- * Sets a flag indicating that the describe features request must be issued only to the controller.
- */
- public DescribeFeaturesOptions sendRequestToController(boolean sendRequestToController) {
- this.sendRequestToController = sendRequestToController;
- return this;
- }
-
- public boolean sendRequestToController() {
- return sendRequestToController;
- }
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 96cf086..469e4f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4350,14 +4350,12 @@ public class KafkaAdminClient extends AdminClient {
.hi(password, salt, iterations);
}
+ @Override
public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
final long now = time.milliseconds();
- final NodeProvider provider =
- options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider();
-
final Call call = new Call(
- "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) {
+ "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
private FeatureMetadata createFeatureMetadata(final ApiVersionsResponse response) {
final Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>();
@@ -4390,9 +4388,6 @@ public class KafkaAdminClient extends AdminClient {
final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
future.complete(createFeatureMetadata(apiVersionsResponse));
- } else if (options.sendRequestToController() &&
- apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
- handleNotControllerError(Errors.NOT_CONTROLLER);
} else {
future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a089aa8..1d2dd4a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4096,24 +4096,16 @@ public class KafkaAdminClientTest {
}
@Test
- public void testDescribeFeaturesHandleNotControllerException() throws Exception {
+ public void testDescribeFeaturesFailure() {
try (final AdminClientUnitTestEnv env = mockClientEnv()) {
- env.kafkaClient().prepareResponseFrom(
- prepareApiVersionsResponseForDescribeFeatures(Errors.NOT_CONTROLLER),
- env.cluster().nodeById(0));
- env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(),
- env.cluster().clusterResource().clusterId(),
- 1,
- Collections.emptyList()));
- env.kafkaClient().prepareResponseFrom(
- prepareApiVersionsResponseForDescribeFeatures(Errors.NONE),
- env.cluster().nodeById(1));
+ env.kafkaClient().prepareResponse(
+ body -> body instanceof ApiVersionsRequest,
+ prepareApiVersionsResponseForDescribeFeatures(Errors.INVALID_REQUEST));
final DescribeFeaturesOptions options = new DescribeFeaturesOptions();
- options.sendRequestToController(true);
options.timeoutMs(10000);
- final KafkaFuture<FeatureMetadata> future
- = env.adminClient().describeFeatures(options).featureMetadata();
- future.get();
+ final KafkaFuture<FeatureMetadata> future = env.adminClient().describeFeatures(options).featureMetadata();
+ final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get());
+ assertEquals(e.getCause().getClass(), Errors.INVALID_REQUEST.exception().getClass());
}
}
diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala
index 9cc0a10..aa2b93e 100644
--- a/core/src/main/scala/kafka/admin/FeatureCommand.scala
+++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala
@@ -20,7 +20,7 @@ package kafka.admin
import kafka.server.BrokerFeatures
import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions}
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
import org.apache.kafka.common.utils.Utils
import java.util.Properties
@@ -98,18 +98,12 @@ class FeatureApis(private var opts: FeatureCommandOptions) {
opts = newOpts
}
- private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
- val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
- adminClient.describeFeatures(options).featureMetadata().get()
- }
-
/**
- * Describes the supported and finalized features. If the --from-controller CLI option
- * is provided, then the request is issued only to the controller, otherwise the request is issued
- * to any of the provided bootstrap servers.
+ * Describes the supported and finalized features. The request is issued to any of the provided
+ * bootstrap servers.
*/
def describeFeatures(): Unit = {
- val result = describeFeatures(opts.hasFromControllerOption)
+ val result = adminClient.describeFeatures.featureMetadata.get
val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
features.toList.sorted.foreach {
@@ -163,7 +157,7 @@ class FeatureApis(private var opts: FeatureCommandOptions) {
* @throws UpdateFeaturesException if at least one of the feature updates failed
*/
def upgradeAllFeatures(): Unit = {
- val metadata = describeFeatures(true)
+ val metadata = adminClient.describeFeatures.featureMetadata.get
val existingFinalizedFeatures = metadata.finalizedFeatures
val updates = supportedFeatures.features.asScala.map {
case (feature, targetVersionRange) =>
@@ -210,7 +204,7 @@ class FeatureApis(private var opts: FeatureCommandOptions) {
* @throws UpdateFeaturesException if at least one of the feature updates failed
*/
def downgradeAllFeatures(): Unit = {
- val metadata = describeFeatures(true)
+ val metadata = adminClient.describeFeatures.featureMetadata.get
val existingFinalizedFeatures = metadata.finalizedFeatures
val supportedFeaturesMap = supportedFeatures.features
val updates = existingFinalizedFeatures.asScala.map {
@@ -331,12 +325,7 @@ class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(a
.ofType(classOf[String])
private val describeOpt = parser.accepts(
"describe",
- "Describe supported and finalized features. By default, the features are described from a" +
- " random broker. The request can be optionally directed only to the controller using the" +
- " --from-controller option.")
- private val fromControllerOpt = parser.accepts(
- "from-controller",
- "Describe supported and finalized features from the controller.")
+ "Describe supported and finalized features from a random broker.")
private val upgradeAllOpt = parser.accepts(
"upgrade-all",
"Upgrades all finalized features to the maximum version levels known to the tool." +
@@ -359,8 +348,6 @@ class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(a
def hasDescribeOption: Boolean = has(describeOpt)
- def hasFromControllerOption: Boolean = has(fromControllerOpt)
-
def hasDryRunOption: Boolean = has(dryRunOpt)
def hasUpgradeAllOption: Boolean = has(upgradeAllOpt)
@@ -390,11 +377,6 @@ class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(a
parser,
"Command can contain --dry-run option only when either --upgrade-all or --downgrade-all actions are provided.")
}
- if (hasFromControllerOption && !hasDescribeOption) {
- CommandLineUtils.printUsageAndDie(
- parser,
- "Command can contain --from-controller option only when --describe action is provided.")
- }
}
}
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
index 0b9f80d..f548af6 100644
--- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -76,7 +76,7 @@ class FeatureCommandTest extends BaseRequestTest {
@Test
def testDescribeFeaturesSuccess(): Unit = {
updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
- val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--describe", "--from-controller")))
+ val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--describe")))
featureApis.setSupportedFeatures(defaultSupportedFeatures)
try {
val initialDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
diff --git a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
index a16ff30..f009909 100644
--- a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
@@ -24,7 +24,7 @@ import kafka.api.KAFKA_2_7_IV0
import kafka.utils.TestUtils
import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
import kafka.utils.TestUtils.waitUntilTrue
-import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
import org.apache.kafka.common.errors.InvalidRequestException
import org.apache.kafka.common.feature.FinalizedVersionRange
import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
@@ -121,8 +121,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
expectedFinalizedFeaturesEpoch: Long,
expectedSupportedFeatures: Features[SupportedVersionRange]): Unit = {
assertEquals(expectedNode, getFeatureZNode())
- val featureMetadata = client.describeFeatures(
- new DescribeFeaturesOptions().sendRequestToController(true)).featureMetadata.get
+ val featureMetadata = client.describeFeatures.featureMetadata.get
assertEquals(expectedFinalizedFeatures, finalizedFeatures(featureMetadata.finalizedFeatures))
assertEquals(expectedSupportedFeatures, supportedFeatures(featureMetadata.supportedFeatures))
assertEquals(Optional.of(expectedFinalizedFeaturesEpoch), featureMetadata.finalizedFeaturesEpoch)