Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2022/11/01 02:15:16 UTC

[GitHub] [samza] mynameborat commented on a diff in pull request #1637: SAMZA-2765: [Pipeline Drain] Adding config for task callback timeout during drain

mynameborat commented on code in PR #1637:
URL: https://github.com/apache/samza/pull/1637#discussion_r1010012333


##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -76,7 +76,7 @@ List<TaskCallbackImpl> update(TaskCallbackImpl cb) {
   private final TaskCallbacks completedCallbacks = new TaskCallbacks();
   private final ScheduledExecutorService timer;
   private final TaskCallbackListener listener;
-  private final long timeout;
+  private long timeout;

Review Comment:
   In general, it's an anti-pattern to reassign instance variables after construction. We should probably find another way to enforce the timeout for drain.
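
One possible alternative, sketched under assumptions: keep both timeouts as immutable values and choose one when each callback is created, so no field is reassigned after construction. The class `CallbackTimeouts` and the method `timeoutFor` are hypothetical names used only for illustration; they are not part of Samza.

```java
// Hypothetical illustration; not part of Samza's TaskCallbackManager.
final class CallbackTimeouts {
  private final long processTimeoutMs; // value of task.callback.timeout.ms
  private final long drainTimeoutMs;   // value of task.callback.drain.timeout.ms

  CallbackTimeouts(long processTimeoutMs, long drainTimeoutMs) {
    this.processTimeoutMs = processTimeoutMs;
    this.drainTimeoutMs = drainTimeoutMs;
  }

  // Select the timeout per callback instead of mutating shared state.
  long timeoutFor(boolean draining) {
    return draining ? drainTimeoutMs : processTimeoutMs;
  }
}
```

With this shape, the value configured through `task.callback.timeout.ms` stays untouched for regular processing, and the drain value applies only while draining.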



##########
samza-core/src/main/java/org/apache/samza/config/TaskConfig.java:
##########
@@ -90,6 +90,13 @@ public class TaskConfig extends MapConfig {
   // timeout for triggering a callback
   public static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
   static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;
+
+  // timeout for triggering a callback during drain
+  public static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";
+
+  // default timeout for triggering a callback during drain
+  static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();

Review Comment:
   The default seems too high for applications to get notified in case of faults. What is the rationale behind choosing such a high value? Is there a systematic way to compute it?
   
   For example, what are the downstream dependencies of this function, and which SLAs should be accounted for when computing the value? Please document all of this in the configuration documentation as well.
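
To make the lookup under discussion concrete, here is a minimal sketch of how the new key and its default would resolve. The key name and the 30-minute default come from the diff above; the class `DrainTimeoutLookup` and its plain `Map` input are assumptions for illustration and merely stand in for Samza's `TaskConfig`.

```java
import java.time.Duration;
import java.util.Map;

// Hypothetical stand-in for TaskConfig; only the key and default mirror the diff.
final class DrainTimeoutLookup {
  static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";
  static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = Duration.ofMinutes(30).toMillis();

  // Returns the configured value, or the 30-minute default when the key is absent.
  static long drainCallbackTimeoutMs(Map<String, String> config) {
    String value = config.get(DRAIN_CALLBACK_TIMEOUT_MS);
    return value == null ? DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS : Long.parseLong(value.trim());
  }
}
```

An application that needs faster fault notification would have to set `task.callback.drain.timeout.ms` explicitly; any specific value, such as `60000`, is illustrative only and not a recommendation.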



##########
samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java:
##########
@@ -127,4 +127,14 @@ public List<TaskCallbackImpl> updateCallback(TaskCallbackImpl callback) {
       return ImmutableList.of(callback);
     }
   }
+
+  /**
+   * Override the timeout set in the callback manager with the given new timeout.
+   * This is intended to be used with pipeline drain as we want to override the existing timeout with a higher timeout.
+   *
+   * @param timeout new timeout for process callbacks
+   * */
+  public void updateTaskCallbackTimeout(long timeout) {

Review Comment:
   Why are we overriding the timeout that was set when the class was constructed? This is confusing and breaks the originally configured timeout (`task.callback.timeout.ms`) that is passed to the callback manager.
   
   With this change, we alter user-visible behavior by setting the default message processing timeout to 30 minutes.
   


