You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2021/12/17 03:42:24 UTC

[kafka] branch trunk updated: MINOR: retry when deleting offsets for named topologies (#11604)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 247c271  MINOR: retry when deleting offsets for named topologies (#11604)
247c271 is described below

commit 247c271353752a588162983a1a6f7eb96cf9870f
Author: Walker Carlson <18...@users.noreply.github.com>
AuthorDate: Thu Dec 16 21:39:55 2021 -0600

    MINOR: retry when deleting offsets for named topologies (#11604)
    
    When this was made I didn't expect deleteOffsetsResult to be set if an exception was thrown. But it is and to retry we need to reset it to null. Changing the KafkaStreamsNamedTopologyWrapper for remove topology when resetting offsets to retry upon GroupSubscribedToTopicException and swallow/complete upon GroupIdNotFoundException
    
    Reviewers: Anna Sophie Blee-Goldman <ab...@ache.>
---
 .../internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java   | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index 0dd7eca..b332f94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
 import org.apache.kafka.common.KafkaFuture;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
+import org.apache.kafka.common.errors.GroupIdNotFoundException;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.errors.GroupSubscribedToTopicException;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -221,9 +222,14 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
                                     .getMessage()
                                     .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) {
                                 ex.printStackTrace();
+                            } else if (ex.getCause() != null &&
+                                ex.getCause() instanceof GroupIdNotFoundException) {
+                                log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further.");
+                                break;
                             } else {
                                 future.completeExceptionally(ex);
                             }
+                            deleteOffsetsResult = null;
                         }
                         try {
                             Thread.sleep(100);