You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2020/11/03 01:19:18 UTC

[kafka] branch trunk updated: MINOR: KIP-584: Remove admin client facility to read features from controller (#9536)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b9e2a89  MINOR: KIP-584: Remove admin client facility to read features from controller (#9536)
b9e2a89 is described below

commit b9e2a89c0f48f8fa0849b173a2846d3689201dd6
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Mon Nov 2 17:16:52 2020 -0800

    MINOR: KIP-584: Remove admin client facility to read features from controller (#9536)
    
    In this PR, I have eliminated the facility in Admin#describeFeatures API and it's implementation to be able to optionally send a describeFeatures request to the controller. This feature was not seen to be particularly useful, and besides it also poses some hindrance to post KIP-500 world where no client would be able to access the controller directly.
    
    Reviewers: Chia-Ping Tsai <ch...@gmail.com>, Jun Rao <ju...@gmail.com>
---
 .../java/org/apache/kafka/clients/admin/Admin.java |  8 ++----
 .../clients/admin/DescribeFeaturesOptions.java     | 20 --------------
 .../kafka/clients/admin/KafkaAdminClient.java      |  9 ++----
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 22 +++++----------
 .../main/scala/kafka/admin/FeatureCommand.scala    | 32 +++++-----------------
 .../unit/kafka/admin/FeatureCommandTest.scala      |  2 +-
 .../unit/kafka/server/UpdateFeaturesTest.scala     |  5 ++--
 7 files changed, 22 insertions(+), 76 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
index f3bd8b6..90e9b0b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/Admin.java
@@ -1381,10 +1381,8 @@ public interface Admin extends AutoCloseable {
     }
 
     /**
-     * Describes finalized as well as supported features. By default, the request is issued to any
-     * broker. It can be optionally directed only to the controller via DescribeFeaturesOptions
-     * parameter. This is particularly useful if the user requires strongly consistent reads of
-     * finalized features.
+     * Describes finalized as well as supported features. The request is issued to any random
+     * broker.
      * <p>
      * The following exceptions can be anticipated when calling {@code get()} on the future from the
      * returned {@link DescribeFeaturesResult}:
@@ -1435,7 +1433,7 @@ public interface Admin extends AutoCloseable {
      *   This means there was an unexpected error encountered when the update was applied on
      *   the controller. There is no guarantee on whether the update succeeded or failed. The best
      *   way to find out is to issue a {@link Admin#describeFeatures(DescribeFeaturesOptions)}
-     *   request to the controller to get the latest features.</li>
+     *   request.</li>
      * </ul>
      * <p>
      * This operation is supported by brokers with version 2.7.0 or higher.
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
index 4a37956..a51ca74 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeFeaturesOptions.java
@@ -25,24 +25,4 @@ import org.apache.kafka.common.annotation.InterfaceStability;
  */
 @InterfaceStability.Evolving
 public class DescribeFeaturesOptions extends AbstractOptions<DescribeFeaturesOptions> {
-
-    /**
-     * - True means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request must be
-     *   issued only to the controller.
-     * - False means the {@link Admin#describeFeatures(DescribeFeaturesOptions)} request can be
-     *   issued to any random broker.
-     */
-    private boolean sendRequestToController = false;
-
-    /**
-     * Sets a flag indicating that the describe features request must be issued only to the controller.
-     */
-    public DescribeFeaturesOptions sendRequestToController(boolean sendRequestToController) {
-        this.sendRequestToController = sendRequestToController;
-        return this;
-    }
-
-    public boolean sendRequestToController() {
-        return sendRequestToController;
-    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 96cf086..469e4f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4350,14 +4350,12 @@ public class KafkaAdminClient extends AdminClient {
                 .hi(password, salt, iterations);
     }
 
+    @Override
     public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
         final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
         final long now = time.milliseconds();
-        final NodeProvider provider =
-            options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider();
-
         final Call call = new Call(
-            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) {
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), new LeastLoadedNodeProvider()) {
 
             private FeatureMetadata createFeatureMetadata(final ApiVersionsResponse response) {
                 final Map<String, FinalizedVersionRange> finalizedFeatures = new HashMap<>();
@@ -4390,9 +4388,6 @@ public class KafkaAdminClient extends AdminClient {
                 final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
                 if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
                     future.complete(createFeatureMetadata(apiVersionsResponse));
-                } else if (options.sendRequestToController() &&
-                           apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
-                    handleNotControllerError(Errors.NOT_CONTROLLER);
                 } else {
                     future.completeExceptionally(Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
                 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index a089aa8..1d2dd4a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -4096,24 +4096,16 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testDescribeFeaturesHandleNotControllerException() throws Exception {
+    public void testDescribeFeaturesFailure() {
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
-            env.kafkaClient().prepareResponseFrom(
-                prepareApiVersionsResponseForDescribeFeatures(Errors.NOT_CONTROLLER),
-                env.cluster().nodeById(0));
-            env.kafkaClient().prepareResponse(MetadataResponse.prepareResponse(env.cluster().nodes(),
-                env.cluster().clusterResource().clusterId(),
-                1,
-                Collections.emptyList()));
-            env.kafkaClient().prepareResponseFrom(
-                prepareApiVersionsResponseForDescribeFeatures(Errors.NONE),
-                env.cluster().nodeById(1));
+            env.kafkaClient().prepareResponse(
+                body -> body instanceof ApiVersionsRequest,
+                prepareApiVersionsResponseForDescribeFeatures(Errors.INVALID_REQUEST));
             final DescribeFeaturesOptions options = new DescribeFeaturesOptions();
-            options.sendRequestToController(true);
             options.timeoutMs(10000);
-            final KafkaFuture<FeatureMetadata> future
-                = env.adminClient().describeFeatures(options).featureMetadata();
-            future.get();
+            final KafkaFuture<FeatureMetadata> future = env.adminClient().describeFeatures(options).featureMetadata();
+            final ExecutionException e = assertThrows(ExecutionException.class, () -> future.get());
+            assertEquals(e.getCause().getClass(), Errors.INVALID_REQUEST.exception().getClass());
         }
     }
 
diff --git a/core/src/main/scala/kafka/admin/FeatureCommand.scala b/core/src/main/scala/kafka/admin/FeatureCommand.scala
index 9cc0a10..aa2b93e 100644
--- a/core/src/main/scala/kafka/admin/FeatureCommand.scala
+++ b/core/src/main/scala/kafka/admin/FeatureCommand.scala
@@ -20,7 +20,7 @@ package kafka.admin
 import kafka.server.BrokerFeatures
 import kafka.utils.{CommandDefaultOptions, CommandLineUtils, Exit}
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureMetadata, FeatureUpdate, UpdateFeaturesOptions}
+import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions}
 import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
 import org.apache.kafka.common.utils.Utils
 import java.util.Properties
@@ -98,18 +98,12 @@ class FeatureApis(private var opts: FeatureCommandOptions) {
     opts = newOpts
   }
 
-  private def describeFeatures(sendRequestToController: Boolean): FeatureMetadata = {
-    val options = new DescribeFeaturesOptions().sendRequestToController(sendRequestToController)
-    adminClient.describeFeatures(options).featureMetadata().get()
-  }
-
   /**
-   * Describes the supported and finalized features. If the --from-controller CLI option
-   * is provided, then the request is issued only to the controller, otherwise the request is issued
-   * to any of the provided bootstrap servers.
+   * Describes the supported and finalized features. The request is issued to any of the provided
+   * bootstrap servers.
    */
   def describeFeatures(): Unit = {
-    val result = describeFeatures(opts.hasFromControllerOption)
+    val result = adminClient.describeFeatures.featureMetadata.get
     val features = result.supportedFeatures.asScala.keys.toSet ++ result.finalizedFeatures.asScala.keys.toSet
 
     features.toList.sorted.foreach {
@@ -163,7 +157,7 @@ class FeatureApis(private var opts: FeatureCommandOptions) {
    * @throws UpdateFeaturesException if at least one of the feature updates failed
    */
   def upgradeAllFeatures(): Unit = {
-    val metadata = describeFeatures(true)
+    val metadata = adminClient.describeFeatures.featureMetadata.get
     val existingFinalizedFeatures = metadata.finalizedFeatures
     val updates = supportedFeatures.features.asScala.map {
       case (feature, targetVersionRange) =>
@@ -210,7 +204,7 @@ class FeatureApis(private var opts: FeatureCommandOptions) {
    * @throws UpdateFeaturesException if at least one of the feature updates failed
    */
   def downgradeAllFeatures(): Unit = {
-    val metadata = describeFeatures(true)
+    val metadata = adminClient.describeFeatures.featureMetadata.get
     val existingFinalizedFeatures = metadata.finalizedFeatures
     val supportedFeaturesMap = supportedFeatures.features
     val updates = existingFinalizedFeatures.asScala.map {
@@ -331,12 +325,7 @@ class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(a
     .ofType(classOf[String])
   private val describeOpt = parser.accepts(
     "describe",
-    "Describe supported and finalized features. By default, the features are described from a" +
-    " random broker. The request can be optionally directed only to the controller using the" +
-    " --from-controller option.")
-  private val fromControllerOpt = parser.accepts(
-    "from-controller",
-    "Describe supported and finalized features from the controller.")
+    "Describe supported and finalized features from a random broker.")
   private val upgradeAllOpt = parser.accepts(
     "upgrade-all",
     "Upgrades all finalized features to the maximum version levels known to the tool." +
@@ -359,8 +348,6 @@ class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(a
 
   def hasDescribeOption: Boolean = has(describeOpt)
 
-  def hasFromControllerOption: Boolean = has(fromControllerOpt)
-
   def hasDryRunOption: Boolean = has(dryRunOpt)
 
   def hasUpgradeAllOption: Boolean = has(upgradeAllOpt)
@@ -390,11 +377,6 @@ class FeatureCommandOptions(args: Array[String]) extends CommandDefaultOptions(a
         parser,
         "Command can contain --dry-run option only when either --upgrade-all or --downgrade-all actions are provided.")
     }
-    if (hasFromControllerOption && !hasDescribeOption) {
-      CommandLineUtils.printUsageAndDie(
-        parser,
-        "Command can contain --from-controller option only when --describe action is provided.")
-    }
   }
 }
 
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
index 0b9f80d..f548af6 100644
--- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -76,7 +76,7 @@ class FeatureCommandTest extends BaseRequestTest {
   @Test
   def testDescribeFeaturesSuccess(): Unit = {
     updateSupportedFeaturesInAllBrokers(defaultSupportedFeatures)
-    val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--describe", "--from-controller")))
+    val featureApis = new FeatureApis(new FeatureCommandOptions(Array("--bootstrap-server", brokerList, "--describe")))
     featureApis.setSupportedFeatures(defaultSupportedFeatures)
     try {
       val initialDescribeOutput = TestUtils.grabConsoleOutput(featureApis.describeFeatures())
diff --git a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
index a16ff30..f009909 100644
--- a/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/UpdateFeaturesTest.scala
@@ -24,7 +24,7 @@ import kafka.api.KAFKA_2_7_IV0
 import kafka.utils.TestUtils
 import kafka.zk.{FeatureZNode, FeatureZNodeStatus, ZkVersion}
 import kafka.utils.TestUtils.waitUntilTrue
-import org.apache.kafka.clients.admin.{Admin, DescribeFeaturesOptions, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
+import org.apache.kafka.clients.admin.{Admin, FeatureUpdate, UpdateFeaturesOptions, UpdateFeaturesResult}
 import org.apache.kafka.common.errors.InvalidRequestException
 import org.apache.kafka.common.feature.FinalizedVersionRange
 import org.apache.kafka.common.feature.{Features, SupportedVersionRange}
@@ -121,8 +121,7 @@ class UpdateFeaturesTest extends BaseRequestTest {
                             expectedFinalizedFeaturesEpoch: Long,
                             expectedSupportedFeatures: Features[SupportedVersionRange]): Unit = {
     assertEquals(expectedNode, getFeatureZNode())
-    val featureMetadata = client.describeFeatures(
-      new DescribeFeaturesOptions().sendRequestToController(true)).featureMetadata.get
+    val featureMetadata = client.describeFeatures.featureMetadata.get
     assertEquals(expectedFinalizedFeatures, finalizedFeatures(featureMetadata.finalizedFeatures))
     assertEquals(expectedSupportedFeatures, supportedFeatures(featureMetadata.supportedFeatures))
     assertEquals(Optional.of(expectedFinalizedFeaturesEpoch), featureMetadata.finalizedFeaturesEpoch)