You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@samza.apache.org by ja...@apache.org on 2017/07/21 22:33:01 UTC
samza git commit: SAMZA-1368; make sure a new job model is generated in case of a barrier timeout.
Repository: samza
Updated Branches:
refs/heads/master 4eb515313 -> 1c1139399
SAMZA-1368; make sure a new job model is generated in case of a barrier timeout.
Author: Boris Shkolnik <bo...@apache.org>
Reviewers: Shanthoosh V <sv...@linkedin.com>
Closes #247 from sborya/onBarrierTimeout1
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/1c113939
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/1c113939
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/1c113939
Branch: refs/heads/master
Commit: 1c1139399599c1cb31249e8d7a28291e2ad9d27e
Parents: 4eb5153
Author: Boris Shkolnik <bo...@apache.org>
Authored: Fri Jul 21 15:32:58 2017 -0700
Committer: Jagadish <ja...@apache.org>
Committed: Fri Jul 21 15:32:58 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/samza/zk/ZkJobCoordinator.java | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/1c113939/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index dd08e3f..e973099 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -313,11 +313,17 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
debounceTimer.scheduleAfterDebounceTime(barrierAction, 0, () -> onNewJobModelConfirmed(version));
} else {
if (ZkBarrierForVersionUpgrade.State.TIMED_OUT.equals(state)) {
- // no-op
- // In our consensus model, if the Barrier is timed-out, then it means that one or more initial
- // participants failed to join. That means, they should have de-registered from "processors" list
- // and that would have triggered onProcessorChange action -> a new round of consensus.
- LOG.info("Barrier for version " + version + " timed out.");
+ // no-op for non-leaders
+ // for leader: make sure we do not stop - so generate a new job model
+ LOG.warn("Barrier for version " + version + " timed out.");
+ if (zkController.isLeader()) {
+ LOG.info("Leader will schedule a new job model generation");
+ debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, debounceTimeMs, () ->
+ {
+ // actual actions to do are the same as onProcessorChange
+ doOnProcessorChange(new ArrayList<>());
+ });
+ }
}
}
}