You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/28 16:15:44 UTC

[kafka] branch trunk updated: MINOR: Improve code style in FenceProducersHandler (#12208)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6b93652a54 MINOR: Improve code style in FenceProducersHandler (#12208)
6b93652a54 is described below

commit 6b93652a5469b66419189336a6a96d5742a19043
Author: David Jacot <dj...@confluent.io>
AuthorDate: Sat May 28 18:15:22 2022 +0200

    MINOR: Improve code style in FenceProducersHandler (#12208)
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../admin/internals/FenceProducersHandler.java     | 41 ++++++++++++----------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
index 225c6f4e75..23572dd441 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
@@ -47,15 +47,15 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched<Coordinator
     }
 
     public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> newFuture(
-            Collection<String> transactionalIds
+        Collection<String> transactionalIds
     ) {
         return AdminApiFuture.forKeys(buildKeySet(transactionalIds));
     }
 
     private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
         return transactionalIds.stream()
-                .map(CoordinatorKey::byTransactionalId)
-                .collect(Collectors.toSet());
+            .map(CoordinatorKey::byTransactionalId)
+            .collect(Collectors.toSet());
     }
 
     @Override
@@ -75,24 +75,24 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched<Coordinator
                     " when building `InitProducerId` request");
         }
         InitProducerIdRequestData data = new InitProducerIdRequestData()
-                // Because we never include a producer epoch or ID in this request, we expect that some errors
-                // (such as PRODUCER_FENCED) will never be returned in the corresponding broker response.
-                // If we ever modify this logic to include an epoch or producer ID, we will need to update the
-                // error handling logic for this handler to accommodate these new errors.
-                .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
-                .setProducerId(ProducerIdAndEpoch.NONE.producerId)
-                .setTransactionalId(key.idValue)
-                // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
-                // and shouldn't be used for any actual record writes
-                .setTransactionTimeoutMs(1);
+            // Because we never include a producer epoch or ID in this request, we expect that some errors
+            // (such as PRODUCER_FENCED) will never be returned in the corresponding broker response.
+            // If we ever modify this logic to include an epoch or producer ID, we will need to update the
+            // error handling logic for this handler to accommodate these new errors.
+            .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
+            .setProducerId(ProducerIdAndEpoch.NONE.producerId)
+            .setTransactionalId(key.idValue)
+            // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
+            // and shouldn't be used for any actual record writes
+            .setTransactionTimeoutMs(1);
         return new InitProducerIdRequest.Builder(data);
     }
 
     @Override
     public ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleSingleResponse(
-            Node broker,
-            CoordinatorKey key,
-            AbstractResponse abstractResponse
+        Node broker,
+        CoordinatorKey key,
+        AbstractResponse abstractResponse
     ) {
         InitProducerIdResponse response = (InitProducerIdResponse) abstractResponse;
 
@@ -102,14 +102,17 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched<Coordinator
         }
 
         Map<CoordinatorKey, ProducerIdAndEpoch> completed = Collections.singletonMap(key, new ProducerIdAndEpoch(
-                response.data().producerId(),
-                response.data().producerEpoch()
+            response.data().producerId(),
+            response.data().producerEpoch()
         ));
 
         return new ApiResult<>(completed, Collections.emptyMap(), Collections.emptyList());
     }
 
-    private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleError(CoordinatorKey transactionalIdKey, Errors error) {
+    private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleError(
+        CoordinatorKey transactionalIdKey,
+        Errors error
+    ) {
         switch (error) {
             case CLUSTER_AUTHORIZATION_FAILED:
                 return ApiResult.failed(transactionalIdKey, new ClusterAuthorizationException(