You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/07/11 12:38:59 UTC

[GitHub] [kafka] showuon commented on a change in pull request #11016: KAFKA-13058; AlterConsumerGroupOffsetsHandler does not handle partition errors correctly.

showuon commented on a change in pull request #11016:
URL: https://github.com/apache/kafka/pull/11016#discussion_r667475174



##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java
##########
@@ -75,22 +69,19 @@ public void accept(final Map<TopicPartition, Errors> topicPartitions, final Thro
      * Return a future which succeeds if all the alter offsets succeed.
      */
     public KafkaFuture<Void> all() {
-        return this.future.thenApply(new BaseFunction<Map<TopicPartition, Errors>, Void>() {
-            @Override
-            public Void apply(final Map<TopicPartition, Errors> topicPartitionErrorsMap) {
-                List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()
-                        .stream()
-                        .filter(e -> e.getValue() != Errors.NONE)
-                        .map(Map.Entry::getKey)
-                        .collect(Collectors.toList());
-                for (Errors error : topicPartitionErrorsMap.values()) {
-                    if (error != Errors.NONE) {
-                        throw error.exception(
-                            "Failed altering consumer group offsets for the following partitions: " + partitionsFailed);
-                    }
+        return this.future.thenApply(topicPartitionErrorsMap ->  {
+            List<TopicPartition> partitionsFailed = topicPartitionErrorsMap.entrySet()

Review comment:
       change to lambda expression?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/AlterConsumerGroupOffsetsResult.java
##########
@@ -48,23 +46,19 @@
     public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
         final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
 
-        this.future.whenComplete(new BiConsumer<Map<TopicPartition, Errors>, Throwable>() {
-            @Override
-            public void accept(final Map<TopicPartition, Errors> topicPartitions, final Throwable throwable) {
-                if (throwable != null) {
-                    result.completeExceptionally(throwable);
-                } else if (!topicPartitions.containsKey(partition)) {
-                    result.completeExceptionally(new IllegalArgumentException(
-                        "Alter offset for partition \"" + partition + "\" was not attempted"));
+        this.future.whenComplete((topicPartitions, throwable) -> {

Review comment:
       This is just a change to lambda expression, no content change, right?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org