Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/02/26 14:25:14 UTC

[GitHub] [kafka] wcarlson5 commented on a change in pull request #11813: KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED

wcarlson5 commented on a change in pull request #11813:
URL: https://github.com/apache/kafka/pull/11813#discussion_r815315400



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -230,53 +233,72 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo
 
         topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove);
 
-        if (resetOffsets) {
+        if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) {
             log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}",
                      removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully",
                      topologyToRemove, partitionsToReset
             );
-            if (!partitionsToReset.isEmpty()) {
-                removeTopologyFuture.whenComplete((v, throwable) -> {
-                    if (throwable != null) {
-                        removeTopologyFuture.completeExceptionally(throwable);
-                    }
-                    DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                    while (deleteOffsetsResult == null) {
-                        try {
-                            deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                                applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                            deleteOffsetsResult.all().get();
-                        } catch (final InterruptedException ex) {
+            resetOffsets(removeTopologyFuture, partitionsToReset);

Review comment:
       I think we need to do something with the result of the `resetOffsets` method?
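
       To illustrate what "doing something with the result" could look like: the sketch below is not the code in this PR. It assumes, hypothetically, that the offset reset can be expressed as a supplier of a `CompletableFuture<Void>`; the class and method names (`ResetOffsetsSketch`, `removeThenReset`) are made up for the example. It only shows how a reset failure could be propagated to the caller instead of being dropped.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical sketch, not the Kafka Streams implementation.
final class ResetOffsetsSketch {

    // Runs the offset reset only after the removal has completed successfully,
    // and returns a future that reflects the outcome of BOTH steps.
    // `resetOffsets` is an assumed stand-in for the real reset logic.
    static CompletableFuture<Void> removeThenReset(
            final CompletableFuture<Void> removeTopologyFuture,
            final Supplier<CompletableFuture<Void>> resetOffsets) {
        // thenCompose skips the reset if the removal failed, and otherwise
        // relays the reset's success or failure to the returned future.
        return removeTopologyFuture.thenCompose(ignored -> resetOffsets.get());
    }
}
```

       With this shape, a caller waiting on the returned future would see an exceptional completion if either the removal or the reset failed.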

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java
##########
@@ -124,6 +124,17 @@ public void shouldAllowSameStoreNameToBeUsedByMultipleNamedTopologies() {
         streams.start(asList(builder1.build(), builder2.build()));
     }
 
+    @Test
+    public void shouldAllowAddingAndRemovingNamedTopologyAndReturnBeforeCallingStart() throws Exception {

Review comment:
       This is a good test. But there is one more that I would like to see. It may have to be an integration test though. 
   
   If we add two topologies that have overlapping source topics to an unstarted client, will we see an error right away? Or will that error not be discovered until after the streams object is started? I think it will be the latter; either way, we should document this behavior and have a test for it so that we know when it changes. I could see this becoming a source of much confusion later on.
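
   To make the question concrete, here is a toy model of the "deferred" behavior I am guessing at. It is plain Java and does not use any Kafka Streams classes; `DeferredOverlapCheckSketch` and its methods are hypothetical names. Whether the real client behaves this way is exactly what the requested test should pin down.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only, NOT Kafka Streams code. It assumes that overlapping source
// topics are detected when the app starts rather than when a topology is added.
final class DeferredOverlapCheckSketch {
    private final Map<String, Set<String>> sourceTopicsByTopology = new HashMap<>();

    // Before start(), adding a topology only records its source topics.
    void addTopology(final String name, final Set<String> sourceTopics) {
        sourceTopicsByTopology.put(name, new HashSet<>(sourceTopics));
    }

    // The overlap is only discovered here, once all topologies are considered together.
    void start() {
        final Set<String> seen = new HashSet<>();
        for (final Set<String> topics : sourceTopicsByTopology.values()) {
            for (final String topic : topics) {
                if (!seen.add(topic)) {
                    throw new IllegalStateException(
                        "Source topic " + topic + " is used by more than one topology");
                }
            }
        }
    }
}
```

   A test against the real client would follow the same outline: add both topologies before calling start, assert whether the add calls return normally, then assert where the overlap error actually surfaces.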



