Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/11/15 16:03:34 UTC

[GitHub] [kafka] cadonna opened a new pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

cadonna opened a new pull request #11499:
URL: https://github.com/apache/kafka/pull/11499


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749480160



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##########
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+    /**
+     * Adds a task (active or standby) to the state updater.
+     *
+     * The state of the task will be updated.
+     *
+     * @param task task
+     */
+    void add(final Task task);
+
+    /**
+     * Removes a task (active or standby) from the state updater.
+     *
+     * A task is removed from the state updater irrespective of whether its state is up-to-date or not.
+     *
+     * @param task task to remove
+     */
+    void remove(final Task task);
+
+    /**
+     * Gets restored active tasks from state restoration/update.
+     *
+     * @param timeout maximum duration the calling thread waits for restored active tasks
+     *
+     * @return list of active tasks with up-to-date states
+     */
+    List<StreamTask> getRestoredActiveTasks(final Duration timeout);
+
+    /**
+     * Gets a list of tasks that failed during restoration.
+     *
+     * The exception that caused the failure can be retrieved with {@link Task#getException()}.
+     *
+     * @return failed tasks
+     */
+    List<Task> getFailedTasks();

Review comment:
       If a task fails during restoration, it is returned by this method. The exception that caused the failure can be retrieved from the task itself with `Task#getException()`.
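A minimal sketch of the failure-reporting contract described in this comment. `SketchTask` and `FailedTaskTracker` are hypothetical stand-ins, not Kafka classes, and the sketch assumes failed tasks are drained on each call, which the PR does not specify.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the internal Task interface; only the failure state is modeled.
class SketchTask {
    private final String id;
    private RuntimeException exception; // stays null while the task is healthy

    SketchTask(final String id) {
        this.id = id;
    }

    String id() {
        return id;
    }

    void fail(final RuntimeException e) {
        this.exception = e;
    }

    // Mirrors the proposed Task#getException(): empty unless the task failed.
    Optional<RuntimeException> getException() {
        return Optional.ofNullable(exception);
    }
}

// Hypothetical fragment of a state updater that only tracks failed tasks.
class FailedTaskTracker {
    private final List<SketchTask> failed = new ArrayList<>();

    // Called from the restoration logic when restoring a task throws.
    synchronized void recordFailure(final SketchTask task, final RuntimeException e) {
        task.fail(e);
        failed.add(task);
    }

    // Returns the tasks that failed since the last call and clears the list
    // (assumption: the PR does not say whether failed tasks are drained or retained).
    synchronized List<SketchTask> getFailedTasks() {
        final List<SketchTask> result = new ArrayList<>(failed);
        failed.clear();
        return result;
    }
}
```

A caller would then inspect each returned task with `getException()` to decide how to handle the failure.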







[GitHub] [kafka] guozhangwang merged pull request #11499: KAFKA-10199: Add interface for state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang merged pull request #11499:
URL: https://github.com/apache/kafka/pull/11499


   





[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749486443



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##########
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {

Review comment:
       An implementation of the state updater is passed to the task manager in its constructor. Since the threading model of the state updater is encapsulated and the same state updater can be passed to multiple task managers, there is no fixed 1:1 relationship between restoration threads and stream threads.
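The constructor injection described above could look roughly like the following. All names here (`MiniStateUpdater`, `InMemoryStateUpdater`, `SketchTaskManager`) are hypothetical; the sketch only shows that two task managers can share one updater instance, so the number of restoration threads is decoupled from the number of stream threads.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical, reduced version of the interface: add() plus an inspection helper.
interface MiniStateUpdater {
    void add(String taskId);

    List<String> tasks();
}

// Hypothetical thread-safe implementation. A real one would own its restoration
// thread(s); that threading model stays hidden behind the interface.
class InMemoryStateUpdater implements MiniStateUpdater {
    private final List<String> tasks = Collections.synchronizedList(new ArrayList<>());

    @Override
    public void add(final String taskId) {
        tasks.add(taskId);
    }

    @Override
    public List<String> tasks() {
        synchronized (tasks) {
            return new ArrayList<>(tasks); // defensive copy under the list's lock
        }
    }
}

// Hypothetical task manager that receives the updater through its constructor.
class SketchTaskManager {
    private final MiniStateUpdater stateUpdater;

    SketchTaskManager(final MiniStateUpdater stateUpdater) {
        this.stateUpdater = stateUpdater;
    }

    void startRestoring(final String taskId) {
        stateUpdater.add(taskId);
    }
}
```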







[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749478922



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##########
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+    /**
+     * Adds a task (active or standby) to the state updater.
+     *
+     * The state of the task will be updated.
+     *
+     * @param task task
+     */
+    void add(final Task task);
+
+    /**
+     * Removes a task (active or standby) from the state updater.
+     *
+     * A task is removed from the state updater irrespective of whether its state is up-to-date or not.
+     *
+     * @param task task to remove
+     */
+    void remove(final Task task);
+
+    /**
+     * Gets restored active tasks from state restoration/update.
+     *
+     * @param timeout maximum duration the calling thread waits for restored active tasks
+     *
+     * @return list of active tasks with up-to-date states
+     */
+    List<StreamTask> getRestoredActiveTasks(final Duration timeout);

Review comment:
       When the restoration of an active task is completed, the task is returned by the next call to this method. The calling thread can decide, via the timeout, how long to wait for the next restored active tasks.
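One plausible way to implement this wait-with-timeout behavior is a blocking queue between the restoration thread and the calling thread. This is a sketch under assumptions: tasks are represented by string IDs, and `RestoredTasksQueue` and `markRestored` are hypothetical names, not part of the PR.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical hand-off point between a restoration thread and a stream thread.
class RestoredTasksQueue {
    private final BlockingQueue<String> restored = new LinkedBlockingQueue<>();

    // Called by the restoration thread once an active task's state is up to date.
    void markRestored(final String taskId) {
        restored.add(taskId);
    }

    // Waits up to `timeout` for at least one restored task, then returns every task
    // that is available without further waiting. Returns an empty list on timeout.
    List<String> getRestoredActiveTasks(final Duration timeout) {
        final List<String> result = new ArrayList<>();
        try {
            final String first = restored.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
            if (first != null) {
                result.add(first);
                restored.drainTo(result);
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return result;
    }
}
```

Blocking only for the first task and then draining the rest lets the caller pick up a batch of restored tasks without waiting the full timeout for each one.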







[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749468726



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##########
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+    /**
+     * Adds a task (active or standby) to the state updater.
+     *
+     * The state of the task will be updated.
+     *
+     * @param task task
+     */
+    void add(final Task task);

Review comment:
       Each time an active or standby task transitions to state `RESTORING`, it should be added to the state updater.
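The rule in this comment amounts to a hook on state transitions. A minimal sketch, assuming a simplified copy of the task states and hypothetical `TransitionTask`/`TransitionManager` classes; a plain list stands in for the state updater.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified mirror of the task lifecycle states (assumption: only the names matter here).
enum SketchState { CREATED, RESTORING, RUNNING, SUSPENDED, CLOSED }

// Hypothetical task holding just an ID and its current state.
class TransitionTask {
    final String id;
    SketchState state = SketchState.CREATED;

    TransitionTask(final String id) {
        this.id = id;
    }
}

// Hypothetical task manager enforcing the rule from the review comment:
// every transition into RESTORING hands the task to the state updater.
class TransitionManager {
    // Stands in for the StateUpdater; a real implementation would restore these tasks.
    final List<TransitionTask> updaterTasks = new ArrayList<>();

    void transitionTo(final TransitionTask task, final SketchState newState) {
        task.state = newState;
        if (newState == SketchState.RESTORING) {
            updaterTasks.add(task); // corresponds to StateUpdater#add(task)
        }
    }
}
```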







[GitHub] [kafka] cadonna commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749476568



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##########
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+    /**
+     * Adds a task (active or standby) to the state updater.
+     *
+     * The state of the task will be updated.
+     *
+     * @param task task
+     */
+    void add(final Task task);
+
+    /**
+     * Removes a task (active or standby) from the state updater.
+     *
+     * A task is removed from the state updater irrespective of whether its state is up-to-date or not.
+     *
+     * @param task task to remove
+     */
+    void remove(final Task task);

Review comment:
       Each time a task in state `RESTORING` is suspended, we need to remove it from the state updater.
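Together with adding on `RESTORING`, this keeps the updater's task set consistent with task states. A minimal sketch under the same kind of assumptions: `LifecycleState` and `SuspendingManager` are hypothetical, and a set of task IDs stands in for the state updater.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, reduced set of task states for this sketch.
enum LifecycleState { RESTORING, RUNNING, SUSPENDED }

// Hypothetical task manager that keeps the updater's tasks in sync with task states.
class SuspendingManager {
    private final Map<String, LifecycleState> states = new HashMap<>();
    // Stands in for the tasks currently owned by the StateUpdater.
    final Set<String> updaterTasks = new HashSet<>();

    void startRestoring(final String taskId) {
        states.put(taskId, LifecycleState.RESTORING);
        updaterTasks.add(taskId); // corresponds to StateUpdater#add
    }

    void suspend(final String taskId) {
        // A task suspended while RESTORING leaves the updater, whether or not
        // its state has caught up yet.
        if (states.get(taskId) == LifecycleState.RESTORING) {
            updaterTasks.remove(taskId); // corresponds to StateUpdater#remove
        }
        states.put(taskId, LifecycleState.SUSPENDED);
    }

    LifecycleState state(final String taskId) {
        return states.get(taskId);
    }
}
```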

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateUpdater.java
##########
@@ -0,0 +1,50 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.time.Duration;
+import java.util.List;
+
+public interface StateUpdater {
+
+    /**
+     * Adds a task (active or standby) to the state updater.
+     *
+     * The state of the task will be updated.
+     *
+     * @param task task
+     */
+    void add(final Task task);

Review comment:
       Each time an active or standby task transitions to state `RESTORING`, it needs to be added to the state updater.







[GitHub] [kafka] guozhangwang commented on pull request #11499: KAFKA-10199: Add interface for state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#issuecomment-1049069755


   Thanks @cadonna, merged to trunk.





[GitHub] [kafka] guozhangwang commented on a change in pull request #11499: [WIP] KAFKA-10199: Add interface for state updater

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #11499:
URL: https://github.com/apache/kafka/pull/11499#discussion_r749758167



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
##########
@@ -246,4 +246,11 @@ default boolean commitRequested() {
      * @return This returns the time the task started idling. If it is not idling it returns empty.
      */
     Optional<Long> timeCurrentIdlingStarted();
+
+    /**
+     * Gets the exception that caused the failure of the task.
+     *
+     * @return exception that caused the failure of the task
+     */
+    Optional<RuntimeException> getException();

Review comment:
       I think we can narrow the scope of this exception if it is only going to be used during restoration. Nevertheless, we can discuss this later.
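To illustrate one possible reading of this suggestion: the failure could be carried alongside the task only in the restoration path, instead of adding `getException()` to the general `Task` interface as the diff above does. `FailedRestoration` and `RestorationFailures` are hypothetical names for this sketch, not part of the PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical value type pairing a failed task (by ID) with the exception that caused
// the failure, so the general Task interface does not need a getException() method.
final class FailedRestoration {
    private final String taskId;
    private final RuntimeException exception;

    FailedRestoration(final String taskId, final RuntimeException exception) {
        this.taskId = taskId;
        this.exception = exception;
    }

    String taskId() {
        return taskId;
    }

    RuntimeException exception() {
        return exception;
    }
}

// Hypothetical updater fragment: failures are recorded only in the restoration path.
class RestorationFailures {
    private final List<FailedRestoration> failures = new ArrayList<>();

    synchronized void onRestorationFailure(final String taskId, final RuntimeException e) {
        failures.add(new FailedRestoration(taskId, e));
    }

    // Returns the recorded failures and clears them.
    synchronized List<FailedRestoration> drainFailures() {
        final List<FailedRestoration> result = new ArrayList<>(failures);
        failures.clear();
        return result;
    }
}
```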



