Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/23 21:25:39 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

ableegoldman opened a new pull request #8540:
URL: https://github.com/apache/kafka/pull/8540


   This should fix the flaky `GlobalKTableIntegrationTest.shouldRestoreGlobalInMemoryKTableOnRestart` as well as address the issue on the ticket (avoid unnecessary group coordination overhead).
   
   The deeper issue (and the root cause of the test's flakiness) is that every StreamThread in a global-only topology currently hits an `IllegalStateException` when it tries to poll. Once they have all died, the KafkaStreams instance transitions to the ERROR state, even though the global thread is alive and well.
   
   The fix is to check whether the topology is global-only and, if so, override the number of stream threads (`num.stream.threads`) to 0. This PR also adds a check for topologies that contain nothing at all, and throws an exception to fail fast.
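A minimal sketch of the decision described above, written as a standalone Java class so it compiles without Kafka on the classpath. The class and method names are hypothetical; the two boolean parameters stand in for `internalTopologyBuilder.hasNoNonGlobalTopology()` and `buildGlobalStateTopology() != null` from the `KafkaStreams.java` diff quoted in this thread, and the logic mirrors that diff rather than any released Kafka code.

```java
// Hypothetical sketch of the thread-count logic from this PR; not Kafka's actual code.
class ThreadCountSketch {

    // hasNoNonGlobalTopology: stands in for InternalTopologyBuilder#hasNoNonGlobalTopology()
    // hasGlobalTopology: stands in for "buildGlobalStateTopology() != null"
    // configuredNumStreamThreads: stands in for the num.stream.threads config value
    static int resolveNumStreamThreads(final boolean hasNoNonGlobalTopology,
                                       final boolean hasGlobalTopology,
                                       final int configuredNumStreamThreads) {
        // Global-only topology: stream threads would have no topics to poll, so create none.
        final int numStreamThreads = hasNoNonGlobalTopology ? 0 : configuredNumStreamThreads;

        // Nothing would run at all: fail fast instead of starting an idle instance.
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            throw new IllegalArgumentException("Topology has no stream threads and no global threads");
        }
        return numStreamThreads;
    }

    public static void main(final String[] args) {
        System.out.println(resolveNumStreamThreads(false, false, 4)); // regular topology: 4
        System.out.println(resolveNumStreamThreads(true, true, 4));   // global-only: 0
    }
}
```

As in the diff, a topology with no global tables and `num.stream.threads` explicitly set to 0 is rejected by the same check.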


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cemo commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
cemo commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-634010610


   @mjarvie when will this fix be released?





[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-623697876


   Thanks for the clarification. I'll see about backporting to 2.4...





[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-623701299


   Actually @ableegoldman , the cherry-pick to 2.4 isn't straightforward. Do you mind doing a separate PR for it?





[GitHub] [kafka] mjsax edited a comment on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
mjsax edited a comment on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-622513778


   @vvcephei Cherry-picking to 2.4 did not work? There might still be a 2.4.2 release? Or should we not back-port it to 2.4 because in 2.4 one can in fact run an empty topology?





[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822406



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
       I guess I personally can't imagine any reason to ever want an app running with an empty topology, and I would prefer to be notified immediately, since I presumably did something wrong. But if you feel strongly about allowing this, I can demote this to a warning.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
       Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414201107



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -161,6 +165,9 @@ public void before() throws Exception {
         props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
+        builder = new StreamsBuilder();
+        builder.stream("source");

Review comment:
       Good call, done







[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620359124


   test this please





[GitHub] [kafka] ableegoldman commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-623690181


   @vvcephei yeah, the regression was only introduced in 2.5, so we only _need_ to backport it that far. But I would actually agree that we should backport it to 2.4 as well: in a global-only topology, any stream threads add unnecessary group coordination overhead, resulting in unnecessary rebalances, network usage, etc. Of course, you can set the number of threads to zero to get around this, but then the KafkaStreams state would never reach RUNNING. That's one of the fixes in this PR, and it is actually worth backporting.





[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-623680367


   Thanks @mjsax  (and sorry I didn't see your comment before).
   
   The ticket indicates that it was introduced in 2.5 (https://issues.apache.org/jira/browse/KAFKA-9127).
   
   IIUC, in 2.4, we would needlessly create StreamThreads, but we would still be able to process global-only topologies. It was only in 2.5 that we actually lost the ability to process global-only topologies.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
       ~~Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type~~ 
   edit: Never mind, it's just `TopologyException`; found it.







[GitHub] [kafka] mjsax commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r418681001



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
       Thanks for clarification.







[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620973163


   Cherry-pick for 2.5 in progress...





[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414832679



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
       Would a class-level `final StreamsBuilder builder = new StreamsBuilder()` that we add a source to in the setUp be any better, IYO?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414823010



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
       Is `getBuilderWithSource` really that much uglier than `new StreamsBuilder`? :P 







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
       ~~Also, do we have an `InvalidTopologyException` or similar exception already? Or were you proposing to add a new type~~ 
   edit: Never mind, found it. It's just `TopologyException`.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414819765



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
 
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)
+        );
+    }
+
+    @Test
+    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+        assertThat(streams.threads.length, equalTo(0));
+    }
+
+    @Test
+    public void shouldNotTransitToErrorStateWithGlobalOnlyTopology() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+        streams.setStateListener((newState, oldState) -> {
+            if (newState.equals(State.ERROR)) {
+                throw new AssertionError("Should not have transitioned to ERROR state with no stream threads");

Review comment:
       I guess we don't need to throw here; it would just cause KafkaStreams to transition to ERROR and fail below. But I realized this doesn't even do that, because we're mocking pretty much everything in this test class, including the stream threads. I'll try to find a better way and place to do this test.
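One way to act on the first point, sketched with hypothetical stand-ins: the enum and listener interface below only mirror the names of `KafkaStreams.State` and `KafkaStreams.StateListener` and are not the real classes. The idea is to record transitions in a thread-safe list and assert on the test thread afterwards, rather than throwing from inside a callback that may run on another thread. This does not address the second point, that the stream threads are mocked in this test class.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-ins so the sketch compiles on its own; not Kafka's classes.
class StateListenerSketch {
    enum State { CREATED, REBALANCING, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING, ERROR }

    interface StateListener {
        void onChange(State newState, State oldState);
    }

    // Records every transition instead of throwing from inside the callback.
    static final class RecordingListener implements StateListener {
        final List<State> observed = new CopyOnWriteArrayList<>();

        @Override
        public void onChange(final State newState, final State oldState) {
            observed.add(newState);
        }

        boolean sawError() {
            return observed.contains(State.ERROR);
        }
    }

    public static void main(final String[] args) throws InterruptedException {
        final RecordingListener listener = new RecordingListener();
        // Simulate transitions fired from a background thread.
        final Thread worker = new Thread(() -> {
            listener.onChange(State.REBALANCING, State.CREATED);
            listener.onChange(State.RUNNING, State.REBALANCING);
        });
        worker.start();
        worker.join();
        // The assertion happens here, on the calling thread.
        System.out.println("saw ERROR: " + listener.sawError());
    }
}
```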







[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846265









[GitHub] [kafka] cemo commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
cemo commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-634888437


   @mjsax I was actually referring to 2.5.1. 😇 





[GitHub] [kafka] mjsax commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-622513778


   @vvcephei Cherry-picking to 2.4 did not work? There might still be a 2.4.2 release.





[GitHub] [kafka] mjsax commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-624368767


   Thanks for backporting @ableegoldman!





[GitHub] [kafka] ableegoldman commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620909428


   One unrelated failure: `MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete`





[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620846462









[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-623705234


   Or, if it's too much trouble, we can just note on the ticket that it _could_ be backported, but isn't (yet).





[GitHub] [kafka] vvcephei commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414214864



##########
File path: checkstyle/suppressions.xml
##########
@@ -181,7 +181,7 @@
               files="StreamsPartitionAssignor.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(ProcessorStateManager|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|TaskManager|AssignorConfiguration).java"/>
+              files="(AssignorConfiguration|InternalTopologyBuilder|KafkaStreams|ProcessorStateManager|StreamsPartitionAssignor|StreamThread|TaskManager).java"/>

Review comment:
       NPathComplexity is a tough one to work around. We'd wind up having to move some blocks of logic to separate helper classes.
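For context, a rough sketch of the arithmetic behind NPath (the number of acyclic execution paths through a method): path counts of consecutive statements multiply, so each extra independent branch in an already-long constructor like `KafkaStreams`'s multiplies its NPath rather than adding to it. This is a simplified model that assumes simple conditions (`&&`/`||` add further paths) and is not Checkstyle's implementation; Checkstyle's `NPathComplexity` check flags methods above 200 paths by default. The branch counts in `main` are hypothetical.

```java
// Simplified NPath arithmetic (not Checkstyle's implementation): for straight-line
// code, the path count of a statement sequence is the product of each statement's
// path count; an if or if/else with a simple condition contributes a factor of 2.
class NPathSketch {

    static long npathOfSequence(final int... pathsPerStatement) {
        long paths = 1; // a method with no branches has exactly one path
        for (final int p : pathsPerStatement) {
            paths *= p;
        }
        return paths;
    }

    public static void main(final String[] args) {
        // Hypothetical constructor with 7 simple if/else blocks: 2^7 = 128 paths.
        System.out.println(npathOfSequence(2, 2, 2, 2, 2, 2, 2));       // 128
        // Adding two more branches multiplies rather than adds: 2^9 = 512 > 200.
        System.out.println(npathOfSequence(2, 2, 2, 2, 2, 2, 2, 2, 2)); // 512
    }
}
```

Moving a block into a helper method removes its factor from the caller's product, which is why the usual workaround is extracting logic into separate methods or classes.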







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414825387



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
 
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(

Review comment:
       Ack







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414923007



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
       But @vvcephei's last question gets right to the heart of the matter. The answer is: "technically yes, but it will crash if you try to poll for said nothing, so really no."
   That's why the test was flaky, and the reason for this PR in the first place. (Avoiding group overhead is the title of the ticket, but in reality the overhead only occurs once, before all the StreamThreads die from polling no topics.)
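A heavily simplified, hypothetical model of the failure mode described in this thread; it is not the real KafkaStreams state machine, and all names are illustrative. In a global-only topology every stream thread dies on its first poll, and an instance whose stream threads are all dead is reported as ERROR without consulting the global thread. Overriding the thread count to zero, as this PR does, leaves no stream thread to die.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical model of the failure mode; not Kafka's state machine.
class GlobalOnlySketch {
    enum InstanceState { RUNNING, ERROR }

    // In a global-only topology each stream thread subscribes to no topics;
    // per the discussion above, its first poll fails and the thread dies.
    static List<Boolean> streamThreadsAfterFirstPoll(final int numStreamThreads) {
        return Collections.nCopies(numStreamThreads, false); // false = dead
    }

    // The instance is treated as failed once it has stream threads and all of them
    // are dead. The global thread is deliberately not consulted in this model.
    static InstanceState aggregate(final List<Boolean> streamThreadAlive) {
        if (!streamThreadAlive.isEmpty() && !streamThreadAlive.contains(true)) {
            return InstanceState.ERROR;
        }
        return InstanceState.RUNNING;
    }

    public static void main(final String[] args) {
        System.out.println(aggregate(streamThreadsAfterFirstPoll(2))); // before the fix: ERROR
        System.out.println(aggregate(streamThreadsAfterFirstPoll(0))); // with the fix: RUNNING
    }
}
```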







[GitHub] [kafka] vvcephei commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414182534



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -161,6 +165,9 @@ public void before() throws Exception {
         props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
 
+        builder = new StreamsBuilder();
+        builder.stream("source");

Review comment:
       Can you create a method `getBuilderWithSource()` and just call it inline, instead of creating a new mutable field for all the tests to rely on?
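To illustrate the requested pattern without needing Kafka on the classpath, here is a sketch that uses a hypothetical stand-in, `SourceRecordingBuilder`, in place of the real `StreamsBuilder`; it only records source topic names. Each call to the helper returns a fresh builder, so no test can leak mutations into another through a shared field.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for StreamsBuilder: it only records source topic names.
final class SourceRecordingBuilder {
    private final List<String> sourceTopics = new ArrayList<>();

    void stream(final String topic) {
        sourceTopics.add(topic);
    }

    List<String> sourceTopics() {
        return Collections.unmodifiableList(sourceTopics);
    }
}

final class BuilderFixtures {
    // Called inline by each test: every call yields a brand-new builder with
    // exactly one source, so tests never share (or mutate) the same instance.
    static SourceRecordingBuilder getBuilderWithSource() {
        final SourceRecordingBuilder builder = new SourceRecordingBuilder();
        builder.stream("source");
        return builder;
    }
}
```

A test that needs an extra source can add it to its own builder without affecting any other test.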







[GitHub] [kafka] vvcephei edited a comment on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei edited a comment on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-623680367


   Thanks @mjsax (and sorry I didn't see your comment before).
   
   The ticket indicates that it was introduced in 2.5 (https://issues.apache.org/jira/browse/KAFKA-9127, link for convenience).
   
   IIUC, in 2.4, we would needlessly create StreamThreads, but we would still be able to process global-only topologies. It was only in 2.5 that we actually lost the ability to process global-only topologies.





[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620359273


   retest this please





[GitHub] [kafka] vvcephei commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414916950



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
       I think the suggestion was more along the lines of _not_ throwing an exception while building an empty topology.
   
   I'm not sure. It seems kind of nice to find out right away that your program will do absolutely nothing. I'm not totally sure you could really run an empty topology. Can you subscribe a Consumer to "no topics"?
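On the question of subscribing a Consumer to "no topics": the documented `KafkaConsumer` behavior is that subscribing to an empty collection is treated like `unsubscribe()`, and `poll()` then throws `IllegalStateException` because there is no subscription or assignment. The sketch below is a hypothetical stand-in, `SubscriptionModelConsumer`, that mimics only those two behaviors so it runs without a broker or the Kafka client; its exception message is illustrative.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in mimicking two documented KafkaConsumer behaviors:
// an empty subscribe() acts like unsubscribe(), and poll() with no
// subscription or assignment throws IllegalStateException.
final class SubscriptionModelConsumer {
    private final Set<String> subscription = new HashSet<>();

    void subscribe(final Collection<String> topics) {
        subscription.clear();           // an empty input leaves us unsubscribed
        subscription.addAll(topics);
    }

    List<String> poll() {
        if (subscription.isEmpty()) {
            throw new IllegalStateException(
                "Consumer is not subscribed to any topics or assigned any partitions");
        }
        return Collections.emptyList(); // this toy model never returns records
    }
}
```

A StreamThread in a global-only topology is in the first situation: it has nothing to subscribe to, so its first poll fails.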







[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r416215513



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
##########
@@ -265,12 +270,27 @@ public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
         kafkaStreams.close();
 
         startStreams();
+
         store = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.keyValueStore()));
         assertThat(store.approximateNumEntries(), equalTo(4L));
         timestampedStore = kafkaStreams.store(StoreQueryParameters.fromNameAndType(globalStore, QueryableStoreTypes.timestampedKeyValueStore()));
         assertThat(timestampedStore.approximateNumEntries(), equalTo(4L));
     }
 
+    @Test
+    public void shouldGetToRunningWithOnlyGlobalTopology() throws Exception {

Review comment:
       Copied this test over to trunk and verified that it fails consistently there.







[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620359206


   test this please





[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-620976620


   Cherry-picked to 2.5 as 9e2785fd1ba0ed16604e01058bae6b60ff9f3d96





[GitHub] [kafka] ableegoldman commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414922504



##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
       I realize he wasn't just complaining about the name, but I was trying to keep that discussion in one thread. But I guess you can only do so much to keep PR chatter oriented in one place 







[GitHub] [kafka] mjsax commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
mjsax commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-634881345


   As you can see from the ticket (https://issues.apache.org/jira/browse/KAFKA-9127), the fix will be included in `2.4.2`, `2.5.1`, and `2.6.0`. At the moment, there is only a timeline for `2.6.0`, with a target release date of June 24, 2020: https://cwiki.apache.org/confluence/display/KAFKA/Future+release+plan





[GitHub] [kafka] mjsax commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
mjsax commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414205108



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");

Review comment:
       I don't think this should be WARN but INFO. There is nothing "wrong" here, and thus nothing to warn about.
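A sketch of the thread-count override as a small static helper that logs at INFO, as suggested. It uses `java.util.logging` only so the snippet runs without SLF4J (the real code uses Kafka's own logger); `ThreadCountPolicy` and `effectiveNumStreamThreads` are hypothetical names, while the condition and log message mirror the diff.

```java
import java.util.logging.Logger;

final class ThreadCountPolicy {
    private static final Logger LOG = Logger.getLogger(ThreadCountPolicy.class.getName());

    // Returns how many StreamThreads to create. A global-only topology has no
    // source topics for StreamThreads to poll, so zero threads are created.
    // This is expected behavior, not a problem, hence INFO rather than WARN.
    static int effectiveNumStreamThreads(final boolean hasNoNonGlobalTopology,
                                         final int configuredNumStreamThreads) {
        if (hasNoNonGlobalTopology) {
            LOG.info("Overriding number of StreamThreads to zero for global-only topology");
            return 0;
        }
        return configuredNumStreamThreads;
    }
}
```

Pulling the decision into its own method also removes one branch from the constructor body.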

##########
File path: checkstyle/suppressions.xml
##########
@@ -181,7 +181,7 @@
               files="StreamsPartitionAssignor.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(ProcessorStateManager|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|TaskManager|AssignorConfiguration).java"/>
+              files="(AssignorConfiguration|InternalTopologyBuilder|KafkaStreams|ProcessorStateManager|StreamsPartitionAssignor|StreamThread|TaskManager).java"/>

Review comment:
       Do we really need to add this exception? How much work would it be to reduce the complexity of `KafkaStreams`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
       Do we really want to do this? I understand that an empty topology does not make sense, and it would be appropriate to log a WARN -- but do we need/want to reject it?
   
   Also, should we instead throw an `InvalidTopologyException`? Furthermore, should we add a similar check to `StreamsBuilder.build()` to raise this error even earlier? (We would still need this check here, though.)
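A sketch of the fail-fast check with a dedicated exception type instead of `IllegalArgumentException`. `EmptyTopologyException` and `TopologyValidation.validate` are hypothetical names standing in for whichever topology exception is chosen; the condition and message mirror the diff above.

```java
// Hypothetical exception standing in for a dedicated topology-validation
// exception, as the review suggests, instead of IllegalArgumentException.
final class EmptyTopologyException extends RuntimeException {
    EmptyTopologyException(final String message) {
        super(message);
    }
}

final class TopologyValidation {
    // Fails fast when the instance would have nothing to do: no StreamThreads
    // (no regular source topics) and no global thread (no global tables).
    static void validate(final int numStreamThreads, final boolean hasGlobalTopology) {
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            throw new EmptyTopologyException(
                "Topology has no stream threads and no global threads");
        }
    }
}
```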

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
 
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)
+        );
+    }
+
+    @Test
+    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+        assertThat(streams.threads.length, equalTo(0));
+    }
+
+    @Test
+    public void shouldNotTransitToErrorStateWithGlobalOnlyTopology() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+        streams.setStateListener((newState, oldState) -> {
+            if (newState.equals(State.ERROR)) {
+                throw new AssertionError("Should not have transitioned to ERROR state with no stream threads");

Review comment:
       Is the state listener executed on the correct thread (i.e., the main test thread), such that throwing here would really make the test fail? From my current understanding, it would be called by the `GlobalThread`, and thus the global thread would die instead (which would lead to a timeout below, I guess?)
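Whichever thread ends up invoking the listener (the comment suspects the global thread), an `AssertionError` thrown there only kills that thread; the test thread is never failed by it. The stdlib sketch below shows this and one common alternative: the listener just records the transition, and the test thread waits on a latch and asserts itself. `ListenerOnOtherThread` and its methods are hypothetical, and states are modeled as plain strings.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

final class ListenerOnOtherThread {

    // Anti-pattern: the "listener" throws on a background thread. That thread
    // dies (the error reaches its uncaught-exception handler), but join()
    // returns normally, so the calling thread is never failed.
    static Throwable errorSeenOnlyByBackgroundThread() {
        final AtomicReference<Throwable> uncaught = new AtomicReference<>();
        final Thread background = new Thread(() -> {
            throw new AssertionError("Should not have transitioned to ERROR state");
        });
        background.setUncaughtExceptionHandler((thread, error) -> uncaught.set(error));
        background.start();
        joinUninterruptibly(background);
        return uncaught.get();
    }

    // Safer shape: the listener only records the transition; the test thread
    // waits on a latch (with a timeout) and makes the assertion itself.
    static String recordTransitionThenCheck(final String newState) {
        final AtomicReference<String> observed = new AtomicReference<>();
        final CountDownLatch transitioned = new CountDownLatch(1);
        final Thread background = new Thread(() -> {
            observed.set(newState);
            transitioned.countDown();
        });
        background.start();
        try {
            if (!transitioned.await(5, TimeUnit.SECONDS)) {
                throw new AssertionError("Listener was never invoked");
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the listener", e);
        }
        return observed.get();
    }

    private static void joinUninterruptibly(final Thread thread) {
        try {
            thread.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while joining", e);
        }
    }
}
```

Applied to this test, one option would be for the listener to store any ERROR transition in an `AtomicReference`, with the test thread asserting that it is still empty after the instance reaches RUNNING.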

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
       The change to reject an empty topology makes our testing rather "ugly"... Do we really _need_ to reject it?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
 
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(

Review comment:
       IMHO, we should always capture the exception and assert on the error message. Otherwise, the exception might be thrown for a different reason, and the test would not test what it is supposed to test (we have seen this issue in the past).
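JUnit's `assertThrows` (JUnit 4.13+ and JUnit 5) returns the thrown exception, which makes this suggestion cheap to follow. To keep the snippet self-contained, `ExpectThrows.expectThrows` below is a hypothetical stdlib stand-in with the same shape: it fails on a missing or wrong exception type and hands back the exception so the caller can also check its message.

```java
final class ExpectThrows {
    // Minimal stand-in for JUnit's assertThrows: runs the action, fails if it
    // does not throw the expected type, and RETURNS the exception so the
    // caller can assert on its message as well.
    static <T extends Throwable> T expectThrows(final Class<T> expectedType, final Runnable action) {
        try {
            action.run();
        } catch (final Throwable actual) {
            if (expectedType.isInstance(actual)) {
                return expectedType.cast(actual);
            }
            throw new AssertionError("Expected " + expectedType.getName() + " but got " + actual, actual);
        }
        throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
    }
}
```

With the real API this becomes roughly `final IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> new KafkaStreams(new StreamsBuilder().build(), props, supplier, time));` followed by `assertThat(e.getMessage(), equalTo("Topology has no stream threads and no global threads"));`.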










[GitHub] [kafka] vvcephei commented on pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#issuecomment-618729924


   test this please

