Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/12 21:20:01 UTC

[20/26] samza git commit: SAMZA-1404: Add warning in case of potential process starvation due to longer window method

SAMZA-1404: Add warning in case of potential process starvation due to longer window method

Currently, we use the average windowNs as the lower bound for the window trigger time to determine whether the user needs to be warned. We could potentially make this more accurate, at the cost of added complexity, by also including the average commit ns and some delta.

Author: Bharath Kumarasubramanian <bk...@linkedin.com>

Reviewers: Jagadish V <vj...@gmail.com>

Closes #281 from bharathkk/master


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3636be03
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3636be03
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3636be03

Branch: refs/heads/0.14.0
Commit: 3636be0316ba1ba53f961dacc5e865798119ab20
Parents: 3a13438
Author: Bharath Kumarasubramanian <bk...@linkedin.com>
Authored: Mon Aug 28 12:27:47 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Mon Aug 28 12:27:47 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/samza/task/AsyncRunLoop.java | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3636be03/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
index e5c40df..478f109 100644
--- a/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
+++ b/samza-core/src/main/java/org/apache/samza/task/AsyncRunLoop.java
@@ -443,6 +443,20 @@ public class AsyncRunLoop implements Runnable, Throttleable {
             long startTime = clock.nanoTime();
             task.window(coordinator);
             containerMetrics.windowNs().update(clock.nanoTime() - startTime);
+
+            // A window() that executes for longer than task.window.ms will starve the next process() call
+            // when the application has job.thread.pool.size > 1. This is because window() is prioritized ahead of
+            // process() to guarantee that window() fires close to its trigger interval.
+            // We warn the user if the average window() execution time is greater than or equal to the trigger interval.
+            long lowerBoundForWindowTriggerTimeInMs = TimeUnit.NANOSECONDS
+                .toMillis((long) containerMetrics.windowNs().getSnapshot().getAverage());
+            if (windowMs <= lowerBoundForWindowTriggerTimeInMs) {
+              log.warn(
+                  "window() call might potentially starve process calls."
+                      + " Consider setting task.window.ms > {} ms",
+                  lowerBoundForWindowTriggerTimeInMs);
+            }
+
             coordinatorRequests.update(coordinator);
 
             state.doneWindow();
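
The check added in this hunk can be read in isolation as the sketch below. It is only an illustration, not code from the patch: the class WindowStarvationCheck and the method mightStarveProcess are hypothetical names, and the metric snapshot is replaced by a plain double argument. The second overload is one possible reading of the refinement mentioned in the commit message (adding the average commit time and a delta), which the patch itself does not implement.

```java
import java.util.concurrent.TimeUnit;

// Standalone sketch; the class and method names are hypothetical, not Samza APIs.
final class WindowStarvationCheck {

  // Mirrors the comparison in the patch: the average window() duration,
  // truncated to whole milliseconds, is the lower bound for task.window.ms.
  static boolean mightStarveProcess(long windowMs, double avgWindowNs) {
    long lowerBoundMs = TimeUnit.NANOSECONDS.toMillis((long) avgWindowNs);
    return windowMs <= lowerBoundMs;
  }

  // Assumed refinement (not in the patch): also count the average commit
  // duration and add a fixed safety margin in milliseconds.
  static boolean mightStarveProcess(long windowMs, double avgWindowNs,
                                    double avgCommitNs, long deltaMs) {
    long lowerBoundMs =
        TimeUnit.NANOSECONDS.toMillis((long) (avgWindowNs + avgCommitNs)) + deltaMs;
    return windowMs <= lowerBoundMs;
  }

  public static void main(String[] args) {
    double avgWindowNs = TimeUnit.MILLISECONDS.toNanos(250); // window() averages 250 ms
    double avgCommitNs = TimeUnit.MILLISECONDS.toNanos(40);  // commit averages 40 ms

    System.out.println(mightStarveProcess(200, avgWindowNs));                    // true
    System.out.println(mightStarveProcess(1000, avgWindowNs));                   // false
    System.out.println(mightStarveProcess(300, avgWindowNs, avgCommitNs, 20));   // true
  }
}
```

With the plain check, a 300 ms interval against a 250 ms average would not warn; the assumed refinement raises the bound to 310 ms and does. Whether that extra strictness is worth the added complexity is the trade-off the commit message leaves open.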