Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/01 02:54:16 UTC

[GitHub] [kafka] kowshik commented on a change in pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

kowshik commented on a change in pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#discussion_r463912157



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -4052,6 +4058,128 @@ void handleFailure(Throwable throwable) {
         return new AlterClientQuotasResult(Collections.unmodifiableMap(futures));
     }
 
+    @Override
+    public DescribeFeaturesResult describeFeatures(final DescribeFeaturesOptions options) {
+        final KafkaFutureImpl<FeatureMetadata> future = new KafkaFutureImpl<>();
+        final long now = time.milliseconds();
+        final NodeProvider provider =
+            options.sendRequestToController() ? new ControllerNodeProvider() : new LeastLoadedNodeProvider();
+
+        Call call = new Call(
+            "describeFeatures", calcDeadlineMs(now, options.timeoutMs()), provider) {
+
+            @Override
+            ApiVersionsRequest.Builder createRequest(int timeoutMs) {
+                return new ApiVersionsRequest.Builder();
+            }
+
+            @Override
+            void handleResponse(AbstractResponse response) {
+                final ApiVersionsResponse apiVersionsResponse = (ApiVersionsResponse) response;
+                if (apiVersionsResponse.data.errorCode() == Errors.NONE.code()) {
+                    future.complete(
+                        new FeatureMetadata(
+                            apiVersionsResponse.finalizedFeatures(),
+                            apiVersionsResponse.finalizedFeaturesEpoch(),
+                            apiVersionsResponse.supportedFeatures()));
+                } else if (options.sendRequestToController() && apiVersionsResponse.data.errorCode() == Errors.NOT_CONTROLLER.code()) {
+                    handleNotControllerError(Errors.NOT_CONTROLLER);
+                } else {
+                    future.completeExceptionally(
+                        Errors.forCode(apiVersionsResponse.data.errorCode()).exception());
+                }
+            }
+
+            @Override
+            void handleFailure(Throwable throwable) {
+                completeAllExceptionally(Collections.singletonList(future), throwable);
+            }
+        };
+
+        runnable.call(call, now);
+        return new DescribeFeaturesResult(future);
+    }
+
+    @Override
+    public UpdateFeaturesResult updateFeatures(
+        final Map<String, FeatureUpdate> featureUpdates, final UpdateFeaturesOptions options) {
+        if (featureUpdates == null || featureUpdates.isEmpty()) {
+            throw new IllegalArgumentException("Feature updates can not be null or empty.");
+        }
+        Objects.requireNonNull(options, "UpdateFeaturesOptions can not be null");
+
+        final Map<String, KafkaFutureImpl<Void>> updateFutures = new HashMap<>();
+        final UpdateFeaturesRequestData.FeatureUpdateKeyCollection featureUpdatesRequestData
+            = new UpdateFeaturesRequestData.FeatureUpdateKeyCollection();
+        for (Map.Entry<String, FeatureUpdate> entry : featureUpdates.entrySet()) {
+            final String feature = entry.getKey();
+            final FeatureUpdate update = entry.getValue();
+            if (feature.trim().isEmpty()) {
+                throw new IllegalArgumentException("Provided feature can not be null or empty.");
+            }
+
+            updateFutures.put(feature, new KafkaFutureImpl<>());
+            final UpdateFeaturesRequestData.FeatureUpdateKey requestItem =
+                new UpdateFeaturesRequestData.FeatureUpdateKey();
+            requestItem.setFeature(feature);
+            requestItem.setMaxVersionLevel(update.maxVersionLevel());
+            requestItem.setAllowDowngrade(update.allowDowngrade());
+            featureUpdatesRequestData.add(requestItem);
+        }
+        final UpdateFeaturesRequestData request = new UpdateFeaturesRequestData().setFeatureUpdates(featureUpdatesRequestData);
+
+        final long now = time.milliseconds();
+        final Call call = new Call("updateFeatures", calcDeadlineMs(now, options.timeoutMs()),
+            new ControllerNodeProvider()) {
+
+            @Override
+            UpdateFeaturesRequest.Builder createRequest(int timeoutMs) {
+                return new UpdateFeaturesRequest.Builder(request);
+            }
+
+            @Override
+            void handleResponse(AbstractResponse abstractResponse) {
+                final UpdateFeaturesResponse response =
+                    (UpdateFeaturesResponse) abstractResponse;
+
+                // Check for controller change.
+                for (UpdatableFeatureResult result : response.data().results()) {
+                    final Errors error = Errors.forCode(result.errorCode());
+                    if (error == Errors.NOT_CONTROLLER) {
+                        handleNotControllerError(error);
+                        throw error.exception();
+                    }
+                }
+
+                for (UpdatableFeatureResult result : response.data().results()) {
+                    final KafkaFutureImpl<Void> future = updateFutures.get(result.feature());
+                    if (future == null) {

Review comment:
       It does not overlap. The `future == null` check here catches unexpected response entries for features that we never intended to update. `completeUnrealizedFutures` handles the opposite case: futures for which we never received a response from the server.
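
       To make the distinction concrete, here is a minimal standalone sketch of the two checks. It is not the code in this PR: it uses `java.util.concurrent.CompletableFuture` in place of `KafkaFutureImpl`, the class and method names (`FeatureFutureChecks`, `unexpectedFeatures`, `failUnrealized`) are hypothetical, and the choice to collect the offending feature names into a list is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only; names and behavior are assumptions, not the PR's code.
public class FeatureFutureChecks {

    // Check 1: the response mentions a feature that was never part of the request.
    // Features that were requested have their futures completed normally.
    static List<String> unexpectedFeatures(
            Map<String, CompletableFuture<Void>> updateFutures,
            List<String> respondedFeatures) {
        List<String> unexpected = new ArrayList<>();
        for (String feature : respondedFeatures) {
            CompletableFuture<Void> future = updateFutures.get(feature);
            if (future == null) {
                unexpected.add(feature);   // no future exists for this feature
            } else {
                future.complete(null);     // requested and answered
            }
        }
        return unexpected;
    }

    // Check 2: a feature was requested, but the response never mentioned it.
    // Any future still pending after the response is processed is failed.
    static List<String> failUnrealized(Map<String, CompletableFuture<Void>> updateFutures) {
        List<String> unrealized = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<Void>> entry : updateFutures.entrySet()) {
            if (!entry.getValue().isDone()) {
                entry.getValue().completeExceptionally(
                    new IllegalStateException("No response for feature " + entry.getKey()));
                unrealized.add(entry.getKey());
            }
        }
        return unrealized;
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<Void>> futures = new HashMap<>();
        futures.put("feature_a", new CompletableFuture<>());
        futures.put("feature_b", new CompletableFuture<>());

        // The (made-up) response answers feature_a, skips feature_b, and adds feature_c.
        List<String> responded = Arrays.asList("feature_a", "feature_c");

        System.out.println(unexpectedFeatures(futures, responded)); // prints [feature_c]
        System.out.println(failUnrealized(futures));                // prints [feature_b]
    }
}
```

       The two checks look at disjoint sets: the first starts from the response and finds entries with no matching future, the second starts from the futures and finds those with no matching response entry.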



