You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ni...@apache.org on 2017/08/09 17:50:53 UTC

[07/23] samza git commit: SAMZA-1368; make sure a new job model is generated in case of barrier timeout.

SAMZA-1368; make sure a new job model is generated in case of barrier timeout.

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Shanthoosh V <sv...@linkedin.com>

Closes #247 from sborya/onBarrierTimeout1


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c113939
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c113939
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c113939

Branch: refs/heads/0.14.0
Commit: 1c1139399599c1cb31249e8d7a28291e2ad9d27e
Parents: 4eb5153
Author: Boris Shkolnik <bo...@apache.org>
Authored: Fri Jul 21 15:32:58 2017 -0700
Committer: Jagadish <ja...@apache.org>
Committed: Fri Jul 21 15:32:58 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/samza/zk/ZkJobCoordinator.java  | 16 +++++++++++-----
 1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/1c113939/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index dd08e3f..e973099 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -313,11 +313,17 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version));
       } else {
         if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
-          // no-op
-          // In our consensus model, if the Barrier is timed-out, then it means that one or more initial
-          // participants failed to join. That means, they should have de-registered from "processors" list
-          // and that would have triggered onProcessorChange action -> a new round of consensus.
-          LOG.info("Barrier for version " + version + " timed out.");
+          // no-op for non-leaders
+          // for leader: make sure we do not stop - so generate a new job model
+          LOG.warn("Barrier for version " + version + " timed out.");
+          if (zkController.isLeader()) {
+            LOG.info("Leader will schedule a new job model generation");
+            debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
+              {
+                // actual actions to do are the same as onProcessorChange
+                doOnProcessorChange(new ArrayList<>());
+              });
+          }
         }
       }
     }