You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ab...@apache.org on 2022/03/04 18:00:36 UTC

[kafka] branch trunk updated: KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED (#11813)

This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f54fae  KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED (#11813)
6f54fae is described below

commit 6f54faed2d0792f3a36534fd7e6d00b6603253a8
Author: A. Sophie Blee-Goldman <so...@confluent.io>
AuthorDate: Fri Mar 4 09:58:56 2022 -0800

    KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED (#11813)
    
    Currently the #add/removeNamedTopology APIs behave a little wonky when the application is still in CREATED. Since adding and removing topologies runs some validation steps, there is a valid reason to want to add or remove a topology on a dummy app that you don't plan to start, or a real app that you haven't started yet. But to actually check the results of the validation you need to call get() on the future, so we need to make sure that get() won't block forever in the case of no failure [...]
    
    Reviewers: Guozhang Wang <gu...@confluent.io>, Walker Carlson <wc...@confluent.io>
---
 .../KafkaStreamsNamedTopologyWrapper.java          | 111 +++++++++++++--------
 .../processor/internals/NamedTopologyTest.java     |  28 +++++-
 2 files changed, 97 insertions(+), 42 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index fb005d1..6355cae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -99,8 +99,10 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
 
     /**
      * Start up Streams with a collection of initial NamedTopologies (may be empty)
+     *
+     * Note: this is synchronized to ensure that the application state cannot change while we add topologies
      */
-    public void start(final Collection<NamedTopology> initialTopologies) {
+    public synchronized void start(final Collection<NamedTopology> initialTopologies) {
         log.info("Starting Streams with topologies: {}", initialTopologies);
         for (final NamedTopology topology : initialTopologies) {
             final AddNamedTopologyResult addNamedTopologyResult = addNamedTopology(topology);
@@ -145,7 +147,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
     /**
      * @return the NamedTopology for the specific name, or Optional.empty() if the application has no NamedTopology of that name
      */
-    public Optional<NamedTopology> getTopologyByName(final String name) {
+    public synchronized Optional<NamedTopology> getTopologyByName(final String name) {
         return Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology);
     }
 
@@ -180,7 +182,9 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
             );
         } else {
             topologyMetadata.registerAndBuildNewTopology(future, newTopology.internalTopologyBuilder());
+            maybeCompleteFutureIfStillInCREATED(future, "adding topology " + newTopology.name());
         }
+
         return new AddNamedTopologyResult(future);
     }
 
@@ -205,7 +209,8 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
 
         if (hasStartedOrFinishedShuttingDown()) {
             log.error("Attempted to remove topology {} from while the Kafka Streams was in state {}, "
-                          + "application must be started first.", topologyToRemove, state
+                          + "topologies cannot be modified if the application has begun or completed shutting down.",
+                      topologyToRemove, state
             );
             removeTopologyFuture.completeExceptionally(
                 new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state)
@@ -218,6 +223,7 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
                 new UnknownTopologyException("Unable to remove topology", topologyToRemove)
             );
         }
+
         final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
             .stream()
             .flatMap(t -> {
@@ -230,53 +236,76 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams {
 
         topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove);
 
-        if (resetOffsets) {
+        final boolean skipResetForUnstartedApplication =
+            maybeCompleteFutureIfStillInCREATED(removeTopologyFuture, "removing topology " + topologyToRemove);
+
+        if (resetOffsets && !skipResetForUnstartedApplication) {
             log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}",
                      removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully",
                      topologyToRemove, partitionsToReset
             );
-            if (!partitionsToReset.isEmpty()) {
-                removeTopologyFuture.whenComplete((v, throwable) -> {
-                    if (throwable != null) {
-                        removeTopologyFuture.completeExceptionally(throwable);
-                    }
-                    DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                    while (deleteOffsetsResult == null) {
-                        try {
-                            deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                                applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                            deleteOffsetsResult.all().get();
-                        } catch (final InterruptedException ex) {
+            return resetOffsets(removeTopologyFuture, partitionsToReset);
+        } else {
+            return new RemoveNamedTopologyResult(removeTopologyFuture);
+        }
+    }
+
+    /**
+     * @return  true iff the application is still in CREATED and the future was completed
+     */
+    private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void> updateTopologyFuture,
+                                                        final String operation) {
+        if (state == State.CREATED && !updateTopologyFuture.isDone()) {
+            updateTopologyFuture.complete(null);
+            log.info("Completed {} since application has not been started", operation);
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
+                                                   final Set<TopicPartition> partitionsToReset) {
+        if (!partitionsToReset.isEmpty()) {
+            removeTopologyFuture.whenComplete((v, throwable) -> {
+                if (throwable != null) {
+                    removeTopologyFuture.completeExceptionally(throwable);
+                }
+                DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
+                while (deleteOffsetsResult == null) {
+                    try {
+                        deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
+                            applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
+                        deleteOffsetsResult.all().get();
+                    } catch (final InterruptedException ex) {
+                        ex.printStackTrace();
+                        break;
+                    } catch (final ExecutionException ex) {
+                        if (ex.getCause() != null &&
+                            ex.getCause() instanceof GroupSubscribedToTopicException &&
+                            ex.getCause()
+                                .getMessage()
+                                .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) {
                             ex.printStackTrace();
+                        } else if (ex.getCause() != null &&
+                            ex.getCause() instanceof GroupIdNotFoundException) {
+                            log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further.");
                             break;
-                        } catch (final ExecutionException ex) {
-                            if (ex.getCause() != null &&
-                                ex.getCause() instanceof GroupSubscribedToTopicException &&
-                                ex.getCause()
-                                    .getMessage()
-                                    .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) {
-                                ex.printStackTrace();
-                            } else if (ex.getCause() != null &&
-                                ex.getCause() instanceof GroupIdNotFoundException) {
-                                log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further.");
-                                break;
-                            } else {
-                                removeTopologyFuture.completeExceptionally(ex);
-                            }
-                            deleteOffsetsResult = null;
-                        }
-                        try {
-                            Thread.sleep(100);
-                        } catch (final InterruptedException ex) {
-                            ex.printStackTrace();
+                        } else {
+                            removeTopologyFuture.completeExceptionally(ex);
                         }
+                        deleteOffsetsResult = null;
                     }
-                    removeTopologyFuture.complete(null);
-                });
-                return new RemoveNamedTopologyResult(removeTopologyFuture,  removeTopologyFuture);
-            }
+                    try {
+                        Thread.sleep(100);
+                    } catch (final InterruptedException ex) {
+                        ex.printStackTrace();
+                    }
+                }
+                removeTopologyFuture.complete(null);
+            });
         }
-        return new RemoveNamedTopologyResult(removeTopologyFuture);
+        return new RemoveNamedTopologyResult(removeTopologyFuture, removeTopologyFuture);
     }
 
     /**
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
index 29e7e6a..65f5a9d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
@@ -125,6 +125,17 @@ public class NamedTopologyTest {
     }
 
     @Test
+    public void shouldAllowAddingAndRemovingNamedTopologyAndReturnBeforeCallingStart() throws Exception {
+        builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store")));
+        builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store")));
+
+        streams.addNamedTopology(builder1.build()).all().get();
+        streams.addNamedTopology(builder2.build()).all().get();
+
+        streams.removeNamedTopology("topology-2").all().get();
+    }
+
+    @Test
     public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromSameInputTopic() {
         builder1.stream("stream");
         builder2.stream("stream");
@@ -138,7 +149,7 @@ public class NamedTopologyTest {
     }
 
     @Test
-    public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopic() {
+    public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicAfterStart() {
         builder1.stream("stream");
         builder2.stream("stream");
 
@@ -155,6 +166,21 @@ public class NamedTopologyTest {
     }
 
     @Test
+    public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicBeforeStart() {
+        builder1.stream("stream");
+        builder2.stream("stream");
+        
+        streams.addNamedTopology(builder1.build());
+
+        final ExecutionException exception = assertThrows(
+            ExecutionException.class,
+            () -> streams.addNamedTopology(builder2.build()).all().get()
+        );
+
+        assertThat(exception.getCause().getClass(), equalTo(TopologyException.class));
+    }
+
+    @Test
     public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateTableFromSameInputTopic() {
         builder1.table("table");
         builder2.table("table");