Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/09 23:33:31 UTC

[GitHub] [kafka] wcarlson5 opened a new pull request #11479: KAFKA-12648: Make changing the named topologies blocking

wcarlson5 opened a new pull request #11479:
URL: https://github.com/apache/kafka/pull/11479


   Use a KafkaFuture so that a caller can add a topology, remove it, and then add the same topology back.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] guozhangwang commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r750900571



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +134,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.error("adding {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.error("Removing {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+//            .filter(t -> topologyMetadata.sourceTopicCollection().contains(t))

Review comment:
       Thanks! Just wanted to check that we would only want to remove partitions related to the `topologyToRemove` here :)







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r754573181



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       Synced offline.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752580337



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -121,9 +164,9 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         if (isEmpty() && threadState.get().isAlive()) {
             try {
                 lock();
-                while (isEmpty() && threadState.get().isAlive()) {

Review comment:
       It seems that we can either leave it in a loop or expose the lock and unlock to the stream thread. We can't just call it raw, as SpotBugs complains.
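The "leave it in a loop" option can be illustrated with a minimal sketch. It assumes the wait is a `Condition.await()` guarded by a lock, which static analysis tools typically expect to be re-checked in a loop. The class `TopologyWaitSketch` and its members are hypothetical stand-ins, not the actual `TopologyMetadata` code.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for the waiting logic; not the real TopologyMetadata.
final class TopologyWaitSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition topologyChanged = lock.newCondition();
    private boolean empty = true;

    // Blocks while the topology is empty and the calling thread is still alive.
    // Returns true if the topology is non-empty when the method returns.
    boolean waitForNonEmpty(final BooleanSupplier threadAlive) {
        lock.lock();
        try {
            // The predicate is re-checked after every wakeup, so a spurious
            // wakeup or an unrelated signal cannot let the caller proceed early.
            while (empty && threadAlive.getAsBoolean()) {
                topologyChanged.await();
            }
            return !empty;
        } catch (final InterruptedException e) {
            // Restore the interrupt flag and report that we did not see a topology.
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
        }
    }

    // Called when a topology is added; wakes up every waiting thread.
    void markNonEmpty() {
        lock.lock();
        try {
            empty = false;
            topologyChanged.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Keeping the lock and the loop inside one method means the stream thread never has to handle `lock()` and `unlock()` itself, which is the trade-off raised in the comment above.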







[GitHub] [kafka] guozhangwang commented on pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#issuecomment-971213962


   Hey @wcarlson5, sorry for being late on this review. Overall it looks good, and I think adding the future makes sense. I just feel that we can simplify the thread coordination on a single rebalance trigger a bit.
   
   Also, it seems some related tests are failing.





[GitHub] [kafka] ableegoldman commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r755680080



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final Set<TaskMetadata> tasks = new HashSet<>(t.activeTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopicsForTopology(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
 
-        topologyMetadata.unregisterTopology(topologyToRemove);
+        if (resetOffsets) {
+            final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+            log.info("partitions to reset: {}", partitionsToReset);

Review comment:
       ```suggestion
               log.info("Resetting offsets for the following partitions of NamedTopology {}: {}", topologyToRemove, partitionsToReset);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {

Review comment:
       nit: this method name has been confusing me, since it seems to imply that it returns a boolean for whether or not we reached the latest version. Can you give it a more descriptive name?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final Set<TaskMetadata> tasks = new HashSet<>(t.activeTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopicsForTopology(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
 
-        topologyMetadata.unregisterTopology(topologyToRemove);
+        if (resetOffsets) {
+            final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                removeTopologyFuture.whenComplete((v, throwable) -> {
+                    if (throwable != null) {
+                        future.completeExceptionally(throwable);
+                    }
+                    DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
+                    while (deleteOffsetsResult == null) {

Review comment:
       Is the point of this loop to account for multi-node clusters where the deleteOffsetsRequest might throw an exception? If so, can we catch that specific exception and retry, rather than retrying for all ExecutionExceptions? Those could be any number of potentially fatal things, I think.
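A minimal sketch of retrying only on one specific cause follows. The comment does not name the exception, so `RetriableSketchException`, the `OffsetDeletion` interface, and the `maxAttempts` bound are all hypothetical placeholders rather than Kafka APIs.

```java
import java.util.concurrent.ExecutionException;

// Hypothetical sketch: retry only when the failure cause is a known retriable type.
final class RetryOnSpecificCause {

    // Placeholder for whichever exception the multi-node case actually throws.
    static final class RetriableSketchException extends RuntimeException {
        RetriableSketchException(final String message) {
            super(message);
        }
    }

    // Placeholder for the blocking call, e.g. waiting on an admin-client future.
    interface OffsetDeletion<T> {
        T run() throws ExecutionException;
    }

    static <T> T retryOnRetriable(final OffsetDeletion<T> action, final int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                return action.run();
            } catch (final ExecutionException e) {
                final boolean retriable = e.getCause() instanceof RetriableSketchException;
                // Any other cause is treated as fatal and surfaces immediately.
                if (!retriable || attempt >= maxAttempts) {
                    throw new IllegalStateException("Giving up after " + attempt + " attempt(s)", e);
                }
            }
        }
    }
}
```

Bounding the number of attempts is an addition of this sketch; the loop under review retries until a result is obtained.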

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();

Review comment:
       I see @guozhangwang's point, but I think it's not worth obsessing over. @wcarlson5 how about you just file a V1.1 ticket for cleanup like this? Doesn't seem worth blocking the PR over :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure

Review comment:
       And/or just temporarily change the return type here back to `void`, since it's pretty much a blocking call for now

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure

Review comment:
       Can you file an AK ticket and leave a TODO here mentioning that this method is not actually async at the moment? Since we have some interest from Kafka Streams users who want to try out this feature before it's officially published, we should at least make sure the javadocs reflect the current behavior of these APIs

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,14 +907,16 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+        if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
             taskManager.handleTopologyUpdates();
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
+            if (topologyMetadata.isEmpty()) {

Review comment:
       How about we just call `subscribeConsumer()` here without any `if`?







[GitHub] [kafka] guozhangwang commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r750797917



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       As for completing the `future` objects: I think it's okay to let the first future wait a bit longer if there are consecutive operations, i.e. in the above implementation we would complete all futures that have been registered so far once every still-alive thread has eventually caught up with the bumped version.
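The completion rule described here can be sketched roughly as follows. This is an illustration under assumptions: it uses the JDK's `CompletableFuture` instead of `KafkaFuture`, and `TopologyVersionSketch`, `bumpVersion`, and `threadCaughtUp` are hypothetical names, not the methods in the PR.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of per-thread version tracking; not the real TopologyMetadata.
final class TopologyVersionSketch {

    private static final class Waiter {
        final long version;
        final CompletableFuture<Void> future;

        Waiter(final long version, final CompletableFuture<Void> future) {
            this.version = version;
            this.future = future;
        }
    }

    private long topologyVersion = 0L;
    private final Map<String, Long> threadVersions = new HashMap<>();
    private final List<Waiter> waiters = new ArrayList<>();

    // Threads start at version 0, matching the topology version before any add or remove.
    synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    synchronized boolean needsUpdate(final String threadName) {
        return threadVersions.get(threadName) < topologyVersion;
    }

    // Each add or remove bumps the version and registers a future for that version.
    synchronized CompletableFuture<Void> bumpVersion() {
        topologyVersion++;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        waiters.add(new Waiter(topologyVersion, future));
        return future;
    }

    // A thread reports that it has applied the latest topology. Every future whose
    // version all registered threads have reached is completed in one pass.
    synchronized void threadCaughtUp(final String threadName) {
        threadVersions.put(threadName, topologyVersion);
        final long slowest = threadVersions.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(topologyVersion);
        final Iterator<Waiter> iterator = waiters.iterator();
        while (iterator.hasNext()) {
            final Waiter waiter = iterator.next();
            if (waiter.version <= slowest) {
                waiter.future.complete(null);
                iterator.remove();
            }
        }
    }
}
```

With two consecutive operations, the future for the first one completes only when the slowest thread reaches the second version, which is the extra wait the comment considers acceptable.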







[GitHub] [kafka] wcarlson5 commented on pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#issuecomment-964644741


   @ableegoldman @guozhangwang @rodesai Can you give this a look? I would like to say it's pretty straightforward, but...





[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r754759558



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);

Review comment:
       Before add or remove is called the version is 0; once we add a topology it becomes 1.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -135,31 +189,42 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         }
     }
 
-    public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+    /**
+     * Adds the topology and registers a future that listens for all threads on the older version to see the update
+     */
+    public KafkaFuture<Void> registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         try {
             lock();
             version.topologyVersion.incrementAndGet();
             log.info("Adding NamedTopology {}, latest topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
+            version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), future));

Review comment:
       responded to your other comment :)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();

Review comment:
       The reason I would prefer to keep it as a list of `topologyVersionWaiters` is that the topology can be updated again while threads are still reaching a version. Maybe I have been looking at this too long, but it seems easier to keep track of this way. The complexity is the number of times the topology has been updated * the number of threads. Realistically the number of threads is bounded, so we can treat it as a constant. And we would have the same number of futures (one per topology update) to complete either way. I think the little extra verbosity makes it easier to really understand what is going on. But then maybe I am too close to this code :|
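
A rough sketch of the versioned-waiter approach argued for here, assuming `java.util.concurrent.CompletableFuture` instead of `KafkaFuture` and simple `synchronized` methods instead of the topology lock. The names `VersionedTopologyWaiters`, `Waiter` and `updateTopology` are hypothetical and chosen for illustration; only `reachedLatestVersion` and `registerThread` mirror names from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only: each topology update gets its own waiter,
// tagged with the version it is waiting for.
final class VersionedTopologyWaiters {
    private static final class Waiter {
        final long version;
        final CompletableFuture<Void> future;

        Waiter(final long version, final CompletableFuture<Void> future) {
            this.version = version;
            this.future = future;
        }
    }

    private long topologyVersion = 0L;
    private final Map<String, Long> threadVersions = new HashMap<>();
    private final List<Waiter> waiters = new ArrayList<>();

    synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    synchronized CompletableFuture<Void> updateTopology() {
        topologyVersion++;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        waiters.add(new Waiter(topologyVersion, future));
        return future;
    }

    synchronized void reachedLatestVersion(final String threadName) {
        threadVersions.put(threadName, topologyVersion);
        final Iterator<Waiter> iterator = waiters.iterator();
        while (iterator.hasNext()) {
            final Waiter waiter = iterator.next();
            // A waiter completes as soon as every thread is at or past its version,
            // even if a newer update is still pending.
            if (threadVersions.values().stream().allMatch(v -> v >= waiter.version)) {
                waiter.future.complete(null);
                iterator.remove();
            }
        }
    }
}
```

Each call walks the waiter list and, for each waiter, the thread map, which is the (updates * threads) cost mentioned above.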

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,14 +907,16 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+        if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
             taskManager.handleTopologyUpdates();
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
+            if (topologyMetadata.isEmpty()) {

Review comment:
       If we are removing the last topology we need to unsubscribe, for a few reasons. One of them is that we can't reset offsets while still subscribed.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752834113



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       Yeah, I hadn't thought of multi-node clusters; that will be a problem. Calling `get` here isn't going to be enough even if we wanted to do that.
   
   Maybe we can call it for each cluster after it has unsubscribed, and expect it to fail for all but the last one? We can talk tomorrow.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752821836



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       Unfortunately, if we want to reset the offsets we need to make sure we are not subscribed. Otherwise we get:
   
   ```
   java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.
   ```
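
The ordering constraint can be sketched without a broker. The example below is a stand-in under stated assumptions: `FakeGroup` is a hypothetical class that only imitates the rule "offsets cannot be deleted while the group is subscribed", it throws `IllegalStateException` rather than the real `GroupSubscribedToTopicException`, and `CompletableFuture` replaces `KafkaFuture`. It shows why the reset is chained after the removal future instead of being issued right away.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only; none of these names exist in Kafka.
final class ResetAfterUnsubscribe {
    // Imitates the broker-side rule with in-memory state.
    static final class FakeGroup {
        final Set<String> subscribedTopics = new HashSet<>();
        final Map<String, Long> committedOffsets = new HashMap<>();

        void deleteOffsets(final String topic) {
            if (subscribedTopics.contains(topic)) {
                throw new IllegalStateException(
                    "Deleting offsets is forbidden while subscribed to " + topic);
            }
            committedOffsets.remove(topic);
        }
    }

    // Run the offset reset only after the "threads have unsubscribed" future completes.
    static CompletableFuture<Void> resetOnceUnsubscribed(final FakeGroup group,
                                                         final String topic,
                                                         final CompletableFuture<Void> unsubscribed) {
        return unsubscribed.thenRun(() -> group.deleteOffsets(topic));
    }
}
```

As noted in the surrounding discussion, waiting on the local future alone may still not be enough when other instances of the application remain subscribed.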







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752822683



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+
+    public boolean reachedLatestVersion(final String threadName) {
+        boolean rebalance = false;
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long verison = topologyVersionWaiters.topologyVersion;
+                if (verison <= threadVersions.get(threadName)) {
+                    if (threadVersions.values().stream().allMatch(t -> t >= verison)) {
+                        topologyVersionWaiters.future.complete(null);
+                        iterator.remove();
+                        log.info("thread {} is now on on version {}", threadName, topologyVersionWaiters.topologyVersion);
+                        rebalance = true;

Review comment:
       How about we just take the rebalances for now? We can improve this later







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752680404



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       Okay, @ableegoldman and I talked offline about this. I think we found a good solution.







[GitHub] [kafka] ableegoldman commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752831258



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+
+    public boolean reachedLatestVersion(final String threadName) {
+        boolean rebalance = false;
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long verison = topologyVersionWaiters.topologyVersion;
+                if (verison <= threadVersions.get(threadName)) {
+                    if (threadVersions.values().stream().allMatch(t -> t >= verison)) {
+                        topologyVersionWaiters.future.complete(null);
+                        iterator.remove();
+                        log.info("thread {} is now on on version {}", threadName, topologyVersionWaiters.topologyVersion);
+                        rebalance = true;

Review comment:
       You mean just always trigger a rebalance? That SGTM 







[GitHub] [kafka] wcarlson5 commented on pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#issuecomment-973307507


   @guozhangwang  @ableegoldman I think this is ready for another pass.





[GitHub] [kafka] ableegoldman commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r750810073



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       Yeah, I'm not quite sure these changes are necessary; I thought I had it set up to behave the way we want/need in the last PR I just merged. Of course I could have missed something that had to be added for this PR, but can you explain a bit more why we need these changes? Was this related to trying to fix tests that had broken?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r746136746



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/RemoveNamedTopologyResult.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.namedtopology;
+
+import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Objects;
+
+public class RemoveNamedTopologyResult {
+    private final KafkaFuture<Void> removeTopologyFuture;
+    private final DeleteConsumerGroupOffsetsResult deleteOffsetsResult;
+
+    public RemoveNamedTopologyResult(final KafkaFuture<Void> removeTopologyFuture, final DeleteConsumerGroupOffsetsResult deleteOffsetsResult) {
+        this.removeTopologyFuture = removeTopologyFuture;
+        this.deleteOffsetsResult = deleteOffsetsResult;

Review comment:
       I don't use the delete offsets result in this PR. I can remove it from the interface for now, but I would rather leave it and just follow up.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r750825033



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -83,6 +100,7 @@ public TopologyMetadata(final InternalTopologyBuilder builder,
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
+        getStreamThreadCount = () -> getNumStreamThreads(config);

Review comment:
       This is because the number of threads can change. If it does, we don't want to be stuck waiting on a thread that has been removed.
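
To illustrate why a supplier is used rather than a count captured once, here is a small sketch. `LiveThreadCount` and `allThreadsCaughtUp` are hypothetical names and the real `TopologyMetadata` logic may differ; the only point is that the count is re-read on every check.

```java
import java.util.function.Supplier;

// Hypothetical illustration only: the expected thread count is looked up on each
// check, so removing a thread lowers the bar instead of blocking forever.
final class LiveThreadCount {
    private final Supplier<Integer> threadCount;

    LiveThreadCount(final Supplier<Integer> threadCount) {
        this.threadCount = threadCount;
    }

    boolean allThreadsCaughtUp(final int threadsOnLatestVersion) {
        return threadsOnLatestVersion >= threadCount.get();
    }
}
```

Had the constructor taken a plain `int`, a later thread removal would leave the check comparing against a stale, too-large number.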

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -121,9 +164,9 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         if (isEmpty() && threadState.get().isAlive()) {
             try {
                 lock();
-                while (isEmpty() && threadState.get().isAlive()) {

Review comment:
       Now that we have the loop in the caller, I think we can simplify.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-

Review comment:
       I think we might be able to now

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +134,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.error("adding {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.error("Removing {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+//            .filter(t -> topologyMetadata.sourceTopicCollection().contains(t))

Review comment:
       Ah yeah, I need to clean this up. Thanks for the reminder.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       The previous version was not working because, when the last topology was removed, it would ungate, proceed to the poll phase, and throw an exception. Let me see what I can simplify using @guozhangwang's suggestions, and then you can give it another look.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752680014



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -83,6 +100,7 @@ public TopologyMetadata(final InternalTopologyBuilder builder,
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
+        getStreamThreadCount = () -> getNumStreamThreads(config);

Review comment:
       removed







[GitHub] [kafka] ableegoldman commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752809604



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+
+    public boolean reachedLatestVersion(final String threadName) {
+        boolean rebalance = false;
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long verison = topologyVersionWaiters.topologyVersion;
+                if (verison <= threadVersions.get(threadName)) {
+                    if (threadVersions.values().stream().allMatch(t -> t >= verison)) {
+                        topologyVersionWaiters.future.complete(null);
+                        iterator.remove();
+                        log.info("thread {} is now on on version {}", threadName, topologyVersionWaiters.topologyVersion);

Review comment:
       ```suggestion
                           log.info("Thread {} is now on topology version {}", threadName, topologyVersionWaiters.topologyVersion);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+

Review comment:
       super nit: extra line break

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+
+    public boolean reachedLatestVersion(final String threadName) {
+        boolean rebalance = false;
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long verison = topologyVersionWaiters.topologyVersion;

Review comment:
       nit: typo in "verison". Also, since `version` is already taken by the `TopologyVersion` field, we should rename either that field or this variable (it probably makes sense to rename the field to `topologyVersion`, `currentTopologyVersion`, `latestTopologyVersion`, etc.).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       We shouldn't call `get()` on the futures; that's up to the user to do. Technically speaking, the point of this isn't to make the APIs blocking but to enable users to block on their completion, i.e. we want to return the `RemoveNamedTopologyResult` ASAP and allow the user to block on it if they want.
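
A minimal sketch of the non-blocking shape described in this comment, assuming `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFuture`. `RemoveResultSketch` and `NonBlockingRemoveSketch` are hypothetical names, not classes from this PR. The offset reset is chained onto the removal future rather than being run after a `get()` inside the method, so the method returns right away.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for RemoveNamedTopologyResult; not the Kafka class.
final class RemoveResultSketch {
    private final CompletableFuture<Void> removed;

    RemoveResultSketch(final CompletableFuture<Void> removed) {
        this.removed = removed;
    }

    // The caller decides whether and when to block on this future.
    CompletableFuture<Void> all() {
        return removed;
    }
}

final class NonBlockingRemoveSketch {
    // Returns immediately; the reset is chained to run once the removal completes.
    static RemoveResultSketch remove(final CompletableFuture<Void> removeTopologyFuture,
                                     final Runnable resetOffsets) {
        return new RemoveResultSketch(removeTopologyFuture.thenRun(resetOffsets));
    }

    public static void main(final String[] args) {
        final CompletableFuture<Void> removal = new CompletableFuture<>();
        final RemoveResultSketch result = remove(removal, () -> System.out.println("offsets reset"));
        System.out.println("done before removal completes? " + result.all().isDone());

        removal.complete(null);   // simulates every thread having dropped the topology
        result.all().join();      // blocking is the caller's choice, not the API's
        System.out.println("done after removal completes? " + result.all().isDone());
    }
}
```

With this shape, a caller that does not care about completion can simply ignore the returned result.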

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -578,6 +577,7 @@ public void run() {
             failedStreamThreadSensor.record();
             this.streamsUncaughtExceptionHandler.accept(e);
         } finally {
+            topologyMetadata.unregisterThread(threadMetadata.threadName());

Review comment:
       This should be moved inside the `completeShutdown()` method: we register the thread inside the StreamThread constructor, but it's possible for the thread to be shut down before it ever starts running, in which case we'd still want to make sure it gets unregistered.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {

Review comment:
       ```suggestion
       public Collection<String> sourceTopicsForTopology(final String name) {
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());

Review comment:
       We don't need to include standbys, since they don't commit offsets on the input topics (and IMO we should not reset offsets on changelogs or repartitions, but leave it up to the user to clean those up how they see fit).
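
A rough sketch of the narrower selection this comment suggests, simplified to topic names instead of `TopicPartition`s. `ActiveOnlyResetSketch` and `ThreadTasks` are hypothetical stand-ins for the thread and task metadata classes, and the topic names in `main` are made up for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class ActiveOnlyResetSketch {
    // Hypothetical stand-in for per-thread metadata: topics read by each kind of task.
    static final class ThreadTasks {
        final Set<String> activeTopics;
        final Set<String> standbyTopics;

        ThreadTasks(final Set<String> activeTopics, final Set<String> standbyTopics) {
            this.activeTopics = activeTopics;
            this.standbyTopics = standbyTopics;
        }
    }

    static Set<String> topicsToReset(final List<ThreadTasks> threads, final Set<String> sourceTopics) {
        return threads.stream()
            .flatMap(t -> t.activeTopics.stream())   // standby tasks are deliberately skipped
            .filter(sourceTopics::contains)          // keeps only the topology's input topics
            .collect(Collectors.toSet());
    }

    public static void main(final String[] args) {
        final ThreadTasks thread = new ThreadTasks(
            new HashSet<>(Arrays.asList("input", "app-store-changelog")),
            Collections.singleton("other-input"));
        final Set<String> sourceTopics = new HashSet<>(Arrays.asList("input", "other-input"));
        System.out.println(topicsToReset(Collections.singletonList(thread), sourceTopics));
    }
}
```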

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,48 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopologies(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+
+    public boolean reachedLatestVersion(final String threadName) {
+        boolean rebalance = false;
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long verison = topologyVersionWaiters.topologyVersion;
+                if (verison <= threadVersions.get(threadName)) {
+                    if (threadVersions.values().stream().allMatch(t -> t >= verison)) {
+                        topologyVersionWaiters.future.complete(null);
+                        iterator.remove();
+                        log.info("thread {} is now on on version {}", threadName, topologyVersionWaiters.topologyVersion);
+                        rebalance = true;

Review comment:
       This is a little tricky, but I think it might make sense to do the reverse. It has to do with how the thread handles being assigned tasks that it doesn't recognize because it hasn't yet updated its topology: it just makes a note of the assigned task name and stashes it away until it sees a topology update, at which point it checks whether the task is part of the new topology and, if so, goes on to create it.
   
   Which is a long way of saying, it's better to rebalance earlier than later, i.e. to rebalance when the _first_ thread processes a topology update rather than when the _last_ thread has processed it. Partly so that the first thread can go ahead and start processing tasks from that topology right away, and partly because if we wait until the last thread acks the topology update, we could get in trouble and end up never rebalancing in extreme cases, for example if a thread gets stuck in processing (like if the user has bad custom logic that sends it into an infinite loop, which is a real example from an escalation I once had).
   
   Happy to chat over Zoom if you want to dig into this a bit more. I thought I had left a more specific TODO comment about when we do/don't want to rebalance, but I just read it again and it's wildly unhelpful 😅

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -61,6 +74,8 @@ public KafkaStreamsNamedTopologyWrapper(final Properties props, final KafkaClien
 
     private KafkaStreamsNamedTopologyWrapper(final StreamsConfig config, final KafkaClientSupplier clientSupplier) {
         super(new TopologyMetadata(new ConcurrentSkipListMap<>(), config), config, clientSupplier);
+        final LogContext logContext = new LogContext();

Review comment:
       I think you can just instantiate the field like this instead of going through the `LogContext`; I checked that class and it seems to do the same thing as just
   ``` 
   private final Logger log = LoggerFactory.getLogger(KafkaStreamsNamedTopologyWrapper.class);
   ```

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+

Review comment:
       nit: extra line breaks

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();

Review comment:
       super nit:
   ```suggestion
                   final Set<TaskMetadata> tasks = new HashSet<>();
   ```







[GitHub] [kafka] guozhangwang commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r750795877



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -121,9 +164,9 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         if (isEmpty() && threadState.get().isAlive()) {
             try {
                 lock();
-                while (isEmpty() && threadState.get().isAlive()) {

Review comment:
       See my other comment: I think this function `maybeWaitForNonEmptyTopology` would not be needed any more, since its only value is the `version.topologyCV.await();` part, which we can just move up into the caller's while loop.
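
A minimal sketch of what inlining the condition wait into the caller's loop could look like, using only `java.util.concurrent.locks`. `TopologyGateSketch` and its fields are hypothetical and only loosely mirror `topologyLock` and `topologyCV` from the diff; `awaitUninterruptibly()` is used here purely to keep the example free of checked exceptions.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the topology lock/condition pair; not the TopologyMetadata class.
final class TopologyGateSketch {
    private final ReentrantLock topologyLock = new ReentrantLock();
    private final Condition topologyCV = topologyLock.newCondition();
    private int numTopologies = 0;
    private long topologyVersion = 0L;
    private boolean alive = true;

    void addTopology() {
        topologyLock.lock();
        try {
            numTopologies++;
            topologyVersion++;
            topologyCV.signalAll();    // wake any thread parked in the loop below
        } finally {
            topologyLock.unlock();
        }
    }

    void shutdown() {
        topologyLock.lock();
        try {
            alive = false;
            topologyCV.signalAll();    // let parked threads observe the shutdown
        } finally {
            topologyLock.unlock();
        }
    }

    // The wait lives directly in the loop: park while there is nothing to process,
    // then return the version the calling thread should catch up to.
    long awaitWorkAndGetVersion() {
        topologyLock.lock();
        try {
            while (numTopologies == 0 && alive) {
                topologyCV.awaitUninterruptibly();
            }
            return topologyVersion;
        } finally {
            topologyLock.unlock();
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final TopologyGateSketch gate = new TopologyGateSketch();
        final Thread worker = new Thread(() ->
            System.out.println("woke up at version " + gate.awaitWorkAndGetVersion()));
        worker.start();
        gate.addTopology();
        worker.join();
    }
}
```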

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       As for the future: I think it's okay to let the first future wait a bit longer if there are consecutive operations, i.e. in the above implementation we would complete all futures that have been registered so far once every still-alive thread has eventually caught up with the bumped version.
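
One way to picture this completion rule, as a sketch under stated assumptions: `VersionAckSketch` is a hypothetical class, `CompletableFuture` stands in for `KafkaFuture`, and a plain `synchronized` monitor replaces the topology lock. Each topology change registers a waiter at the bumped version, and a waiter completes only once the slowest registered thread has acked at least that version.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class VersionAckSketch {
    private static final class Waiter {
        final long version;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        Waiter(final long version) {
            this.version = version;
        }
    }

    private final Map<String, Long> threadVersions = new HashMap<>();
    private final List<Waiter> waiters = new ArrayList<>();
    private long topologyVersion = 0L;

    synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    // Called for each add/remove: bumps the version and hands back a future for it.
    synchronized CompletableFuture<Void> bumpVersion() {
        final Waiter waiter = new Waiter(++topologyVersion);
        waiters.add(waiter);
        return waiter.future;
    }

    // Called by a thread once it has applied the latest topology.
    synchronized void ackLatestVersion(final String threadName) {
        threadVersions.put(threadName, topologyVersion);
        final long slowest = threadVersions.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(topologyVersion);
        final Iterator<Waiter> iterator = waiters.iterator();
        while (iterator.hasNext()) {
            final Waiter waiter = iterator.next();
            if (waiter.version <= slowest) {   // every registered thread has caught up
                waiter.future.complete(null);
                iterator.remove();
            }
        }
    }

    public static void main(final String[] args) {
        final VersionAckSketch metadata = new VersionAckSketch();
        metadata.registerThread("thread-1");
        metadata.registerThread("thread-2");
        final CompletableFuture<Void> first = metadata.bumpVersion();
        final CompletableFuture<Void> second = metadata.bumpVersion();

        metadata.ackLatestVersion("thread-1");
        System.out.println("after one ack: " + first.isDone() + ", " + second.isDone());
        metadata.ackLatestVersion("thread-2");
        System.out.println("after both acks: " + first.isDone() + ", " + second.isDone());
    }
}
```

In this sketch the first future may wait for later bumps to be acked as well, which matches the trade-off described in the comment.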

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-

Review comment:
       For line 915 below: do we still need this line? Could we just inline the `Condition topologyCV` waiting within this while loop now?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +134,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.error("adding {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.error("Removing {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+//            .filter(t -> topologyMetadata.sourceTopicCollection().contains(t))

Review comment:
       Intentional?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +134,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.error("adding {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.error("Removing {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+//            .filter(t -> topologyMetadata.sourceTopicCollection().contains(t))

Review comment:
       Also, this logic seems like it would just include all partitions of all tasks, since it does not involve `topologyToRemove` at all?
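
One way to scope the set to the topology being removed is to filter the assigned partitions by that topology's source topics. The following is only a sketch: the nested `TopicPartition` class is a minimal stand-in for `org.apache.kafka.common.TopicPartition`, and `partitionsToReset` is a hypothetical helper, not a method in the Streams code base.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

public class PartitionFilterSketch {
    // Minimal stand-in for org.apache.kafka.common.TopicPartition.
    public static final class TopicPartition {
        private final String topic;
        private final int partition;

        public TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        public String topic() {
            return topic;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            final TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Keeps only the partitions whose topic is a source topic of the removed topology.
    public static Set<TopicPartition> partitionsToReset(final Collection<TopicPartition> assigned,
                                                        final Collection<String> sourceTopicsOfRemovedTopology) {
        final Set<String> topics = new HashSet<>(sourceTopicsOfRemovedTopology);
        return assigned.stream()
            .filter(tp -> topics.contains(tp.topic()))
            .collect(Collectors.toSet());
    }
}
```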

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -83,6 +100,7 @@ public TopologyMetadata(final InternalTopologyBuilder builder,
         } else {
             builders.put(UNNAMED_TOPOLOGY, builder);
         }
+        getStreamThreadCount = () -> getNumStreamThreads(config);

Review comment:
       Why do we need to initialize this supplier in the constructor (ditto below)? This variable is only called in `reachedVersion`, by which time the stream threads have already been initialized.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-
+        do {
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
+            if (lastSeenTopologyVersion < topologyMetadata.topologyVersion()) {

Review comment:
       I feel this is getting more complicated than necessary for KAFKA-12648: we have a `topologyMetadata` at the `KafkaStreams` layer, while at the `StreamThread` layer we keep a `long lastSeenTopologyVersion`. If we let the `topologyMetadata` object, which is shared among all threads (as well as their task managers etc.), track each thread's version, then we can simplify this. Just a rough sketch here:
   
   * In `TopologyMetadata` we maintain a `Map<String, Long>` from thread name to that thread's current topology version. When threads are added or removed, this map would be updated as well (in a synchronized way).
   * Inside a thread's `checkForTopologyUpdate`, in a synchronized block, check whether the versions of all threads other than this one are equal to the current version: if yes, this thread updates its own map entry as well and then triggers a rebalance; otherwise, it only updates its own map entry.
   
   So if consecutive topology updates are issued, we would only trigger a rebalance at the end, when all threads reach the latest topology version.
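
The two bullets above can be sketched as follows. This is an illustration under assumptions, not the eventual implementation: `ThreadVersionsSketch` and its methods are hypothetical names, and the boolean return value stands in for "this thread should trigger the rebalance".

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-thread version tracking in a shared metadata object.
public class ThreadVersionsSketch {
    private final Map<String, Long> threadVersions = new HashMap<>();
    private long topologyVersion = 0L;

    public synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, topologyVersion);
    }

    public synchronized void unregisterThread(final String threadName) {
        threadVersions.remove(threadName);
    }

    // Called once per add/remove of a named topology.
    public synchronized void bumpTopologyVersion() {
        topologyVersion++;
    }

    // Returns true only for the last thread to catch up, which is the one
    // that should trigger the rebalance. Every behind thread records the
    // current version in its own map entry.
    public synchronized boolean checkForTopologyUpdate(final String threadName) {
        final Long current = threadVersions.get(threadName);
        if (current == null || current.longValue() == topologyVersion) {
            return false; // unknown thread, or nothing new to apply
        }
        final boolean othersCaughtUp = threadVersions.entrySet().stream()
            .filter(e -> !e.getKey().equals(threadName))
            .allMatch(e -> e.getValue().longValue() == topologyVersion);
        threadVersions.put(threadName, topologyVersion);
        return othersCaughtUp;
    }
}
```

In this sketch, two back-to-back version bumps lead to a single `true` result, returned to whichever thread catches up last.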







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies have a blocking option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r756321051



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {

Review comment:
       Sure. I will go with `maybeNotifyTopologyVersionWaiters`. We could also go with `updateThreadVersions`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure

Review comment:
       How about I update the javadoc to mention that this is not purely async, and do the same for add topology?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();

Review comment:
       Sure. We can talk about it more on that ticket: https://confluentinc.atlassian.net/browse/KCI-1250

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,14 +907,16 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+        if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
             taskManager.handleTopologyUpdates();
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
+            if (topologyMetadata.isEmpty()) {

Review comment:
       I gave it a shot, but it causes a test to fail when removing a topology that is not the last one. I will leave a comment.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final Set<TaskMetadata> tasks = new HashSet<>(t.activeTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopicsForTopology(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
 
-        topologyMetadata.unregisterTopology(topologyToRemove);
+        if (resetOffsets) {
+            final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                removeTopologyFuture.whenComplete((v, throwable) -> {
+                    if (throwable != null) {
+                        future.completeExceptionally(throwable);
+                    }
+                    DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
+                    while (deleteOffsetsResult == null) {

Review comment:
       That's a good point. I will add a check for that. That does work.
   
   In adding that check I found something interesting: you can't block on one client while never removing the topology on the other, because the rebalance will keep failing and the call will block forever.
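
A caller could guard against that by bounding the wait. Below is a minimal sketch, assuming `CompletableFuture` as a stand-in for the future returned by the removal call (`KafkaFuture` also offers a `get(long, TimeUnit)` overload); `BoundedWaitSketch` and `awaitRemoval` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {
    // Waits for the removal to finish, but gives up after timeoutMs.
    // Returns true if the removal completed in time, false otherwise.
    public static boolean awaitRemoval(final CompletableFuture<Void> removal, final long timeoutMs) {
        try {
            removal.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (final TimeoutException e) {
            // The other client may not have removed the topology yet.
            return false;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (final ExecutionException e) {
            throw new IllegalStateException("Topology removal failed", e.getCause());
        }
    }
}
```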

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,104 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure

Review comment:
       I am not sure I understand. If we don't call `get` on the result, it breaks the tests, so "pretty much blocking" is not enough. It might not be purely async, but it is certainly not blocking.







[GitHub] [kafka] mjsax merged pull request #11479: KAFKA-12648: Make changing the named topologies have a blocking option

Posted by GitBox <gi...@apache.org>.
mjsax merged pull request #11479:
URL: https://github.com/apache/kafka/pull/11479


   





[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies have a blocking option

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r766186102



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void maybeNotifyTopologyVersionWaiters(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long topologyVersionWaitersVersion = topologyVersionWaiters.topologyVersion;
+                if (topologyVersionWaitersVersion <= threadVersions.get(threadName)) {
+                    if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) {
+                        topologyVersionWaiters.future.complete(null);
+                        iterator.remove();
+                        log.info("Thread {} is now on topology version {}", threadName, topologyVersionWaiters.topologyVersion);

Review comment:
       Those are good points. I updated the log and method name in a follow-up. I thought about extracting the version update to the caller, but I would rather keep them coupled so I don't have to worry about them getting out of sync.
   
   https://github.com/apache/kafka/pull/11589







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r746918532



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
##########
@@ -425,6 +428,36 @@ public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTop
         assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA));
     }
 
+    @Test
+    public void shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopology() throws Exception {
+        CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
+        // Build up named topology with two stateful subtopologies
+        final KStream<String, Long> inputStream1 = topology1Builder.stream(INPUT_STREAM_1);
+        inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT);
+        inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
+        streams = new KafkaStreamsNamedTopologyWrapper(props, clientSupplier);
+        streams.start();
+        final NamedTopology namedTopology = topology1Builder.buildNamedTopology(props);
+        streams.addNamedTopology(namedTopology).all().get();
+
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA));
+        streams.removeNamedTopology("topology-1", true).all().get();
+        streams.cleanUpNamedTopology("topology-1");
+
+        final KStream<String, Long> inputStream = topology1BuilderDup.stream(INPUT_STREAM_1);
+        inputStream.groupByKey().count().toStream().to(COUNT_OUTPUT);
+        inputStream.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT);
+
+        final NamedTopology namedTopologyDup = topology1BuilderDup.buildNamedTopology(props);
+        streams.addNamedTopology(namedTopologyDup).all().get();
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA));

Review comment:
       This is getting double-counted right now because, when removing a named topology, I still need to delete any changelogs or stores associated with it.







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r746136746



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/RemoveNamedTopologyResult.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.namedtopology;
+
+import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+
+import java.util.Objects;
+
+public class RemoveNamedTopologyResult {
+    private final KafkaFuture<Void> removeTopologyFuture;
+    private final DeleteConsumerGroupOffsetsResult deleteOffsetsResult;
+
+    public RemoveNamedTopologyResult(final KafkaFuture<Void> removeTopologyFuture, final DeleteConsumerGroupOffsetsResult deleteOffsetsResult) {
+        this.removeTopologyFuture = removeTopologyFuture;
+        this.deleteOffsetsResult = deleteOffsetsResult;

Review comment:
       I don't use the delete-offsets result in this PR. I can remove it from the interface for now, but I would rather leave it in and follow up later.







[GitHub] [kafka] guozhangwang commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r754719711



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);

Review comment:
       Does the version start at 0 or at 1? If it starts at 0, is it possible that a newly added thread would not get notified of the initial version 0?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,14 +907,16 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
+        if (topologyMetadata.isEmpty() || topologyMetadata.needsUpdate(getName())) {
             taskManager.handleTopologyUpdates();
+            log.info("StreamThread has detected an update to the topology, triggering a rebalance to refresh the assignment");
+            if (topologyMetadata.isEmpty()) {

Review comment:
       Why do we add this `if` condition? I cannot see the motivation here.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void reachedLatestVersion(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();

Review comment:
       Correct me if I'm thinking crazily here :P My read is that the topology version of a thread can ONLY be incremented (maybe by more than 1), never decremented. So we do not actually need to remember the "waiting" versions as a list. Instead, we can just keep a list of futures for each thread, and when a thread has successfully completed `handleTopologyUpdates`, we can immediately complete all the currently maintained futures, since no future can ever be related to a version newer than the one this thread has just updated to.
   
   With that, we can 1) get rid of the `topologyVersionWaiters`, and 2) avoid the nested loop of while + stream() below to complete futures. Instead, we just keep a plain list of futures per thread, without any additional state associated with them.
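
   The per-thread idea described above could look roughly like the following. This is only a minimal sketch under stated assumptions: `PerThreadWaiters`, `onTopologyChange` and `onThreadCaughtUp` are hypothetical names, and it uses the JDK's `CompletableFuture` in place of `KafkaFuture` so that it compiles on its own. It also assumes that a thread always catches up to the latest version, which is what makes it safe to complete everything pending for that thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: one list of pending futures per thread, no version bookkeeping.
final class PerThreadWaiters {
    private final Map<String, List<CompletableFuture<Void>>> pendingByThread = new HashMap<>();

    synchronized void registerThread(final String threadName) {
        pendingByThread.putIfAbsent(threadName, new ArrayList<>());
    }

    // Called when a topology is added or removed. The returned future completes
    // once every currently registered thread has caught up.
    synchronized CompletableFuture<Void> onTopologyChange() {
        final List<CompletableFuture<Void>> perThread = new ArrayList<>();
        for (final List<CompletableFuture<Void>> pending : pendingByThread.values()) {
            final CompletableFuture<Void> future = new CompletableFuture<>();
            pending.add(future);
            perThread.add(future);
        }
        return CompletableFuture.allOf(perThread.toArray(new CompletableFuture<?>[0]));
    }

    // Called by a thread after it has handled topology updates. Versions only
    // move forward, so everything pending for this thread can be completed.
    synchronized void onThreadCaughtUp(final String threadName) {
        final List<CompletableFuture<Void>> pending = pendingByThread.get(threadName);
        if (pending == null) {
            return;
        }
        final List<CompletableFuture<Void>> toComplete = new ArrayList<>(pending);
        pending.clear();
        toComplete.forEach(future -> future.complete(null));
    }
}
```

   The trade-off in this sketch is one future per thread per change, in exchange for dropping the version comparison and the nested loop.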

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -135,31 +189,42 @@ public void maybeWaitForNonEmptyTopology(final Supplier<State> threadState) {
         }
     }
 
-    public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+    /**
+     * Adds the topology and registers a future that listens for all threads on the older version to see the update
+     */
+    public KafkaFuture<Void> registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) {
+        final KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
         try {
             lock();
             version.topologyVersion.incrementAndGet();
             log.info("Adding NamedTopology {}, latest topology version is {}", newTopologyBuilder.topologyName(), version.topologyVersion.get());
+            version.activeTopologyWaiters.add(new TopologyVersionWaiters(topologyVersion(), future));

Review comment:
       Please see my other comment: instead of keeping a single list of `topologyVersionWaiters`, could we just keep a list of futures per thread, and once updated immediately complete all of them?







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752821836



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       Unfortunately, if we want to reset the offsets, we need to make sure we are not subscribed. Otherwise we get:
   
   java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupSubscribedToTopicException: Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.
   







[GitHub] [kafka] guozhangwang commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies have a blocking option

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r765364839



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
##########
@@ -108,6 +124,44 @@ private void unlock() {
         version.topologyLock.unlock();
     }
 
+    public Collection<String> sourceTopicsForTopology(final String name) {
+        return builders.get(name).sourceTopicCollection();
+    }
+
+    public boolean needsUpdate(final String threadName) {
+        return threadVersions.get(threadName) < topologyVersion();
+    }
+
+    public void registerThread(final String threadName) {
+        threadVersions.put(threadName, 0L);
+    }
+
+    public void unregisterThread(final String threadName) {
+        threadVersions.remove(threadName);
+    }
+
+    public void maybeNotifyTopologyVersionWaiters(final String threadName) {
+        try {
+            lock();
+            final Iterator<TopologyVersionWaiters> iterator = version.activeTopologyWaiters.listIterator();
+            TopologyVersionWaiters topologyVersionWaiters;
+            threadVersions.put(threadName, topologyVersion());
+            while (iterator.hasNext()) {
+                topologyVersionWaiters = iterator.next();
+                final long topologyVersionWaitersVersion = topologyVersionWaiters.topologyVersion;
+                if (topologyVersionWaitersVersion <= threadVersions.get(threadName)) {
+                    if (threadVersions.values().stream().allMatch(t -> t >= topologyVersionWaitersVersion)) {
+                        topologyVersionWaiters.future.complete(null);
+                        iterator.remove();
+                        log.info("Thread {} is now on topology version {}", threadName, topologyVersionWaiters.topologyVersion);

Review comment:
       Sorry for the late comments after the PR is merged, but I feel this log entry could be confusing: we always update the thread version, as in line 148, but we only log it after ALL threads have been updated to at least that version. This means the log entry may be printed later than the update actually happens.
   
   Another minor thing is that this function actually does two things: it updates the thread version, and then maybe completes the version waiters. The function name only indicates the latter, not the former. Maybe it's better to extract line 148 along with line 156 into the caller of this function? Or we could rename the function to `updateThreadVersionAndMayBe...`. Personally I prefer the first option.
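
   To illustrate the first option, here is a rough sketch of what the split might look like. All names (`VersionTracker`, `updateThreadVersion`, `maybeCompleteWaiters`) are hypothetical, `CompletableFuture` stands in for `KafkaFuture`, and plain `synchronized` stands in for the topology lock, so treat it as an illustration rather than the actual `TopologyMetadata` code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the version update and the waiter completion are separate steps.
final class VersionTracker {
    private static final class Waiter {
        final long version;
        final CompletableFuture<Void> future;

        Waiter(final long version, final CompletableFuture<Void> future) {
            this.version = version;
            this.future = future;
        }
    }

    private long topologyVersion = 0L;
    private final Map<String, Long> threadVersions = new HashMap<>();
    private final List<Waiter> waiters = new ArrayList<>();

    synchronized void registerThread(final String threadName) {
        threadVersions.put(threadName, 0L);
    }

    // A topology change bumps the version and registers a waiter for it.
    synchronized CompletableFuture<Void> bumpVersion() {
        topologyVersion++;
        final CompletableFuture<Void> future = new CompletableFuture<>();
        waiters.add(new Waiter(topologyVersion, future));
        return future;
    }

    // Step 1: record the thread's new version. The caller can log the returned
    // version right away, at the moment the update actually happens.
    synchronized long updateThreadVersion(final String threadName) {
        threadVersions.put(threadName, topologyVersion);
        return topologyVersion;
    }

    // Step 2: complete every waiter whose version all threads have reached.
    // Returns the number of waiters completed.
    synchronized int maybeCompleteWaiters() {
        final long slowest = threadVersions.values().stream()
            .mapToLong(Long::longValue)
            .min()
            .orElse(topologyVersion);
        int completed = 0;
        final Iterator<Waiter> iterator = waiters.iterator();
        while (iterator.hasNext()) {
            final Waiter waiter = iterator.next();
            if (waiter.version <= slowest) {
                waiter.future.complete(null);
                iterator.remove();
                completed++;
            }
        }
        return completed;
    }
}
```

   With this shape, the caller would invoke `updateThreadVersion` and log, then call `maybeCompleteWaiters`, so the log line reflects when the thread caught up rather than when the last thread did.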







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752681064



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -908,20 +908,25 @@ private void initializeAndRestorePhase() {
 
     // Check if the topology has been updated since we last checked, ie via #addNamedTopology or #removeNamedTopology
     private void checkForTopologyUpdates() {
-        if (lastSeenTopologyVersion < topologyMetadata.topologyVersion() || topologyMetadata.isEmpty()) {
-            lastSeenTopologyVersion = topologyMetadata.topologyVersion();
-            taskManager.handleTopologyUpdates();
-

Review comment:
       I ended up not doing this, to avoid exposing locks unnecessarily. We did get rid of one of the loops, so I hope that helps.







[GitHub] [kafka] ableegoldman commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r752831095



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       Hm, I see. OK, that's potentially going to be a bit of an issue with multi-node clusters... let's chat about this tomorrow, I have some thoughts on a few different approaches we might want to consider here.
   
   That said, we still should not call `get()` here. Can we just move the offset reset into `TopologyMetadata` and do it after the last thread has processed the topology removal? I know it's similar, but I really think we should make sure to return from `removeNamedTopology()` ASAP, so that it's async and the caller can in theory do other work before blocking on the result.
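
   A minimal sketch of the non-blocking shape being asked for, assuming the removal is exposed as a future and the offset reset can be started lazily. `AsyncRemoval`, `removeThenReset` and the `Supplier` argument are hypothetical, and `CompletableFuture` is used instead of `KafkaFuture` to keep the example JDK-only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch: chain the offset reset after the removal instead of calling get().
final class AsyncRemoval {
    // removeFuture completes once all threads have dropped the topology.
    // startOffsetReset is a stand-in for kicking off the offset deletion; it is
    // only invoked after removeFuture has completed successfully.
    static CompletableFuture<Void> removeThenReset(final CompletableFuture<Void> removeFuture,
                                                   final Supplier<CompletableFuture<Void>> startOffsetReset) {
        return removeFuture.thenCompose(ignored -> startOffsetReset.get());
    }
}
```

   The method returns immediately; the caller decides if and when to block on the combined future, and a failed removal propagates to the result without the reset ever being started.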







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r753367564



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       How about something like this:
   ```
           if (resetOffsets) {
               KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
               log.info("partitions to reset: {}", partitionsToReset);
               if (!partitionsToReset.isEmpty()) {
                   removeTopologyFuture.whenComplete( (v, e) -> {
                       DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
                       while (deleteOffsetsResult == null) {
                           try {
                               deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
                                   applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
                               deleteOffsetsResult.all().get();
                           } catch (InterruptedException ex) {
                               ex.printStackTrace();
                               break;
                           } catch (ExecutionException ex) {
                               ex.printStackTrace();
                           }
                           try {
                               Thread.sleep(100);
                           } catch (InterruptedException ex) {
                               ex.printStackTrace();
                           }
                       }
                       future.complete(null);
                       });
                   return new RemoveNamedTopologyResult(removeTopologyFuture, future);
               }
           }
   ```







[GitHub] [kafka] wcarlson5 commented on a change in pull request #11479: KAFKA-12648: Make changing the named topologies blocking

Posted by GitBox <gi...@apache.org>.
wcarlson5 commented on a change in pull request #11479:
URL: https://github.com/apache/kafka/pull/11479#discussion_r753367564



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -118,40 +133,90 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) {
 
     /**
      * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running,
-     * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for
+     * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for
      * it to begin processing the new topology.
      *
      * @throws IllegalArgumentException if this topology name is already in use
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void addNamedTopology(final NamedTopology newTopology) {
+    public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) {
+        log.debug("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
             throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() +
                                                    " as another of the same name already exists");
         }
-        topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
+        return new AddNamedTopologyResult(
+            topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder())
+        );
     }
 
     /**
      * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are
-     * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure
+     * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure
      * it stops processing the old topology.
      *
+     * @param topologyToRemove          name of the topology to be removed
+     * @param resetOffsets              whether to reset the committed offsets for any source topics
+     *
      * @throws IllegalArgumentException if this topology name cannot be found
      * @throws IllegalStateException    if streams has not been started or has already shut down
      * @throws TopologyException        if this topology subscribes to any input topics or pattern already in use
      */
-    public void removeNamedTopology(final String topologyToRemove) {
+    public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) {
+        log.debug("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
             throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove);
         }
+        final Set<TopicPartition> partitionsToReset = metadataForLocalThreads()
+            .stream()
+            .flatMap(t -> {
+                final HashSet<TaskMetadata> tasks = new HashSet<>();
+                tasks.addAll(t.activeTasks());
+                tasks.addAll(t.standbyTasks());
+                return tasks.stream();
+            })
+            .flatMap(t -> t.topicPartitions().stream())
+            .filter(t -> topologyMetadata.sourceTopologies(topologyToRemove).contains(t.topic()))
+            .collect(Collectors.toSet());
+
+
+
+        final KafkaFuture<Void> removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove);
+
+        if (resetOffsets) {
+            log.info("partitions to reset: {}", partitionsToReset);
+            if (!partitionsToReset.isEmpty()) {
+                try {
+                    removeTopologyFuture.get();

Review comment:
       How about something like this?
   ```
           if (resetOffsets) {
               KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
               log.info("partitions to reset: {}", partitionsToReset);
               if (!partitionsToReset.isEmpty()) {
                   removeTopologyFuture.whenComplete( (v, e) -> {
                       DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
                       while (deleteOffsetsResult == null) {
                           try {
                               deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
                                   applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
                               deleteOffsetsResult.all().get();
                           } catch (InterruptedException ex) {
                               ex.printStackTrace();
                               break;
                           } catch (ExecutionException ex) {
                               ex.printStackTrace();
                           }
                           try {
                               Thread.sleep(100);
                           } catch (InterruptedException ex) {
                               ex.printStackTrace();
                           }
                       }
                       future.complete(null);
                       });
                   return new RemoveNamedTopologyResult(removeTopologyFuture, future);
               }
           }
   ```   
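
   For illustration only, here is a minimal self-contained sketch of the same chaining idea. It assumes `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFuture` and a plain `Runnable` in place of the admin client call. The names `ChainedResetSketch`, `resetAfterRemoval`, `maxAttempts` and `backoffMs` are hypothetical and not part of this PR. Unlike the snippet above, it bounds the number of retries, retries again after a failed attempt, and propagates failures through the returned future instead of printing them.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical sketch: CompletableFuture stands in for KafkaFuture,
   // and resetAttempt stands in for the offset-deletion call.
   class ChainedResetSketch {

       // Returns a future that completes once resetAttempt has succeeded after
       // the removal future completed. The caller is not blocked by this call.
       static CompletableFuture<Void> resetAfterRemoval(
               final CompletableFuture<Void> removal,
               final Runnable resetAttempt,
               final int maxAttempts,
               final long backoffMs) {
           if (maxAttempts < 1) {
               throw new IllegalArgumentException("maxAttempts must be at least 1");
           }
           final CompletableFuture<Void> reset = new CompletableFuture<>();
           removal.whenComplete((ignored, removalError) -> {
               if (removalError != null) {
                   // Do not touch offsets if the removal itself failed.
                   reset.completeExceptionally(removalError);
                   return;
               }
               RuntimeException lastFailure = null;
               for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                   try {
                       resetAttempt.run();
                       reset.complete(null);
                       return;
                   } catch (final RuntimeException e) {
                       lastFailure = e;
                   }
                   if (attempt < maxAttempts) {
                       try {
                           Thread.sleep(backoffMs);
                       } catch (final InterruptedException e) {
                           // Restore the interrupt flag and stop retrying.
                           Thread.currentThread().interrupt();
                           reset.completeExceptionally(e);
                           return;
                       }
                   }
               }
               reset.completeExceptionally(lastFailure);
           });
           return reset;
       }

       public static void main(final String[] args) {
           final AtomicInteger attempts = new AtomicInteger();
           final CompletableFuture<Void> removal = new CompletableFuture<>();
           // Simulated reset that fails twice before succeeding.
           final CompletableFuture<Void> reset = resetAfterRemoval(removal, () -> {
               if (attempts.incrementAndGet() < 3) {
                   throw new IllegalStateException("group still has active members");
               }
           }, 5, 10L);
           removal.complete(null);
           reset.join();
           System.out.println("reset finished after " + attempts.get() + " attempts");
       }
   }
   ```

   With `CompletableFuture`, the callback runs on whichever thread completes the removal future (or on the calling thread if it is already complete), so the retry loop and its sleeps occupy that thread. Whether that is acceptable here depends on which thread completes `removeTopologyFuture`.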



