You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "cadonna (via GitHub)" <gi...@apache.org> on 2023/06/28 16:35:28 UTC

[GitHub] [kafka] cadonna opened a new pull request, #13927: KAFKA-10199: Enable state updater by default

cadonna opened a new pull request, #13927:
URL: https://github.com/apache/kafka/pull/13927

   Now that the implementation for the state updater is done, we can enable it by default.
   
   This PR enables the state updater by default and fixes code that made assumptions that are not true when the state updater is enabled (mainly tests).
   
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246556477


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1138,28 +1140,35 @@ public void signalResume() {
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        // Not all tasks will create directories, and there may be directories for tasks we don't currently own,
-        // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
-        // just have an empty changelogOffsets map.
-        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) {
-            final Task task = tasks.contains(id) ? tasks.task(id) : null;
-            // Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint
-            if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) {
+        final Map<TaskId, Task> tasks = allTasks();
+        final Set<TaskId> createdAndClosedTasks = new HashSet<>();
+        for (final Task task : tasks.values()) {
+            if (task.state() != State.CREATED && task.state() != State.CLOSED) {
                 final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets();
                 if (changelogOffsets.isEmpty()) {
-                    log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);
+                    log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}",
+                        task.id());
                 } else {
-                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets));
+                    taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets));
                 }
             } else {
-                final File checkpointFile = stateDirectory.checkpointFileFor(id);
-                try {
-                    if (checkpointFile.exists()) {
-                        taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read()));
-                    }
-                } catch (final IOException e) {
-                    log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
+                createdAndClosedTasks.add(task.id());
+            }
+        }
+
+        // Not all tasks will create directories, and there may be directories for tasks we don't currently own,
+        // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
+        // just have an empty changelogOffsets map.
+        final Set<TaskId> lockedTaskDirectoriesOfNonOwnedTasks = new HashSet<>(lockedTaskDirectories);
+        lockedTaskDirectoriesOfNonOwnedTasks.removeAll(tasks.keySet());
+        for (final TaskId id : union(HashSet::new, lockedTaskDirectoriesOfNonOwnedTasks, createdAndClosedTasks)) {
+            final File checkpointFile = stateDirectory.checkpointFileFor(id);
+            try {
+                if (checkpointFile.exists()) {
+                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read()));
                 }
+            } catch (final IOException e) {
+                log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
             }
         }

