Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/21 15:52:20 UTC

[GitHub] [kafka] cadonna opened a new pull request, #12427: KAFKA-10199: Add tasks to state updater when they are created

cadonna opened a new pull request, #12427:
URL: https://github.com/apache/kafka/pull/12427

   This PR introduces an internal config to enable the state updater.
   
   If the state updater is enabled, newly created tasks are added to
   the state updater.
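   A minimal Java sketch of the routing described above. It uses hypothetical stand-in classes (`SketchTask`, `SketchStateUpdater`, `SketchTasks`), not the real Kafka Streams types. It assumes that, when the internal config is enabled, a newly created active task is handed to the state updater, and that otherwise the task is registered directly, following the pre-existing bookkeeping in the `Tasks#createActiveTasks` diff. The config name is not given here, so a plain boolean stands in for it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a task: just an id and its input partitions.
final class SketchTask {
    final String id;
    final Set<String> inputPartitions;

    SketchTask(final String id, final Set<String> inputPartitions) {
        this.id = id;
        this.inputPartitions = inputPartitions;
    }
}

// Hypothetical stand-in for the state updater; it only records the tasks it receives.
final class SketchStateUpdater {
    final List<SketchTask> added = new ArrayList<>();

    void add(final SketchTask task) {
        added.add(task);
    }
}

// Hypothetical, simplified registry mirroring the branch added to Tasks#createActiveTasks.
final class SketchTasks {
    // null when the (assumed) internal config disables the state updater
    private final SketchStateUpdater stateUpdater;
    final Map<String, SketchTask> activeTasksPerId = new HashMap<>();
    final Map<String, SketchTask> activeTasksPerPartition = new HashMap<>();

    SketchTasks(final boolean stateUpdaterEnabled, final SketchStateUpdater stateUpdater) {
        this.stateUpdater = stateUpdaterEnabled ? stateUpdater : null;
    }

    void addCreatedTasks(final List<SketchTask> createdTasks) {
        for (final SketchTask task : createdTasks) {
            if (stateUpdater != null) {
                // Enabled: hand the newly created task to the state updater.
                stateUpdater.add(task);
            } else {
                // Disabled: register the task by id and by each input partition.
                activeTasksPerId.put(task.id, task);
                for (final String partition : task.inputPartitions) {
                    activeTasksPerPartition.put(partition, task);
                }
            }
        }
    }
}
```

   Keeping the disabled path identical to the old bookkeeping lets the integration land incrementally behind the config.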
   
   This PR starts the integration of the state updater; the integration
   is not finished yet.
   
   Additionally, this PR introduces a builder for task mocks.
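   A hedged sketch of what a builder for task test doubles can look like. All names (`SketchTaskView`, `TaskStubBuilder`, `activeTask`, `standbyTask`, `stateful`, `inputPartitions`) are hypothetical and are not the builder added in this PR. The sketch hand-rolls a stub instead of using a mocking library so that it runs with only the JDK.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical minimal view of a task, just enough for the sketch.
interface SketchTaskView {
    String id();
    boolean isActive();
    boolean isStateful();
    Set<String> inputPartitions();
}

// Hypothetical fluent builder that produces hand-rolled task stubs.
final class TaskStubBuilder {
    private final String id;
    private final boolean active;
    private boolean stateful = false;
    private Set<String> inputPartitions = Collections.emptySet();

    private TaskStubBuilder(final String id, final boolean active) {
        this.id = id;
        this.active = active;
    }

    static TaskStubBuilder activeTask(final String id) {
        return new TaskStubBuilder(id, true);
    }

    static TaskStubBuilder standbyTask(final String id) {
        return new TaskStubBuilder(id, false);
    }

    TaskStubBuilder stateful() {
        stateful = true;
        return this;
    }

    TaskStubBuilder inputPartitions(final Set<String> partitions) {
        // Read-only copy so later changes to the argument do not leak into the stub.
        inputPartitions = Collections.unmodifiableSet(new HashSet<>(partitions));
        return this;
    }

    SketchTaskView build() {
        // Copy the builder state into effectively final locals captured by the stub.
        final String taskId = id;
        final boolean isActive = active;
        final boolean isStateful = stateful;
        final Set<String> partitions = inputPartitions;
        return new SketchTaskView() {
            @Override public String id() { return taskId; }
            @Override public boolean isActive() { return isActive; }
            @Override public boolean isStateful() { return isStateful; }
            @Override public Set<String> inputPartitions() { return partitions; }
        };
    }
}
```

   A fluent builder lets each test state only the properties it cares about, which helps with the duplicated task-creation logic raised in the review of `StreamsTestUtils`.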
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] cadonna commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r928855148


##########
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:
##########
@@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() {
         return Arrays.stream(Thread.currentThread().getStackTrace())
                 .anyMatch(caller -> "org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && "checkSupplier".equals(caller.getMethodName()));
     }
+
+    public static StreamTask createStatefulTask(final TaskId taskId,

Review Comment:
   BTW, you do not want to use a mock task in `StreamTaskTest` since you are testing a task there.




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r927038139


##########
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:
##########
@@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() {
         return Arrays.stream(Thread.currentThread().getStackTrace())
                 .anyMatch(caller -> "org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && "checkSupplier".equals(caller.getMethodName()));
     }
+
+    public static StreamTask createStatefulTask(final TaskId taskId,

Review Comment:
   nit: other unit test classes duplicate this logic, e.g. `DefaultStateUpdaterTest` above and `StreamTaskTest#createStatelessTask`. Could we consolidate them all in this class?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -136,10 +139,14 @@ private void createActiveTasks(final Map<TaskId, Set<TopicPartition>> activeTask
 
         if (!activeTasksToCreate.isEmpty()) {
             for (final Task activeTask : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
-                activeTasksPerId.put(activeTask.id(), activeTask);
-                pendingActiveTasks.remove(activeTask.id());
-                for (final TopicPartition topicPartition : activeTask.inputPartitions()) {
-                    activeTasksPerPartition.put(topicPartition, activeTask);
+                if (stateUpdater != null) {

Review Comment:
   This is a meta thought: I think we should consider moving the creation of tasks out of `Tasks` and into the `TaskManager`, and hence also keep `StateUpdater` out of `Tasks`. Maybe we can do that later in a follow-up refactoring if you agree.
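   One possible shape of such a follow-up refactoring, as a hypothetical sketch: `TaskManagerSketch` decides where a created task goes, while `TaskRegistry` only stores tasks and holds no reference to a state updater. A `Consumer<String>` stands in for the state updater; none of these names are actual Kafka Streams classes.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical container: stores tasks and knows nothing about the state updater.
final class TaskRegistry {
    private final Set<String> activeTaskIds = new HashSet<>();

    void addActiveTask(final String taskId) {
        activeTaskIds.add(taskId);
    }

    boolean contains(final String taskId) {
        return activeTaskIds.contains(taskId);
    }
}

// Hypothetical manager: owns the decision of where a newly created task goes.
final class TaskManagerSketch {
    private final TaskRegistry tasks;
    private final Consumer<String> stateUpdater; // null when the state updater is disabled

    TaskManagerSketch(final TaskRegistry tasks, final Consumer<String> stateUpdater) {
        this.tasks = tasks;
        this.stateUpdater = stateUpdater;
    }

    void handleCreatedTasks(final List<String> createdTaskIds) {
        for (final String taskId : createdTaskIds) {
            if (stateUpdater != null) {
                // Enabled: the state updater takes over the task.
                stateUpdater.accept(taskId);
            } else {
                // Disabled: the registry stores the task directly.
                tasks.addActiveTask(taskId);
            }
        }
    }
}
```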




[GitHub] [kafka] guozhangwang merged pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

Posted by GitBox <gi...@apache.org>.
guozhangwang merged PR #12427:
URL: https://github.com/apache/kafka/pull/12427



[GitHub] [kafka] cadonna commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r926852811


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdaterTest.java:
##########
@@ -129,7 +128,7 @@ public void shouldThrowIfStatelessTaskNotInStateRestoring() {
 
     @Test
     public void shouldThrowIfStatefulTaskNotInStateRestoring() {
-        shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, Collections.singletonList(TOPIC_PARTITION_A_0)));
+        shouldThrowIfActiveTaskNotInStateRestoring(createActiveStatefulTask(TASK_0_0, mkSet(TOPIC_PARTITION_A_0)));

Review Comment:
   All changes here are just replacements of collections with sets.




[GitHub] [kafka] cadonna commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

Posted by GitBox <gi...@apache.org>.
cadonna commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r928724038


##########
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:
##########
@@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() {
         return Arrays.stream(Thread.currentThread().getStackTrace())
                 .anyMatch(caller -> "org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && "checkSupplier".equals(caller.getMethodName()));
     }
+
+    public static StreamTask createStatefulTask(final TaskId taskId,

Review Comment:
   That was the idea. I just did not want to pollute this PR too much.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -136,10 +139,14 @@ private void createActiveTasks(final Map<TaskId, Set<TopicPartition>> activeTask
 
         if (!activeTasksToCreate.isEmpty()) {
             for (final Task activeTask : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
-                activeTasksPerId.put(activeTask.id(), activeTask);
-                pendingActiveTasks.remove(activeTask.id());
-                for (final TopicPartition topicPartition : activeTask.inputPartitions()) {
-                    activeTasksPerPartition.put(topicPartition, activeTask);
+                if (stateUpdater != null) {

Review Comment:
   I agree!




[GitHub] [kafka] guozhangwang commented on a diff in pull request #12427: KAFKA-10199: Add tasks to state updater when they are created

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on code in PR #12427:
URL: https://github.com/apache/kafka/pull/12427#discussion_r929113602


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/Tasks.java:
##########
@@ -136,10 +139,14 @@ private void createActiveTasks(final Map<TaskId, Set<TopicPartition>> activeTask
 
         if (!activeTasksToCreate.isEmpty()) {
             for (final Task activeTask : activeTaskCreator.createTasks(mainConsumer, activeTasksToCreate)) {
-                activeTasksPerId.put(activeTask.id(), activeTask);
-                pendingActiveTasks.remove(activeTask.id());
-                for (final TopicPartition topicPartition : activeTask.inputPartitions()) {
-                    activeTasksPerPartition.put(topicPartition, activeTask);
+                if (stateUpdater != null) {

Review Comment:
   Sounds good. I will prepare a PR for this :) 



##########
streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:
##########
@@ -273,4 +281,70 @@ public static boolean isCheckSupplierCall() {
         return Arrays.stream(Thread.currentThread().getStackTrace())
                 .anyMatch(caller -> "org.apache.kafka.streams.internals.ApiUtils".equals(caller.getClassName()) && "checkSupplier".equals(caller.getMethodName()));
     }
+
+    public static StreamTask createStatefulTask(final TaskId taskId,

Review Comment:
   Ack.


