You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/05/28 16:15:44 UTC
[kafka] branch trunk updated: MINOR: Improve code style in FenceProducersHandler (#12208)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6b93652a54 MINOR: Improve code style in FenceProducersHandler (#12208)
6b93652a54 is described below
commit 6b93652a5469b66419189336a6a96d5742a19043
Author: David Jacot <dj...@confluent.io>
AuthorDate: Sat May 28 18:15:22 2022 +0200
MINOR: Improve code style in FenceProducersHandler (#12208)
Reviewers: Jason Gustafson <ja...@confluent.io>
---
.../admin/internals/FenceProducersHandler.java | 41 ++++++++++++----------
1 file changed, 22 insertions(+), 19 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
index 225c6f4e75..23572dd441 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/FenceProducersHandler.java
@@ -47,15 +47,15 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched<Coordinator
}
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, ProducerIdAndEpoch> newFuture(
- Collection<String> transactionalIds
+ Collection<String> transactionalIds
) {
return AdminApiFuture.forKeys(buildKeySet(transactionalIds));
}
private static Set<CoordinatorKey> buildKeySet(Collection<String> transactionalIds) {
return transactionalIds.stream()
- .map(CoordinatorKey::byTransactionalId)
- .collect(Collectors.toSet());
+ .map(CoordinatorKey::byTransactionalId)
+ .collect(Collectors.toSet());
}
@Override
@@ -75,24 +75,24 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched<Coordinator
" when building `InitProducerId` request");
}
InitProducerIdRequestData data = new InitProducerIdRequestData()
- // Because we never include a producer epoch or ID in this request, we expect that some errors
- // (such as PRODUCER_FENCED) will never be returned in the corresponding broker response.
- // If we ever modify this logic to include an epoch or producer ID, we will need to update the
- // error handling logic for this handler to accommodate these new errors.
- .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
- .setProducerId(ProducerIdAndEpoch.NONE.producerId)
- .setTransactionalId(key.idValue)
- // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
- // and shouldn't be used for any actual record writes
- .setTransactionTimeoutMs(1);
+ // Because we never include a producer epoch or ID in this request, we expect that some errors
+ // (such as PRODUCER_FENCED) will never be returned in the corresponding broker response.
+ // If we ever modify this logic to include an epoch or producer ID, we will need to update the
+ // error handling logic for this handler to accommodate these new errors.
+ .setProducerEpoch(ProducerIdAndEpoch.NONE.epoch)
+ .setProducerId(ProducerIdAndEpoch.NONE.producerId)
+ .setTransactionalId(key.idValue)
+ // Set transaction timeout to 1 since it's only being initialized to fence out older producers with the same transactional ID,
+ // and shouldn't be used for any actual record writes
+ .setTransactionTimeoutMs(1);
return new InitProducerIdRequest.Builder(data);
}
@Override
public ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleSingleResponse(
- Node broker,
- CoordinatorKey key,
- AbstractResponse abstractResponse
+ Node broker,
+ CoordinatorKey key,
+ AbstractResponse abstractResponse
) {
InitProducerIdResponse response = (InitProducerIdResponse) abstractResponse;
@@ -102,14 +102,17 @@ public class FenceProducersHandler extends AdminApiHandler.Unbatched<Coordinator
}
Map<CoordinatorKey, ProducerIdAndEpoch> completed = Collections.singletonMap(key, new ProducerIdAndEpoch(
- response.data().producerId(),
- response.data().producerEpoch()
+ response.data().producerId(),
+ response.data().producerEpoch()
));
return new ApiResult<>(completed, Collections.emptyMap(), Collections.emptyList());
}
- private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleError(CoordinatorKey transactionalIdKey, Errors error) {
+ private ApiResult<CoordinatorKey, ProducerIdAndEpoch> handleError(
+ CoordinatorKey transactionalIdKey,
+ Errors error
+ ) {
switch (error) {
case CLUSTER_AUTHORIZATION_FAILED:
return ApiResult.failed(transactionalIdKey, new ClusterAuthorizationException(