Review Comment:
   This is the fix from https://github.com/apache/kafka/pull/13925. This will disappear once the fix is merged and this PR rebased.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1256076934


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1619,7 +1639,14 @@ private List<Task> standbyTaskIterable() {
     }
 
     private Stream<Task> standbyTaskStream() {
-        return tasks.allTasks().stream().filter(t -> !t.isActive());
+        if (stateUpdater != null) {
+            return Stream.concat(
+                stateUpdater.getStandbyTasks().stream(),
+                tasks.allTasks().stream().filter(t -> !t.isActive())
+            );
+        } else {
+            return tasks.allTasks().stream().filter(t -> !t.isActive());

Review Comment:
   This is the same line right above it. Can we simplify them?



##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   I don't think that output should have changed. That is worrying



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -571,7 +571,9 @@ private Map<TaskId, Set<TopicPartition>> pendingTasksToCreate(final Map<TaskId,
         while (iter.hasNext()) {
             final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
             final TaskId taskId = entry.getKey();
-            if (taskId.topologyName() != null && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
+            final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)

Review Comment:
   If we are going to just invert this in the next line and not use it anywhere else can we move that inversion here?



##########
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##########
@@ -207,11 +207,12 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMulti
             });
         }
 
-        startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
+        for (final KafkaStreams stream: kafkaStreamsList) {
+            stream.start();
+        }

Review Comment:
   That makes sense, but do we still need the synchronization point or even the timeout here?
   
   What I'm asking is is maybe we have a  `waitForApplicationState` for running after calling start for them all



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1600,7 +1617,21 @@ List<Task> activeTaskIterable() {
         return activeTaskStream().collect(Collectors.toList());
     }
 
+    List<Task> activeRunningTaskIterable() {
+        return activeRunningTaskStream().collect(Collectors.toList());
+    }
+
     private Stream<Task> activeTaskStream() {
+        if (stateUpdater != null) {
+            return Stream.concat(
+                activeRunningTaskStream(),
+                stateUpdater.getTasks().stream().filter(Task::isActive)
+            );
+        }

Review Comment:
   Should the state updater have active tasks at all? If so who owns them?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593


##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   I turned off the caching because with the state updater the results were different than without the state updater. The differences were only there due to caching. I think the processing and commit times differ when the state updater is on or off. 
   If caching is not needed, I think turning it off makes tests more robust.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lucasbru commented on pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "lucasbru (via GitHub)" <gi...@apache.org>.
lucasbru commented on PR #13927:
URL: https://github.com/apache/kafka/pull/13927#issuecomment-1620617702

   Looking good from my side, pending a review from @wcarlson5 or @ableegoldman w.r.t. the caching question.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1254284586


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -87,7 +88,7 @@ public static void closeCluster() {
     }
 
 
-    private final Time time = CLUSTER.time;
+    private final Time time = new MockTime(1);

Review Comment:
   I tried to expose the `autoTickMs` parameter in embedded kafka, but I ran into a infinite loop.
   
   Regarding your question:
   
   > Are we not worried that broker time and our time might diverge?
   
   In almost all our integration tests broker time diverges from streams time since the broker time does not advance but we usually use system time in `KafkaStreams`. This is not something this PR introduces.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246555106


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -846,7 +853,7 @@ void runOnce() {
 
                     if (log.isDebugEnabled()) {
                         log.debug("Committed all active tasks {} and standby tasks {} in {}ms",
-                            taskManager.activeTaskIds(), taskManager.standbyTaskIds(), commitLatency);
+                            taskManager.activeRunningTaskIds(), taskManager.standbyTaskIds(), commitLatency);

Review Comment:
   This is basically a renaming to make the code clearer. With the state updater restoring tasks are owned by the state updater.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246696265


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3038,6 +3149,7 @@ public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
     @Test
     public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() {
         final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig();
+        streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   I thought the code is better readable if I leave it in the corresponding with-state-updater test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246602718


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1600,7 +1617,21 @@ List<Task> activeTaskIterable() {
         return activeTaskStream().collect(Collectors.toList());
     }
 
+    List<Task> activeRunningTaskIterable() {

Review Comment:
   `RUNNING` is the state in which an active task is when it is processing. 
   https://github.com/apache/kafka/blob/750a3893081fbe1e01f8c6eadf84726ace1eb47a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java#L44



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246652052


##########
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java:
##########
@@ -843,15 +846,19 @@ private static <K, V, T> List<T> waitUntilFinalKeyValueRecordsReceived(final Pro
                 // still need to check that for each key, the ordering is expected
                 final Map<K, List<T>> finalAccumData = new HashMap<>();
                 for (final T kv : accumulatedActual) {
-                    finalAccumData.computeIfAbsent(
-                        withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key,
-                        key -> new ArrayList<>()).add(kv);
+                    final K key = withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key;
+                    final List<T> records = finalAccumData.computeIfAbsent(key, k -> new ArrayList<>());
+                    if (!records.contains(kv)) {
+                        records.add(kv);
+                    }
                 }
                 final Map<K, List<T>> finalExpected = new HashMap<>();
                 for (final T kv : expectedRecords) {
-                    finalExpected.computeIfAbsent(
-                        withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key,
-                        key -> new ArrayList<>()).add(kv);
+                    final K key = withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key;
+                    final List<T> records = finalExpected.computeIfAbsent(key, k -> new ArrayList<>());
+                    if (!records.contains(kv)) {
+                        records.add(kv);
+                    }

Review Comment:
   These changes do not consider duplicate record during the comparison. A test that used this verification triggered a failure and verified then that the expected records were in the output topic. However, without the state updater no records were written to the output topic before the failure. With the state updater some records were written to the output topics.
   From a correctness point of view, both is correct since no commit happens before the simulated failure and so Streams reads again all input records after the failover.   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: [DO NOT MERGE] KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1299182368


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -212,10 +213,11 @@ public void shouldRestoreState() throws Exception {
         );
 
         // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
+        final long waitForPurgeMs = 60000;
         TestUtils.waitForCondition(
-            new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
-            60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
+            new RepartitionTopicVerified(currentSize -> currentSize <= 4L * PURGE_SEGMENT_BYTES),

Review Comment:
   Merged https://github.com/apache/kafka/pull/14227



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1257910205


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -571,7 +571,9 @@ private Map<TaskId, Set<TopicPartition>> pendingTasksToCreate(final Map<TaskId,
         while (iter.hasNext()) {
             final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
             final TaskId taskId = entry.getKey();
-            if (taskId.topologyName() != null && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
+            final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)

Review Comment:
   The idea was to make the code more readable since the condition is quite complex but it actually just verifies that a task is owned. I prefer to use an additional variable or method rather than use complex logical conditions that give the reader a hard time. So, I would rather keep the variable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246557140


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1542,12 +1551,20 @@ Set<TaskId> activeTaskIds() {
             .collect(Collectors.toSet());
     }
 
-    Set<TaskId> standbyTaskIds() {
-        return standbyTaskStream()
+    Set<TaskId> activeRunningTaskIds() {
+        return activeRunningTaskStream()
             .map(Task::id)
             .collect(Collectors.toSet());
     }
 
+    Set<TaskId> standbyTaskIds() {
+        if (stateUpdater != null) {
+            return stateUpdater.getStandbyTasks().stream().map(Task::id).collect(Collectors.toSet());
+        } else {
+            return standbyTaskStream().map(Task::id).collect(Collectors.toSet());
+        }
+    }

Review Comment:
   With the state updater enabled, standby tasks are owned by the state updater.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: [DO NOT MERGE] KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1292848127


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -212,10 +213,11 @@ public void shouldRestoreState() throws Exception {
         );
 
         // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
+        final long waitForPurgeMs = 60000;
         TestUtils.waitForCondition(
-            new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
-            60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
+            new RepartitionTopicVerified(currentSize -> currentSize <= 4L * PURGE_SEGMENT_BYTES),

Review Comment:
   In the builds on Jenkins, this test failed because a future of a delete record call did not complete before all offsets in the repartition topic got committed. When the delete call finally completes there a no commits that trigger a new delete record call. That means, if the size of the repartition topic after the delete call that did not complete before all commits is still larger than `PURGE_SEGMENT_BYTES`, the tests fails.
   So increasing the limit in the test makes it more likely that the test succeeds and at the same time it verifies that most of the segments are deleted.
   Alternatively, we could decouple deleting records in the repartition topic from committing. We would need to remove the condition `committed > 0` from this code line: 
   https://github.com/apache/kafka/blob/56ab2f80343d7d842c1b0e3069833106d4fe3259/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1117
   We do not need to wait for a commit to delete records in the repartition topic, because if the async call to delete records takes a while to complete, there might be offsets to delete independently of the next commit. All commits since the execution of the delete record call might also have committed offsets that still need to be deleted from the repartition topic. Such a decoupling implies an iteration over all tasks of a stream thread even if no offsets were committed, but it does not imply an additional remote call to delete the records since if no records need to be deleted no remote call is executed.
   AFAIK, Kafka Streams only guarantees that records are deleted eventually from the repartition topic, so I opted for the change in the test instead of the change in the hot path of the stream thread.
   I tested both options and both options seem to work.
   I am not sure why this flakiness was surfaced by enabling the state updater, since it could theoretically also happen with disabled state updater. Maybe the additional state updater thread limits the resources available to the embedded Kafka. But also that seems unlikely since the state updater thread should be waiting on a condition variable at that stage in the test.         
     



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246547471


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -201,7 +201,7 @@ public boolean commitNeeded() {
 
     @Override
     public StateStore getStore(final String name) {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.getStore(name);

Review Comment:
   Needed in some tests



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593


##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   I turned off the caching because with the state updater the commits happen with different contents of the cache than without the state updater.
   If caching is not needed, I think turning it off makes tests more robust.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260704672


##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
             }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
 
             // Kill instance, delete state to force restoration.
-            assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
+            assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60)));
             IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
             Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
                 .map(Path::toFile)
                 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+        } finally {
+            streams.get().close();

Review Comment:
   Look at line 284. There `streams` is used in a lambda expression. You can only use variables that are final or effective final in a lambda expression. There are two possibilities to make `streams` final: either we use a final `AtomicReference` or we assign `streams` to an effective final variable and use that in the lambda. I chose the `AtomicReference` option. This are the two option because if we do not initialize the variable outside of the `try`-clause because we need an initialized variable in the `finally`-clause. 
   However, I now realized that we can create the Streams client outside the `try`-clause because if the constructor fails we do not need to call close and cleanup in the `finally`-clause.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1253710146


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -87,7 +88,7 @@ public static void closeCluster() {
     }
 
 
-    private final Time time = CLUSTER.time;
+    private final Time time = new MockTime(1);

Review Comment:
   Are we not worried that broker time and our time might diverge? Can we make the broker mock time advance? Seems to be an undesired pattern to have two mock-times?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1253708580


##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -277,6 +277,7 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
         t1.toStream().to(outputTopicName);
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);

Review Comment:
   We need to `close()` a `KafkaStreams` client even if we never `.start()` it, and thus should use `try-with-resource` or split this up and do:
   ```
   final KafkaStreams streams;
   try {
     streams = new KafkaStreams(...);
   } finally {
     ...
   }
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -313,9 +313,14 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
             Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
                 .map(Path::toFile)
                 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+        } finally {
+            streams.close();
+            streams.cleanUp();
+        }
 
-            // wait till the lag goes down to 0
-            final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);
+        // wait till the lag goes down to 0
+        final KafkaStreams restartedStreams = new KafkaStreams(builder.build(), props);

Review Comment:
   as above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: [DO NOT MERGE] KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1294331015


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -212,10 +213,11 @@ public void shouldRestoreState() throws Exception {
         );
 
         // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
+        final long waitForPurgeMs = 60000;
         TestUtils.waitForCondition(
-            new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
-            60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
+            new RepartitionTopicVerified(currentSize -> currentSize <= 4L * PURGE_SEGMENT_BYTES),

Review Comment:
   It is prevented here: https://github.com/apache/kafka/blob/7782741262c08e5735f7c8e09727ec37cb5f7f02/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java#L1782
   
   I do not know why only one delete call should be in-flight at the same time. Maybe to limit the load of admin calls on the brokers?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lucasbru commented on a diff in pull request #13927: [DO NOT MERGE] KAFKA-10199: Enable state updater by default

Posted by "lucasbru (via GitHub)" <gi...@apache.org>.
lucasbru commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1294820451


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -212,10 +213,11 @@ public void shouldRestoreState() throws Exception {
         );
 
         // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
+        final long waitForPurgeMs = 60000;
         TestUtils.waitForCondition(
-            new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
-            60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
+            new RepartitionTopicVerified(currentSize -> currentSize <= 4L * PURGE_SEGMENT_BYTES),

Review Comment:
   Yeah, that makes sense. Seems to me that the overhead wouldn't be noticable if we execute the code even if we didn't commit anything. And it would make the execution of streams slightly less surprising, so, that's what we are going for right? I vote for purging no matter what.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260704672


##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
             }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
 
             // Kill instance, delete state to force restoration.
-            assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
+            assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60)));
             IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
             Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
                 .map(Path::toFile)
                 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+        } finally {
+            streams.get().close();

Review Comment:
   Look at line 284. There `streams` is used in a lambda expression. You can only use variables that are final or effective final in a lambda expression. There are two possibilities to make `streams` final: either we use a final `AtomicReference` or we assign `streams` to an effective final variable and use that in the lambda. I chose the `AtomicReference` option. This are the two options because we need to initialize the variable outside of the `try`-clause to have an initialized variable in the `finally`-clause. 
   However, I now realized that we can create the Streams client outside the `try`-clause because if the constructor fails we do not need to call close and cleanup in the `finally`-clause.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246546787


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
-        final boolean stateUpdaterEnabled =
-            InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+        final boolean stateUpdaterEnabled = InternalConfig.getBoolean(
+                config.originals(),
+                InternalConfig.STATE_UPDATER_ENABLED,
+                InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+        );

Review Comment:
   Enabling the state updater by default.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -551,7 +554,11 @@ public StreamThread(final Time time,
 
         this.numIterations = 1;
         this.eosEnabled = eosEnabled(config);
-        this.stateUpdaterEnabled = InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+        this.stateUpdaterEnabled = InternalConfig.getBoolean(
+                config.originals(),
+                InternalConfig.STATE_UPDATER_ENABLED,
+                InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+        );

Review Comment:
   Enabling the state updater by default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246546045


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1171,6 +1171,7 @@ public static class InternalConfig {
 
         // Private API to enable the state updater (i.e. state updating on a dedicated thread)
         public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
+        public static final boolean STATE_UPDATER_ENABLED_DEFAULT = true;

Review Comment:
   Enabling the state updater by default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: [DO NOT MERGE] KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1293901249


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -212,10 +213,11 @@ public void shouldRestoreState() throws Exception {
         );
 
         // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
+        final long waitForPurgeMs = 60000;
         TestUtils.waitForCondition(
-            new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
-            60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
+            new RepartitionTopicVerified(currentSize -> currentSize <= 4L * PURGE_SEGMENT_BYTES),

Review Comment:
   @lucasbru @mjsax @wcarlson5 @aliehsaeedii Could you please sanity check this single change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Re: [PR] KAFKA-10199: Enable state updater by default [kafka]

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna merged PR #13927:
URL: https://github.com/apache/kafka/pull/13927


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593


##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   I turned off the caching because with the state updater the commits happen and different points in time than without the state updater. Thus, caching behaves differently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246652052


##########
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java:
##########
@@ -843,15 +846,19 @@ private static <K, V, T> List<T> waitUntilFinalKeyValueRecordsReceived(final Pro
                 // still need to check that for each key, the ordering is expected
                 final Map<K, List<T>> finalAccumData = new HashMap<>();
                 for (final T kv : accumulatedActual) {
-                    finalAccumData.computeIfAbsent(
-                        withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key,
-                        key -> new ArrayList<>()).add(kv);
+                    final K key = withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key;
+                    final List<T> records = finalAccumData.computeIfAbsent(key, k -> new ArrayList<>());
+                    if (!records.contains(kv)) {
+                        records.add(kv);
+                    }
                 }
                 final Map<K, List<T>> finalExpected = new HashMap<>();
                 for (final T kv : expectedRecords) {
-                    finalExpected.computeIfAbsent(
-                        withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key,
-                        key -> new ArrayList<>()).add(kv);
+                    final K key = withTimestamp ? ((KeyValueTimestamp<K, V>) kv).key() : ((KeyValue<K, V>) kv).key;
+                    final List<T> records = finalExpected.computeIfAbsent(key, k -> new ArrayList<>());
+                    if (!records.contains(kv)) {
+                        records.add(kv);
+                    }

Review Comment:
   These changes do not consider duplicate record during the comparison. A test that used this verification triggered a failure and verified then that the expected records were in the output topic. However, without the state updater no records were written to the output topic before the failure. With the state updater some records were written to the output topics.
   From a correctness point of view, both is correct since no commit happens before the simulated failure and so Streams reads again all input records after the failover (under ALOS).   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246546435


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -230,8 +230,11 @@ public StoreChangelogReader(final Time time,
         this.restoreConsumer = restoreConsumer;
         this.stateRestoreListener = stateRestoreListener;
 
-        this.stateUpdaterEnabled =
-            InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+        this.stateUpdaterEnabled = InternalConfig.getBoolean(
+            config.originals(),
+            InternalConfig.STATE_UPDATER_ENABLED,
+            InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+        );

Review Comment:
   Enabling the state updater by default.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246577358


##########
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##########
@@ -207,11 +207,12 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMulti
             });
         }
 
-        startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
+        for (final KafkaStreams stream: kafkaStreamsList) {
+            stream.start();
+        }

Review Comment:
   With the state updater enabled, the Streams clients might still be in `REBALANCING` when the task that throws the `IllegalStateExeption` is processing. Tasks are processed and restored in parallel, but the stream thread transitions to RUNNING when all tasks are restored.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260139834


##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
             }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
 
             // Kill instance, delete state to force restoration.
-            assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
+            assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60)));
             IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
             Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
                 .map(Path::toFile)
                 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+        } finally {
+            streams.get().close();

Review Comment:
   Not sure why we need `AtomicReference`? Can you elaborate?
   
   Why does this not work?
   ```
   KafkaStream streams;
   try {
     stream = new KafkaStreams(...);
     ...
   ```
   
   Btw: `streams.get()` could return `null` in case we fail before calling `set(...)`. Need a `null`-check here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246717245


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
-        final boolean stateUpdaterEnabled =
-            InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+        final boolean stateUpdaterEnabled = InternalConfig.getBoolean(
+                config.originals(),
+                InternalConfig.STATE_UPDATER_ENABLED,
+                InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+        );

Review Comment:
   Good idea!
   Done!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1254072786


##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -277,6 +277,7 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
         t1.toStream().to(outputTopicName);
         final KafkaStreams streams = new KafkaStreams(builder.build(), props);

Review Comment:
   Actually the Streams clients are closed in the `finally`-clause. The thing that was missing was that the Streams clients were not created in the `try`-clause. I needed to use `AtomicReference` for that, otherwise I could not use the Streams clients in lambdas.   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lucasbru commented on a diff in pull request #13927: [DO NOT MERGE] KAFKA-10199: Enable state updater by default

Posted by "lucasbru (via GitHub)" <gi...@apache.org>.
lucasbru commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1294315120


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -212,10 +213,11 @@ public void shouldRestoreState() throws Exception {
         );
 
         // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
+        final long waitForPurgeMs = 60000;
         TestUtils.waitForCondition(
-            new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
-            60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
+            new RepartitionTopicVerified(currentSize -> currentSize <= 4L * PURGE_SEGMENT_BYTES),

Review Comment:
   For my understanding: why can there only be one delete call in-flight at the same time? Where is this prevented?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] wcarlson5 commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "wcarlson5 (via GitHub)" <gi...@apache.org>.
wcarlson5 commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260958637


##########
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##########
@@ -207,11 +207,12 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMulti
             });
         }
 
-        startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
+        for (final KafkaStreams stream: kafkaStreamsList) {
+            stream.start();
+        }

Review Comment:
   Okay that is very interesting. Good find then!



##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   Okay that works for me, thanks for the explanation I missed that they were equivalent 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1258012878


##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   @wcarlson5 Could you elaborate what is worrying you here?
   
   I ran the tests with disabled state updater and the disabled cache and all tests pass. That tells me that the state updater does not change any results.
   Additionally, the results with enabled and disabled cache are semantically equivalent.
   ```
   pair("B", 1L), pair("A", 2L), pair("C", 2L)
   ```
   is equivalent to
   ```
   pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L)
   ```
   The latter just produces more intermediate results.
   
   In general, if a test verifies the correctness of results, disabling the cache makes the test more robust, because the results do not depend on commit time intervals or production rates.
   
   Let me know if I missed something here.  
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1258014970


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1619,7 +1639,14 @@ private List<Task> standbyTaskIterable() {
     }
 
     private Stream<Task> standbyTaskStream() {
-        return tasks.allTasks().stream().filter(t -> !t.isActive());
+        if (stateUpdater != null) {
+            return Stream.concat(
+                stateUpdater.getStandbyTasks().stream(),
+                tasks.allTasks().stream().filter(t -> !t.isActive())
+            );
+        } else {
+            return tasks.allTasks().stream().filter(t -> !t.isActive());

Review Comment:
   Done!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593


##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   I turned off the caching because with the state updater the commits happen with different contents of the cache than without the state updater.
   If it is not needed, I think turning off caching makes tests more robust.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246640445


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -87,7 +88,7 @@ public static void closeCluster() {
     }
 
 
-    private final Time time = CLUSTER.time;
+    private final Time time = new MockTime(1);

Review Comment:
   The time needs to progress when the state updater is shutdown, otherwise the shutdown is blocked. `CLUSTER` provides a mock time that does not progress. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246587948


##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   @wcarlson5 @ableegoldman Could you review this changes to know if I changed something I should not have done?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246699178


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1123,7 +1138,10 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl
     public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
-        final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
+        final Properties props = configProps(true);
+        props.put(InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1257916618


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1600,7 +1617,21 @@ List<Task> activeTaskIterable() {
         return activeTaskStream().collect(Collectors.toList());
     }
 
+    List<Task> activeRunningTaskIterable() {
+        return activeRunningTaskStream().collect(Collectors.toList());
+    }
+
     private Stream<Task> activeTaskStream() {
+        if (stateUpdater != null) {
+            return Stream.concat(
+                activeRunningTaskStream(),
+                stateUpdater.getTasks().stream().filter(Task::isActive)
+            );
+        }

Review Comment:
   Yes, the state updater has active tasks. An active task that needs to be restored is given to the state updater which restores its state. The state updater owns active tasks in state `RESTORING` and standby tasks in state `RUNNING`.   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1257992420


##########
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##########
@@ -207,11 +207,12 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMulti
             });
         }
 
-        startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
+        for (final KafkaStreams stream: kafkaStreamsList) {
+            stream.start();
+        }

Review Comment:
   I am not sure I understand what you are asking, so bear with me if my reply is not the one you expected. 
   
   Adding
   ```
   waitForApplicationState(Arrays.asList(streams, streamsTwo, streamsThree), KafkaStreams.State.RUNNING, ofSeconds(60))
   ```
   after starting all Streams clients makes the test flaky because with the state updater NOT all clients need to be in `RUNNING` before a Streams client transits to `ERROR`. Not even the Streams client that throws the exception needs to be in `RUNNING` before transitioning to `ERROR`. 
   Actually, now that I looked more at this code, I am wondering why this test was not flaky before my changes. Each Streams client is started independently. That means, there is no guarantee, that all three Streams clients are `RUNNING` at the same time. I did some testing with `Thread.sleep()` after each client were started to simulate delays in processing in the version of the test before my changes and the test was flaky indeed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246569864


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1613,7 +1640,14 @@ private List<Task> standbyTaskIterable() {
     }
 
     private Stream<Task> standbyTaskStream() {
-        return tasks.allTasks().stream().filter(t -> !t.isActive());
+        if (stateUpdater != null) {
+            return Stream.concat(
+                stateUpdater.getStandbyTasks().stream(),
+                tasks.allTasks().stream().filter(t -> !t.isActive())
+            );
+        } else {

Review Comment:
   With enabled state updater, the state updater owns standby tasks. However, when the state updater is shut down it moves its stand by tasks to the stream thread for closing out. Thus, we also need to add `tasks.allTasks().stream().filter(t -> !t.isActive())` here. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246559253


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1600,7 +1617,21 @@ List<Task> activeTaskIterable() {
         return activeTaskStream().collect(Collectors.toList());
     }
 
+    List<Task> activeRunningTaskIterable() {
+        return activeRunningTaskStream().collect(Collectors.toList());
+    }
+
     private Stream<Task> activeTaskStream() {
+        if (stateUpdater != null) {
+            return Stream.concat(
+                activeRunningTaskStream(),
+                stateUpdater.getTasks().stream().filter(Task::isActive)
+            );
+        }

Review Comment:
   With the state updater enabled, restoring tasks are owned by the state updater. So they need to be retrieved from there and merged with the running active tasks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246557140


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1542,12 +1551,20 @@ Set<TaskId> activeTaskIds() {
             .collect(Collectors.toSet());
     }
 
-    Set<TaskId> standbyTaskIds() {
-        return standbyTaskStream()
+    Set<TaskId> activeRunningTaskIds() {
+        return activeRunningTaskStream()
             .map(Task::id)
             .collect(Collectors.toSet());
     }
 
+    Set<TaskId> standbyTaskIds() {
+        if (stateUpdater != null) {
+            return stateUpdater.getStandbyTasks().stream().map(Task::id).collect(Collectors.toSet());
+        } else {
+            return standbyTaskStream().map(Task::id).collect(Collectors.toSet());
+        }
+    }

Review Comment:
   With the state updater enabled, standby tasks are owned by the state updater.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246572219


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1881,15 +1915,11 @@ public static void executeAndMaybeSwallow(final boolean clean,
     }
 
     boolean needsInitializationOrRestoration() {
-        return activeTaskIterable().stream().anyMatch(Task::needsInitializationOrRestoration);
+        return activeTaskStream().anyMatch(Task::needsInitializationOrRestoration);
     }
 
     // for testing only
     void addTask(final Task task) {
         tasks.addTask(task);
     }
-
-    TasksRegistry tasks() {
-        return tasks;
-    }

Review Comment:
   Never used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246620424


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1600,7 +1617,21 @@ List<Task> activeTaskIterable() {
         return activeTaskStream().collect(Collectors.toList());
     }
 
+    List<Task> activeRunningTaskIterable() {

Review Comment:
   You are right, there is also `allOwnedTasks()`. All owned tasks are the tasks owned by the stream thread. i.e., all tasks with state updater disabled and only the tasks owned by the stream thread and not by the state updater with state updater enabled. So it is a bit confusing, but luckily only a temporary confusion. A ToDo above `allOwnedTasks()` states:
   
   ```java
        * TODO: after we complete switching to state updater, we could rename this function as allRunningTasks
        *       to be differentiated from allTasks including running and restoring tasks
   ```  



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lucasbru commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "lucasbru (via GitHub)" <gi...@apache.org>.
lucasbru commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246586612


##########
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##########
@@ -113,6 +113,7 @@ public void before(final TestInfo testInfo) {
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100L);
+        streamsConfiguration.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   doesn't the integration test work with state updater? A comment would be good here at least



##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -208,6 +208,8 @@ private Properties configProps(final String appId, final String host) {
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
+        streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+        streamsConfiguration.put(StreamsConfig.InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   Same here - why do we disable the state updater here?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1123,7 +1138,10 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl
     public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException {
         internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
-        final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
+        final Properties props = configProps(true);
+        props.put(InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   same here. if possible, a comment why this test is invalid for state updater



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3038,6 +3149,7 @@ public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
     @Test
     public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() {
         final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig();
+        streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   In the corresponding SU test, can we remove the `streamsConfigProps.put`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##########
@@ -201,7 +201,7 @@ public boolean commitNeeded() {
 
     @Override
     public StateStore getStore(final String name) {
-        throw new UnsupportedOperationException("This task is read-only");
+        return task.getStore(name);

Review Comment:
   Would be nice to expose a read only state store here (hiding init, flush, close and the like), but that's probably for a differenet PR



##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching
+        asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L));

Review Comment:
   why did the output data change?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1600,7 +1617,21 @@ List<Task> activeTaskIterable() {
         return activeTaskStream().collect(Collectors.toList());
     }
 
+    List<Task> activeRunningTaskIterable() {

Review Comment:
   What does "Running" mean? Is this existing terminology or are you introducing new terms here? What is the difference to "Processing" tasks, and didn't we also have "Owned" tasks somewhere?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
 
         final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
-        final boolean stateUpdaterEnabled =
-            InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+        final boolean stateUpdaterEnabled = InternalConfig.getBoolean(
+                config.originals(),
+                InternalConfig.STATE_UPDATER_ENABLED,
+                InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+        );

Review Comment:
   could make sense to make a little `InternalConfig.getStateUpdateEnabled(config)`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246621516


##########
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##########
@@ -173,9 +173,9 @@ public static void closeCluster() {
     private static final List<KeyValue<String, Long>> STANDARD_INPUT_DATA =
         asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
     private static final List<KeyValue<String, Long>> COUNT_OUTPUT_DATA =
-        asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching
+        asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L));

Review Comment:
   See https://github.com/apache/kafka/pull/13927/files#r1246594593



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #13927: [DO NOT MERGE] KAFKA-10199: Enable state updater by default

Posted by "cadonna (via GitHub)" <gi...@apache.org>.
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1296232439


##########
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##########
@@ -212,10 +213,11 @@ public void shouldRestoreState() throws Exception {
         );
 
         // we need long enough timeout to by-pass the log manager's InitialTaskDelayMs, which is hard-coded on server side
+        final long waitForPurgeMs = 60000;
         TestUtils.waitForCondition(
-            new RepartitionTopicVerified(currentSize -> currentSize <= PURGE_SEGMENT_BYTES),
-            60000,
-            "Repartition topic " + REPARTITION_TOPIC + " not purged data after 60000 ms."
+            new RepartitionTopicVerified(currentSize -> currentSize <= 4L * PURGE_SEGMENT_BYTES),

Review Comment:
   I opened the following PR: https://github.com/apache/kafka/pull/14227
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org