Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/11/01 00:22:37 UTC

[GitHub] [samza] ajothomas opened a new pull request, #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

ajothomas opened a new pull request, #1637:
URL: https://github.com/apache/samza/pull/1637

   # Improvement:
   
   This PR is part of the Pipeline Drain work. It provides a way to specify a task callback timeout that is used during pipeline drain. The standard `task.callback.timeout.ms` value might be too small for the drain operation to complete: drain involves clearing intermediate state, which might require a larger task callback timeout. We therefore need a separately configurable timeout for the drain operation.
   
   # Changes:
   - Added a new config, `task.callback.drain.timeout.ms`, in `TaskConfig`.
   - Added logic to override the `TaskCallbackManager`'s timeout parameter in `TaskWorker`s on drain.
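
For readers following along, here is a minimal sketch of how a drain-specific timeout could be read next to the regular one. The two config keys and the `-1L` defaults are the ones that appear in the `TaskConfig` hunks in this thread (the first revision of the PR used a 30-minute drain default instead). The class `DrainTimeoutConfigSketch` and its `getLong` helper are hypothetical stand-ins backed by a plain `Map`; the real `TaskConfig` extends `MapConfig` and may read values differently.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for TaskConfig, backed by a plain Map for illustration.
final class DrainTimeoutConfigSketch {
  // Keys as they appear in the TaskConfig diff.
  static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
  static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";

  // Defaults as they appear in the later revision of the diff.
  static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
  static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = -1L;

  private final Map<String, String> config;

  DrainTimeoutConfigSketch(Map<String, String> config) {
    // Defensive copy so later changes to the caller's map do not leak in.
    this.config = new HashMap<>(config);
  }

  // Timeout for regular message processing callbacks.
  long getCallbackTimeoutMs() {
    return getLong(CALLBACK_TIMEOUT_MS, DEFAULT_CALLBACK_TIMEOUT_MS);
  }

  // Timeout for callbacks created while the pipeline is draining.
  long getDrainCallbackTimeoutMs() {
    return getLong(DRAIN_CALLBACK_TIMEOUT_MS, DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS);
  }

  // Hypothetical helper: parse a long value, falling back to the default when the key is absent.
  private long getLong(String key, long defaultValue) {
    String raw = config.get(key);
    return raw == null ? defaultValue : Long.parseLong(raw.trim());
  }
}
```

Keeping the two getters separate means a job can raise only the drain timeout without touching the timeout used for regular processing.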
   
   # Tests:
   
   - Unit test changes for `TaskConfig`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@samza.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [samza] ajothomas commented on a diff in pull request #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

Posted by GitBox <gi...@apache.org>.
ajothomas commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1018501997


##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -106,7 +110,10 @@ public void run() {
           callback.failure(new SamzaException(msg));
         }
       };
-      ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
+
+      final ScheduledFuture scheduledFuture = isDraining && (envelope.isDrain() || envelope.isWatermark())

Review Comment:
   Thanks. I refactored `TaskCallbackManager`'s `createCallback` method to take the timeout as a parameter, and moved the logic that decides which timeout to use into the `process()` method of `AsyncTaskWorker`.
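
A rough sketch of the caller-side decision described in this comment, assuming the predicate from the hunk above is kept unchanged after the refactor. `DrainTimeoutSelector` and `timeoutFor` are hypothetical names used only for illustration; the booleans stand in for the run loop's `isDraining` flag and for `envelope.isDrain()` / `envelope.isWatermark()`.

```java
// Hypothetical helper; not the actual code in AsyncTaskWorker.
final class DrainTimeoutSelector {
  private final long callbackTimeoutMs;
  private final long drainCallbackTimeoutMs;

  DrainTimeoutSelector(long callbackTimeoutMs, long drainCallbackTimeoutMs) {
    this.callbackTimeoutMs = callbackTimeoutMs;
    this.drainCallbackTimeoutMs = drainCallbackTimeoutMs;
  }

  /**
   * Returns the drain timeout only when the run loop is draining AND the
   * envelope is a drain or watermark message; otherwise the regular timeout.
   */
  long timeoutFor(boolean isDraining, boolean isDrainEnvelope, boolean isWatermarkEnvelope) {
    boolean useDrainTimeout = isDraining && (isDrainEnvelope || isWatermarkEnvelope);
    return useDrainTimeout ? drainCallbackTimeoutMs : callbackTimeoutMs;
  }
}
```

Under this predicate, a drain envelope that arrives while `isDraining` is still false falls back to the regular timeout.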





[GitHub] [samza] mynameborat commented on a diff in pull request #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

Posted by GitBox <gi...@apache.org>.
mynameborat commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1010012333


##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -76,7 +76,7 @@ List<TaskCallbackImpl> update(TaskCallbackImpl cb) {
   private final TaskCallbacks completedCallbacks = new TaskCallbacks();
   private final ScheduledExecutorService timer;
   private final TaskCallbackListener listener;
-  private final long timeout;
+  private long timeout;

Review Comment:
   In general, it's an anti-pattern to override instance variables after construction. We should probably find another way to enforce the timeout for drain.



##########
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java:
##########
@@ -90,6 +90,13 @@ public class TaskConfig extends MapConfig {
   // timeout for triggering a callback
   public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
   static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+
+  // timeout for triggering a callback during drain
+  public static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";
+
+  // default timeout for triggering a callback during drain
+  static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();

Review Comment:
   The default seems too high for applications to be notified promptly in case of faults. What is the rationale behind choosing such a high value? Is there a systematic way to compute the value here?
   
   For example, what are the downstream dependencies of this function, and which SLAs should be accounted for when computing the value? Please document all of this in the configuration documentation as well.



##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -127,4 +127,14 @@ public List<TaskCallbackImpl> updateCallback(TaskCallbackImpl callback) {
       return ImmutableList.of(callback);
     }
   }
+
+  /**
+   * Override the timeout set in the callback manager with the given new timeout.
+   * This is intended to be used with pipeline drain as we want to override the existing timeout with a higher timeout.
+   *
+   * @param timeout new timeout for process callbacks
+   * */
+  public void updateTaskCallbackTimeout(long timeout) {

Review Comment:
   Why are we overriding the existing timeout that was set when the class was constructed? This is confusing, and it breaks the originally configured timeout (`task.callback.timeout.ms`) that is passed to the callback manager.
   
   With this change, we alter user-facing behavior by setting the default message processing timeout to 30 minutes.
   





[GitHub] [samza] ajothomas commented on a diff in pull request #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

Posted by GitBox <gi...@apache.org>.
ajothomas commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1018500682


##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -127,4 +127,14 @@ public List<TaskCallbackImpl> updateCallback(TaskCallbackImpl callback) {
       return ImmutableList.of(callback);
     }
   }
+
+  /**
+   * Override the timeout set in the callback manager with the given new timeout.
+   * This is intended to be used with pipeline drain as we want to override the existing timeout with a higher timeout.
+   *
+   * @param timeout new timeout for process callbacks
+   * */
+  public void updateTaskCallbackTimeout(long timeout) {

Review Comment:
   Changed this.





[GitHub] [samza] ajothomas commented on a diff in pull request #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

Posted by GitBox <gi...@apache.org>.
ajothomas commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1012008633


##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -76,7 +76,7 @@ List<TaskCallbackImpl> update(TaskCallbackImpl cb) {
   private final TaskCallbacks completedCallbacks = new TaskCallbacks();
   private final ScheduledExecutorService timer;
   private final TaskCallbackListener listener;
-  private final long timeout;
+  private long timeout;

Review Comment:
   Fixed this.





[GitHub] [samza] mynameborat commented on a diff in pull request #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

Posted by GitBox <gi...@apache.org>.
mynameborat commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1013403151


##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -106,7 +110,10 @@ public void run() {
           callback.failure(new SamzaException(msg));
         }
       };
-      ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
+
+      final ScheduledFuture scheduledFuture = isDraining && (envelope.isDrain() || envelope.isWatermark())
+          ? timer.schedule(timerTask, drainCallbackTimeout, TimeUnit.MILLISECONDS)
+          : timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);

Review Comment:
   Can we add unit tests for this addition to ensure we use the drain timeout vs. the regular timeout appropriately?



##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -106,7 +110,10 @@ public void run() {
           callback.failure(new SamzaException(msg));
         }
       };
-      ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
+
+      final ScheduledFuture scheduledFuture = isDraining && (envelope.isDrain() || envelope.isWatermark())

Review Comment:
   Why do we need both `isDraining` and `envelope.isDrain()`? What happens if the run loop has not propagated its intent but the envelope is a drain message? Will it be treated as regular message processing?
   
   



##########
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java:
##########
@@ -90,6 +90,13 @@ public class TaskConfig extends MapConfig {
   // timeout for triggering a callback
   public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
   static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+
+  // timeout for triggering a callback during drain
+  public static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";
+
+  // default timeout for triggering a callback during drain
+  static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = -1L;

Review Comment:
   What does the `-1L` default denote? Does it mean waiting forever if the drain message flow gets stuck?
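
One detail relevant to this question: `ScheduledExecutorService.schedule` treats zero and negative delays as requests for immediate execution, so a `-1L` value cannot simply be passed through to the timer. The sketch below shows one possible guard, under the assumption (not confirmed in this thread) that a non-positive timeout means "no timeout". `OptionalTimeoutSketch` and `scheduleIfEnabled` are hypothetical names.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating one way to interpret a -1 timeout.
final class OptionalTimeoutSketch {
  /**
   * Schedules onTimeout after timeoutMs, or returns null when the timeout is
   * treated as disabled (non-positive).
   */
  static ScheduledFuture<?> scheduleIfEnabled(
      ScheduledExecutorService timer, Runnable onTimeout, long timeoutMs) {
    if (timeoutMs <= 0) {
      // Assumption: -1 means "wait indefinitely". Passing a negative delay to
      // schedule() would instead run onTimeout right away.
      return null;
    }
    return timer.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
  }
}
```

With a guard like this, a stuck drain flow would never be failed by the timer unless `task.callback.drain.timeout.ms` is set to a positive value.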



##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -106,7 +110,10 @@ public void run() {
           callback.failure(new SamzaException(msg));
         }
       };
-      ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
+
+      final ScheduledFuture scheduledFuture = isDraining && (envelope.isDrain() || envelope.isWatermark())

Review Comment:
   It might be easier to drive this in one place rather than splitting it across several.
   For example, would adding a new method such as `createCallback(..., timeout)` fit better here? That way, all the logic about drain, watermarks, and the current state stays abstracted away from this piece of code (ideally that's how it should be), and the caller determines the timeout based on its needs.
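
A simplified sketch of the shape suggested here, where the timeout is a parameter of `createCallback` and the manager knows nothing about drain or watermarks. `CallbackManagerSketch` and its nested `Callback` are hypothetical and much smaller than Samza's `TaskCallbackManager` and `TaskCallbackImpl`; the sketch also assumes the caller passes a positive timeout and leaves the handling of non-positive values out.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, heavily simplified stand-in for a callback manager.
final class CallbackManagerSketch {

  /** Minimal callback: whichever of complete() or the timeout fires first wins. */
  static final class Callback {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private volatile boolean timedOut = false;
    private volatile ScheduledFuture<?> timeoutFuture;

    // Marks the callback as finished and cancels the pending timeout task.
    void complete() {
      if (done.compareAndSet(false, true) && timeoutFuture != null) {
        timeoutFuture.cancel(false);
      }
    }

    boolean isTimedOut() {
      return timedOut;
    }

    // Invoked by the timer; has no effect if complete() already won.
    private void onTimeout() {
      if (done.compareAndSet(false, true)) {
        timedOut = true;
      }
    }
  }

  private final ScheduledExecutorService timer;

  CallbackManagerSketch(ScheduledExecutorService timer) {
    this.timer = timer;
  }

  /** The caller picks the timeout (regular or drain); no drain logic lives here. */
  Callback createCallback(long timeoutMs) {
    Callback callback = new Callback();
    callback.timeoutFuture = timer.schedule(callback::onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
    return callback;
  }
}
```

Because the timeout arrives per call, the manager's own fields can stay `final`, which also avoids mutating state after construction.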





[GitHub] [samza] ajothomas commented on a diff in pull request #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

Posted by GitBox <gi...@apache.org>.
ajothomas commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1018502824


##########
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java:
##########
@@ -90,6 +90,13 @@ public class TaskConfig extends MapConfig {
   // timeout for triggering a callback
   public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
   static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+
+  // timeout for triggering a callback during drain
+  public static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";
+
+  // default timeout for triggering a callback during drain
+  static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = -1L;

Review Comment:
   The defaults for the process callback timeout and drain callback timeout are both -1 here. 





[GitHub] [samza] ajothomas commented on a diff in pull request #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

Posted by GitBox <gi...@apache.org>.
ajothomas commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1018537087


##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -106,7 +110,10 @@ public void run() {
           callback.failure(new SamzaException(msg));
         }
       };
-      ScheduledFuture scheduledFuture = timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);
+
+      final ScheduledFuture scheduledFuture = isDraining && (envelope.isDrain() || envelope.isWatermark())
+          ? timer.schedule(timerTask, drainCallbackTimeout, TimeUnit.MILLISECONDS)
+          : timer.schedule(timerTask, timeout, TimeUnit.MILLISECONDS);

Review Comment:
   Added





[GitHub] [samza] mynameborat merged pull request #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

Posted by GitBox <gi...@apache.org>.
mynameborat merged PR #1637:
URL: https://github.com/apache/samza/pull/1637

