Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/24 00:20:54 UTC

[GitHub] [kafka] mjsax commented on a change in pull request #8540: KAFKA-9127: don't create StreamThreads for global-only topology

mjsax commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414205108



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");

Review comment:
       I think this should be INFO rather than WARN. Nothing is "wrong", so there is nothing to warn about.

##########
File path: checkstyle/suppressions.xml
##########
@@ -181,7 +181,7 @@
               files="StreamsPartitionAssignor.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(ProcessorStateManager|InternalTopologyBuilder|StreamsPartitionAssignor|StreamThread|TaskManager|AssignorConfiguration).java"/>
+              files="(AssignorConfiguration|InternalTopologyBuilder|KafkaStreams|ProcessorStateManager|StreamsPartitionAssignor|StreamThread|TaskManager).java"/>

Review comment:
       Do we really need to add this exception? How much work would it be to reduce the complexity of `KafkaStreams`?
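
One possible way to lower the constructor's NPath count, sketched under assumptions: move the thread-count decision from the diff into a small helper, so that its branch no longer multiplies the number of paths through the constructor. The class and method names below are hypothetical, and the two parameters stand in for `internalTopologyBuilder.hasNoNonGlobalTopology()` and `config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)`. Whether this alone would be enough to drop the suppression is not established in this thread.

```java
// Hypothetical sketch, not the actual KafkaStreams code.
final class StreamThreadCountSketch {

    // hasNoNonGlobalTopology: stand-in for internalTopologyBuilder.hasNoNonGlobalTopology()
    // configuredThreads: stand-in for config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)
    static int resolveNumStreamThreads(final boolean hasNoNonGlobalTopology,
                                       final int configuredThreads) {
        if (hasNoNonGlobalTopology) {
            // A global-only topology needs no StreamThreads, whatever the config says.
            return 0;
        }
        return configuredThreads;
    }

    public static void main(final String[] args) {
        System.out.println(resolveNumStreamThreads(true, 4));
        System.out.println(resolveNumStreamThreads(false, 4));
    }
}
```

The constructor would then contain a single call instead of an if/else, and the logging could move into the helper as well.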

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder internalTopologyBuilder,
                 internalTopologyBuilder,
                 parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global table");
+            throw new IllegalArgumentException("Topology has no stream threads and no global threads");

Review comment:
       Do we really want to do this? I understand that an empty topology does not make sense, and it would be appropriate to log a WARN -- but do we need/want to reject it?
   
   Also, should we instead throw an `InvalidTopologyException`? Furthermore, should we add a similar check to `StreamsBuilder.build()` to raise this error even earlier (we would still need this check, though)?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
 
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new KafkaStreams(new StreamsBuilder().build(), props, supplier, time)
+        );
+    }
+
+    @Test
+    public void shouldNotCreateStreamThreadsForGlobalOnlyTopology() {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+
+        assertThat(streams.threads.length, equalTo(0));
+    }
+
+    @Test
+    public void shouldNotTransitToErrorStateWithGlobalOnlyTopology() throws InterruptedException {
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.globalTable("anyTopic");
+        final KafkaStreams streams = new KafkaStreams(builder.build(), props, supplier, time);
+        streams.setStateListener((newState, oldState) -> {
+            if (newState.equals(State.ERROR)) {
+                throw new AssertionError("Should not have transitioned to ERROR state with no stream threads");

Review comment:
       Is the state listener executed on the right thread (i.e., the main testing thread), so that throwing here would really make the test fail? From my current understanding, it would be called by the `GlobalThread`, and thus the global thread would die instead (which would lead to a timeout below, I guess?).
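
To illustrate the concern in plain Java: an `AssertionError` thrown on a background thread ends only that thread and never reaches the thread that started it. If the listener does run on a background thread, as suspected above, one common workaround is to record the failure and check it on the test thread. This is a minimal sketch with hypothetical names (`ListenerThreadSketch`, `runOnBackgroundThread`); it does not use any Kafka classes and makes no claim about which thread Kafka actually invokes the listener on.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch; the background thread stands in for whichever
// thread invokes the state listener.
final class ListenerThreadSketch {

    // Runs the callback on a separate thread and returns whatever it threw, or null.
    static Throwable runOnBackgroundThread(final Runnable callback) {
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final Thread background = new Thread(callback, "hypothetical-listener-thread");
        // Capture the error instead of letting the default handler print it.
        // Either way, it does not propagate to the thread that called start().
        background.setUncaughtExceptionHandler((thread, error) -> failure.set(error));
        background.start();
        try {
            background.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for background thread", e);
        }
        return failure.get();
    }

    public static void main(final String[] args) {
        final Throwable failure = runOnBackgroundThread(() -> {
            throw new AssertionError("Should not have transitioned to ERROR state");
        });
        // Execution reaches this point: only the background thread died.
        // The test thread has to inspect the captured failure itself.
        if (failure == null) {
            throw new AssertionError("Expected the background thread to report a failure");
        }
        System.out.println("Captured on test thread: " + failure.getMessage());
    }
}
```

In a real test, the same idea would mean having the listener set a flag (or store the error) and asserting on it from the test method after the wait completes.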

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -327,15 +331,15 @@ private void prepareStreamThread(final StreamThread thread, final boolean termin
 
     @Test
     public void testShouldTransitToNotRunningIfCloseRightAfterCreated() {
-        final KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props, supplier, time);
+        final KafkaStreams streams = new KafkaStreams(getBuilderWithSource().build(), props, supplier, time);

Review comment:
       The change to reject an empty topology makes our testing rather "ugly"... Do we really _need_ to reject it?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -883,6 +887,50 @@ public void statefulTopologyShouldCreateStateDirectory() throws Exception {
         startStreamsAndCheckDirExists(topology, true);
     }
 
+    @Test
+    public void shouldThrowIllegalArgumentExceptionOnEmptyTopology() {
+        assertThrows(

Review comment:
       IMHO, we should always capture the exception and assert on its error message. Otherwise, the exception might be thrown for a different reason, and the test would not test what it is supposed to test (we have seen this issue in the past).
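
A minimal sketch of that pattern, assuming the message text from the diff above. JUnit's `assertThrows` returns the thrown exception, so a test can keep the result and assert on `getMessage()`. To stay self-contained, the sketch uses a hypothetical `expectThrows` helper in place of JUnit and a hypothetical `createStreams` method in place of the `KafkaStreams` constructor.

```java
// Hypothetical sketch; createStreams stands in for the KafkaStreams constructor check.
final class ExceptionMessageAssertionSketch {

    static final String EXPECTED_MESSAGE = "Topology has no stream threads and no global threads";

    static void createStreams(final int numStreamThreads, final boolean hasGlobalTopology) {
        if (numStreamThreads == 0 && !hasGlobalTopology) {
            throw new IllegalArgumentException(EXPECTED_MESSAGE);
        }
    }

    // Minimal stand-in for JUnit's assertThrows: runs the action and returns the caught exception.
    static <T extends Throwable> T expectThrows(final Class<T> expectedType, final Runnable action) {
        try {
            action.run();
        } catch (final Throwable t) {
            if (expectedType.isInstance(t)) {
                return expectedType.cast(t);
            }
            throw new AssertionError("Unexpected exception type: " + t.getClass().getName(), t);
        }
        throw new AssertionError("Expected " + expectedType.getName() + " but nothing was thrown");
    }

    public static void main(final String[] args) {
        final IllegalArgumentException e =
            expectThrows(IllegalArgumentException.class, () -> createStreams(0, false));
        // Checking the message ties the test to this specific failure,
        // not to any IllegalArgumentException thrown along the way.
        if (!EXPECTED_MESSAGE.equals(e.getMessage())) {
            throw new AssertionError("Unexpected message: " + e.getMessage());
        }
        System.out.println("Message check passed");
    }
}
```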



