Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/09 01:16:59 UTC
samza git commit: SAMZA-1124; Job coordinator with time out
Repository: samza
Updated Branches:
refs/heads/master 6f811de30 -> a974b236a
SAMZA-1124; Job coordinator with time out
If a processor doesn't join the barrier within the timeout, the barrier is cancelled. All processors should then unsubscribe from it.
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>
Author: navina <na...@apache.org>
Reviewers: Xinyu Liu <xi...@linkedin.com>
Closes #77 from sborya/JobCoordinatorWithTO and squashes the following commits:
ed055dd [Boris Shkolnik] checkstyle
1468902 [Boris Shkolnik] renambed a method
579d2e7 [Boris Shkolnik] Merge branch 'JobCoordinator' into JobCoordinatorWithTO
54ab688 [Boris Shkolnik] removed JavaJobConfig.java
3f95d46 [Boris Shkolnik] checkstyle
75f9a94 [Boris Shkolnik] added test for time out
b73ba32 [Boris Shkolnik] merge + test
5d67be0 [Boris Shkolnik] removed extra empty lines
9cf3c3e [Boris Shkolnik] addressed review comments
c47031d [Boris Shkolnik] added timeout for ZkBarrierForVersionUpgrade
93a55a9 [Boris Shkolnik] use write directly in the barrier when all joined
f89a037 [Boris Shkolnik] addressed some notes
4219720 [Boris Shkolnik] added JavaJobConfig
659ae7c [Boris Shkolnik] add ZkJobCoordinatorFactory
520a083 [Boris Shkolnik] added ZKJobCoordinatorFactory
a42218e [Boris Shkolnik] checkstyle
02ca658 [Boris Shkolnik] typo
c1fd0b2 [Boris Shkolnik] merge cleanup
1c8eef4 [Boris Shkolnik] merge cleanup
a7e014a [Boris Shkolnik] merged
b4a0642 [Boris Shkolnik] typo
33bacdd [Boris Shkolnik] merge
d1582a9 [Boris Shkolnik] missed method name change
4c481a2 [Boris Shkolnik] merge
b811f3d [Boris Shkolnik] changed to private final
5d43419 [Boris Shkolnik] addressed review comments
1a0c54d [Boris Shkolnik] fixed test
e19d77b [Boris Shkolnik] renamed method
0c5edab [Boris Shkolnik] makey tryBecomeALeader async
b34f6b7 [Boris Shkolnik] added java doc
c2305b6 [Boris Shkolnik] cleanup
f83fc57 [Boris Shkolnik] merge
f8c8a6d [Boris Shkolnik] make a smaller PR for publish functionality only
251aad7 [Boris Shkolnik] removed unneeded interface for real
9892dee [Boris Shkolnik] removed unneeded interface
f20d15f [Boris Shkolnik] some updates to JobCoordinator
bd53c07 [Boris Shkolnik] deleteing already committed files
18198d1 [Boris Shkolnik] added test for zk barrier
9dba992 [Boris Shkolnik] moved the Test in to test subdir
c16d864 [Boris Shkolnik] added test
817a7b6 [Boris Shkolnik] merge complete
1e5947f [Boris Shkolnik] merged
6506b48 [Boris Shkolnik] merged with latest
4290b13 [Boris Shkolnik] merged
43eb076 [Boris Shkolnik] checkstyle errors
e0c44fe [Boris Shkolnik] merged
8e8d833 [Boris Shkolnik] merge
e59d38c [Boris Shkolnik] merge with ZkController
6a71cf6 [Boris Shkolnik] renamed the listners
6cbcf6e [Boris Shkolnik] review comments
efbee84 [Boris Shkolnik] Checkstyle errors
132300c [Boris Shkolnik] converted ZkLeaderElector.tryBecomeLeader to async method
13a05d7 [Boris Shkolnik] merge
2d59e0c [Boris Shkolnik] merge
82c819b [Boris Shkolnik] merge
7ebe9a6 [Boris Shkolnik] check style
41b2e46 [Boris Shkolnik] refactoring to match the new ZkUtils constructor
b07d63a [Boris Shkolnik] merge JavaJobConfig
bdc953b [Boris Shkolnik] merged
4301372 [Boris Shkolnik] added tests
4d48d83 [Boris Shkolnik] merge
c9bb475 [Boris Shkolnik] added tests for ZkUtils
592e9bb [Boris Shkolnik] added missing functionality for ZkControllerImpl into zkUtils and zkKeyBuilder
b473a6e [Boris Shkolnik] Added the new file ZkControllerListener.java
3412ed4 [Boris Shkolnik] Renamed ZkListener to ZkControllerListener
fabddc9 [Boris Shkolnik] merge
ad9108a [Boris Shkolnik] merge with ScheduleAfterDebounceTime
ba583d6 [Boris Shkolnik] merge
3d6b993 [Boris Shkolnik] cleaned up
fe69e70 [Boris Shkolnik] merge
7f8125b [Boris Shkolnik] Merge branch 'master' into ZkTestUtils
eaf04bb [Boris Shkolnik] added more comments
358ae6b [Boris Shkolnik] Added test and addressed review comments
9b22eb6 [Boris Shkolnik] JobModelPublish
0ef90b6 [Boris Shkolnik] Merge branch 'JobModel' into JobModelPublish
9c59048 [Boris Shkolnik] merge
017fe79 [Boris Shkolnik] added tests
5c8aa20 [Boris Shkolnik] JobModel Generation using SimpleGroupByContainerCount
2c841e1 [Boris Shkolnik] added awaitStart
eeb69ca [Boris Shkolnik] Merge branch 'ZkBarrier' into JobCoordinator
dc26bd2 [Boris Shkolnik] added BarrierForVersionUpgrade
cfdb4c7 [Boris Shkolnik] Merge branch 'ZkController' into ZkBarrier
b28ba14 [Boris Shkolnik] ZkBarrier
c8d26ba [Boris Shkolnik] merged
efc4d03 [Boris Shkolnik] ZkController
c9b3fe4 [Boris Shkolnik] Merge branch 'ScheduleAfterDebounceTime' into ZkController
f0cae7b [Boris Shkolnik] Merge branch 'LeaderElector' into ZkController
3df0def [Boris Shkolnik] ZkControllerImpl
4801613 [Boris Shkolnik] cleanup
d32045b [Boris Shkolnik] Merge branch 'ScheduleAfterDebounceTime1' into JobCoordinator
32f96b4 [Boris Shkolnik] added ScheduleAfterDebounce
ce83409 [Boris Shkolnik] cleanup
d7a7ccb [Boris Shkolnik] Merge branch 'LeaderElector' into ScheduleAfterDebounceTime
9c6b20a [Boris Shkolnik] added ZkListener
5f867c0 [Boris Shkolnik] added Apache license info
d0687b9 [Boris Shkolnik] Merge branch 'LeaderElector' into JobCoordinator
372829f [Boris Shkolnik] Merge branch 'master' into JobCoordinator
14db43d [Boris Shkolnik] added TestZkStreamProcessor - main manual test
b3b27c6 [Boris Shkolnik] Merge branch 'LeaderElector' into TestZkStreamProcessor
6649c80 [navina] Fixing ZkUtils close(). No need to close underlying connection explicitly
7f17e26 [Boris Shkolnik] ZkTestUtils
f904cd3 [Boris Shkolnik] ScheduleAfterDebounceTime
d126b10 [Boris Shkolnik] ScheduleAfterDebounceTime
63d8d60 [Boris Shkolnik] JavaJobConfig
ff15501 [Boris Shkolnik] JavaJobConfig
d20bacf [Boris Shkolnik] ZkTestUtils
737eb2f [Boris Shkolnik] ZkTestUtils
8e2d6c1 [Boris Shkolnik] add main manual test
7a47f84 [Boris Shkolnik] add main manual test
fa2186b [Boris Shkolnik] added ZkController
a0a7409 [Boris Shkolnik] Merge branch 'master' of https://github.com/navina/samza
edda60d [navina] Removing an unintended change to the grouper
6dd6b8d [navina] Adding tests for ZkLeaderElector
1734f8f [navina] Adding tests for ZkUtils
317cf16 [navina] Adding tests for ZkKeyBuilder
aaaf24e [navina] Adding EmbeddedZookeeper for testing
37c2c8b [navina] Extracting files related to LeaderElection
76b5167 [Boris Shkolnik] added new line at then end
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a974b236
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a974b236
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a974b236
Branch: refs/heads/master
Commit: a974b236a78826d993e50d8936a4cc393a97bfe4
Parents: 6f811de
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Authored: Wed Mar 8 17:16:51 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed Mar 8 17:16:51 2017 -0800
----------------------------------------------------------------------
build.gradle | 1 +
gradle/dependency-versions.gradle | 1 +
.../samza/zk/BarrierForVersionUpgrade.java | 3 +-
.../samza/zk/ZkBarrierForVersionUpgrade.java | 114 ++++++----
.../org/apache/samza/zk/ZkJobCoordinator.java | 221 +++++++++++++++++++
.../samza/zk/ZkJobCoordinatorFactory.java | 57 +++++
.../main/java/org/apache/samza/zk/ZkUtils.java | 5 +-
.../samza/zk/TestScheduleAfterDebounceTime.java | 26 +--
.../zk/TestZkBarrierForVersionUpgrade.java | 53 ++++-
.../apache/samza/zk/TestZkLeaderElector.java | 1 +
.../java/org/apache/samza/zk/TestZkUtils.java | 9 +
11 files changed, 422 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 400a913..ea1e3c2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -160,6 +160,7 @@ project(":samza-core_$scalaVersion") {
compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
compile "com.101tec:zkclient:$zkClientVersion"
+ compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
testCompile project(":samza-api").sourceSets.test.output
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 0a8542b..20af1da 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -39,4 +39,5 @@
commonsCollectionVersion = "3.2.1"
httpClientVersion="4.4.1"
commonsLang3Version="3.4"
+ apacheCommonsCollections4Version="4.0"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
index b2d80d0..2b785f0 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -32,12 +32,13 @@ public interface BarrierForVersionUpgrade {
* @param version - for which the barrier is started.
* @param processorsNames - list of processors available at the time of the JobModel generation.
*/
- void startBarrier(String version, List<String> processorsNames);
+ void start(String version, List<String> processorsNames);
/**
* Called by the processor.
* Updates the processor readiness to use the new version and wait on the barrier, until all other processors
* joined.
+ * The call is async. The callback will be invoked when the barrier is reached.
* @param version of the jobModel this barrier is protecting.
* @param processorsName as it appears in the list of processors.
* @param callback will be invoked, when barrier is reached.
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 3ec87b0..f7efa48 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -19,10 +19,12 @@
package org.apache.samza.zk;
+import java.util.Arrays;
import java.util.List;
-
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +33,8 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
private final ZkUtils zkUtils;
private final ZkKeyBuilder keyBuilder;
private final static String BARRIER_DONE = "done";
+ private final static String BARRIER_TIMED_OUT = "TIMED_OUT";
+ private final static long BARRIER_TIMED_OUT_MS = 60 * 1000;
private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
private final ScheduleAfterDebounceTime debounceTimer;
@@ -45,36 +49,66 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
this.debounceTimer = debounceTimer;
}
+ /**
+ * Set the timer for the barrier. If the barrier is not reached within the timeout, the barrier fails.
+ * @param version for which the barrier is created
+ * @param timeout - time in ms to wait
+ */
+ public void setTimer(String version, long timeout) {
+ debounceTimer.scheduleAfterDebounceTime("VersionUpgradeTimeout", timeout, ()->timerOff(version));
+ }
+
+ protected long getBarrierTimeOutMs() {
+ return BARRIER_TIMED_OUT_MS;
+ }
+
+ private void timerOff(String version) {
+ // check if barrier has finished
+ final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+ final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+ Stat stat = new Stat();
+ String done = zkUtils.getZkClient().<String>readData(barrierDonePath, stat);
+ if (done != null && done.equals(BARRIER_DONE))
+ return; //nothing to do
+
+ while (true) {
+ try {
+ // write a new value if no one else did, if the value was changed since previous reading - retry
+ zkUtils.getZkClient().writeData(barrierDonePath, "TIMED_OUT", stat.getVersion());
+ return;
+ } catch (Exception e) {
+ // failed to write, try read/write again
+ LOG.info("Barrier timeout write failed");
+ done = zkUtils.getZkClient().<String>readData(barrierDonePath, stat);
+ if (done.equals(BARRIER_DONE))
+ return; //nothing to do
+ }
+ }
+ }
+
@Override
- public void startBarrier(String version, List<String> processorsNames) {
- String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
- String barrierDonePath = String.format("%s/barrier_done", barrierPath);
- String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+ public void start(String version, List<String> processorsNames) {
+ final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+ final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+ final String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
- // callback for when the barrier is reached
- Runnable callback = new Runnable() {
- @Override
- public void run() {
- LOG.info("Writing BARRIER DONE to " + barrierDonePath);
- zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
- }
- };
// subscribe for processor's list changes
LOG.info("Subscribing for child changes at " + barrierProcessors);
zkUtils.getZkClient().subscribeChildChanges(barrierProcessors,
- new ZkBarrierChangeHandler(callback, processorsNames));
+ new ZkBarrierChangeHandler(version, processorsNames));
+
+ setTimer(version, getBarrierTimeOutMs());
}
@Override
public void waitForBarrier(String version, String processorsName, Runnable callback) {
// if participant makes this call it means it has already stopped the old container and got the new job model.
- String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
- String barrierDonePath = String.format("%s/barrier_done", barrierPath);
- String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
- String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);
-
+ final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+ final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+ final String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+ final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);
// update the barrier for this processor
LOG.info("Creating a child for barrier at " + barrierProcessorThis);
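Note on the hunk above: `getBarrierTimeOutMs()` is declared `protected`, which suggests that a subclass can shorten the 60-second default, for example in a test. That reading is an assumption, since the timeout test itself is not shown in this message. A minimal sketch of the pattern, using hypothetical stand-in classes rather than the Samza types:

```java
// Hypothetical stand-ins for illustration; these are not the Samza classes.
class TimeoutBarrier {
  private static final long DEFAULT_TIMEOUT_MS = 60 * 1000;

  // Overridable hook, mirroring getBarrierTimeOutMs() in the hunk above.
  protected long getBarrierTimeOutMs() {
    return DEFAULT_TIMEOUT_MS;
  }

  // Callers always go through the hook, so an override takes effect.
  long effectiveTimeoutMs() {
    return getBarrierTimeOutMs();
  }
}

class ShortTimeoutBarrier extends TimeoutBarrier {
  @Override
  protected long getBarrierTimeOutMs() {
    return 200; // short timeout so a test does not wait a full minute
  }
}
```

Keeping the default in a constant and reading it only through the hook means production code is unchanged while a test can substitute its own value.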
@@ -88,47 +122,32 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
* listener for the subscription.
*/
class ZkBarrierChangeHandler implements IZkChildListener {
- Runnable callback;
- List<String> names;
+ private final String version;
+ private final List<String> names;
- public ZkBarrierChangeHandler(Runnable callback, List<String> names) {
- this.callback = callback;
+ public ZkBarrierChangeHandler(String version, List<String> names) {
+ this.version = version;
this.names = names;
}
@Override
public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
- // Find out the event & Log
- boolean allIn = true;
if (currentChildren == null) {
LOG.info("Got handleChildChange with null currentChildren");
return;
}
- // debug
- StringBuilder sb = new StringBuilder();
- for (String child : currentChildren) {
- sb.append(child).append(",");
- }
- LOG.info("list of children in the barrier = " + parentPath + ":" + sb.toString());
- sb = new StringBuilder();
- for (String child : names) {
- sb.append(child).append(",");
- }
- LOG.info("list of children to compare against = " + parentPath + ":" + sb.toString());
+ LOG.info("list of children in the barrier = " + parentPath + ":" + Arrays.toString(currentChildren.toArray()));
+ LOG.info("list of children to compare against = " + parentPath + ":" + Arrays.toString(names.toArray()));
// check if all the names are in
- for (String n : names) {
- if (!currentChildren.contains(n)) {
- LOG.info("node " + n + " is still not in the list ");
- allIn = false;
- break;
- }
- }
- if (allIn) {
+ if (CollectionUtils.containsAll(names, currentChildren)) {
LOG.info("ALl nodes reached the barrier");
- callback.run(); // all the names have registered
+ final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+ final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+ LOG.info("Writing BARRIER DONE to " + barrierDonePath);
+ zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
}
}
}
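Note on the hunk above: argument order matters for the containment check. In commons-collections4, `CollectionUtils.containsAll(coll1, coll2)` is documented to return true when every element of `coll2` is contained in `coll1`. The removed loop required every expected name to be present among the current children, so the order of the two arguments in the new call may be worth double-checking. The sketch below illustrates the two directions with the JDK's `Collection.containsAll` as a stand-in; the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Stand-in for the "have all processors joined?" check; uses only the JDK.
final class BarrierMembership {
  private BarrierMembership() { }

  // True only when every expected processor name is among the current children.
  static boolean allJoined(List<String> expected, List<String> currentChildren) {
    return currentChildren.containsAll(expected);
  }

  public static void main(String[] args) {
    List<String> expected = Arrays.asList("p1", "p2");
    // Only p1 has joined, so the barrier is not reached yet.
    System.out.println(allJoined(expected, Arrays.asList("p1")));
    // Both have joined, so the barrier is reached.
    System.out.println(allJoined(expected, Arrays.asList("p1", "p2")));
    // Reversed direction: holds as soon as any expected processor joins.
    System.out.println(expected.containsAll(Arrays.asList("p1")));
  }
}
```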
@@ -152,6 +171,11 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
if (done.equals(BARRIER_DONE)) {
zkUtils.unsubscribeDataChanges(barrierPathDone, this);
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
+ } else if (done.equals(BARRIER_TIMED_OUT)) {
+ // timed out
+ LOG.error("Barrier for " + dataPath + " timed out");
+ System.out.println("Barrier for " + dataPath + " timed out");
+ zkUtils.unsubscribeDataChanges(barrierPathDone, this);
}
// we do not need to resubscribe because, ZkClient library does it for us.
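Note on the timeout path in this file: `timerOff` reads `barrier_done` together with its `Stat`, then writes `TIMED_OUT` with the version it read, retrying if the write fails. This is an optimistic, versioned write: it only succeeds if nobody changed the node in between. The sketch below models the same idea in memory with `AtomicStampedReference` instead of ZooKeeper; the class and its methods are hypothetical and only illustrate the retry logic.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

// In-memory stand-in for the barrier_done znode: a value plus a version stamp.
final class BarrierDoneCell {
  static final String DONE = "done";
  static final String TIMED_OUT = "TIMED_OUT";

  private final AtomicStampedReference<String> cell = new AtomicStampedReference<>(null, 0);

  String read() {
    return cell.getReference();
  }

  // Unconditional write, as when all processors have joined.
  void markDone() {
    int[] stamp = new int[1];
    String current;
    do {
      current = cell.get(stamp);
    } while (!cell.compareAndSet(current, DONE, stamp[0], stamp[0] + 1));
  }

  // Versioned write: set TIMED_OUT unless the barrier is already done.
  // Returns true if this call wrote TIMED_OUT.
  boolean markTimedOutUnlessDone() {
    int[] stamp = new int[1];
    while (true) {
      String current = cell.get(stamp);
      if (DONE.equals(current)) {
        return false; // barrier finished first, nothing to do
      }
      if (cell.compareAndSet(current, TIMED_OUT, stamp[0], stamp[0] + 1)) {
        return true;
      }
      // the value changed between read and write; read again and retry
    }
  }
}
```

Comparing against the constant (`DONE.equals(current)`) keeps the check safe when the cell is still empty.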
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
new file mode 100644
index 0000000..1d16d4a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelManager$;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JobCoordinator for stand alone processor managed via Zookeeper.
+ */
+public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
+ private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class);
+
+ private final ZkUtils zkUtils;
+ private final int processorId;
+ private final ZkController zkController;
+ private final SamzaContainerController containerController;
+ private final BarrierForVersionUpgrade barrier;
+ private final ScheduleAfterDebounceTime debounceTimer;
+ private final StreamMetadataCache streamMetadataCache;
+ private final ZkKeyBuilder keyBuilder;
+ private final Config config;
+
+ private JobModel newJobModel;
+ private String newJobModelVersion; // version published in ZK (by the leader)
+ private JobModelManager jobModelManager;
+
+ public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
+ this.zkUtils = zkUtils;
+ this.keyBuilder = zkUtils.getKeyBuilder();
+ this.debounceTimer = debounceTimer;
+ this.processorId = processorId;
+ this.containerController = containerController;
+ this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this);
+ this.config = config;
+
+
+ barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer);
+ streamMetadataCache = getStreamMetadataCache();
+ }
+
+ private StreamMetadataCache getStreamMetadataCache() {
+ // model generation - NEEDS TO BE REVIEWED
+ JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+ Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+ for (String systemName: systemConfig.getSystemNames()) {
+ String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
+ if (systemFactoryClassName == null) {
+ String msg = String.format("A stream uses system %s, which is missing from the configuration.", systemName);
+ log.error(String.format(msg));
+ throw new SamzaException(msg);
+ }
+ SystemFactory systemFactory = Util.getObj(systemFactoryClassName);
+ systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
+ }
+
+ return new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+ }
+
+ @Override
+ public void start() {
+ zkController.register();
+ }
+
+ public void cleanupZk() {
+ zkUtils.deleteRoot();
+ }
+
+ @Override
+ public void stop() {
+ zkController.stop();
+ }
+
+ @Override
+ public boolean awaitStart(long timeoutMs)
+ throws InterruptedException {
+ return containerController.awaitStart(timeoutMs);
+ }
+
+ @Override
+ public int getProcessorId() {
+ return processorId;
+ }
+
+ @Override
+ public JobModel getJobModel() {
+ return newJobModel;
+ }
+
+ //////////////////////////////////////////////// LEADER stuff ///////////////////////////
+ @Override
+ public void onBecomeLeader() {
+ log.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
+
+ List<String> emptyList = new ArrayList<>();
+
+ // actual actions to do are the same as onProcessorChange()
+ debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
+ ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> onProcessorChange(emptyList));
+ }
+
+ @Override
+ public void onProcessorChange(List<String> processorIds) {
+ log.info("ZkJobCoordinator::onProcessorChange - Processors changed! List: " + Arrays.toString(processorIds.toArray()));
+ generateNewJobModel();
+ }
+
+ @Override
+ public void onNewJobModelAvailable(final String version) {
+ newJobModelVersion = version;
+ log.info("pid=" + processorId + "new JobModel available");
+ // stop current work
+ containerController.stopContainer();
+ log.info("pid=" + processorId + "new JobModel available.Container stopped.");
+ // get the new job model
+ newJobModel = zkUtils.getJobModel(version);
+ log.info("pid=" + processorId + "new JobModel available. ver=" + version + "; jm = " + newJobModel);
+
+ String currentPath = zkUtils.getEphemeralPath();
+ String zkProcessorId = keyBuilder.parseIdFromPath(currentPath);
+
+ // update ZK and wait for all the processors to get this new version
+ barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new Runnable() {
+ @Override
+ public void run() {
+ onNewJobModelConfirmed(version);
+ }
+ });
+ }
+
+ @Override
+ public void onNewJobModelConfirmed(String version) {
+ log.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
+ // get the new Model
+ JobModel jobModel = getJobModel();
+ log.info("pid=" + processorId + "got the new job model in JobModelConfirmed =" + jobModel);
+
+ // start the container with the new model
+ containerController.startContainer(jobModel.getContainers().get(processorId), jobModel.getConfig(),
+ jobModel.maxChangeLogStreamPartitions);
+ }
+
+ /**
+ * Generate new JobModel when becoming a leader or the list of processor changed.
+ */
+ private void generateNewJobModel() {
+ // get the current list of processors
+ List<String> currentProcessors = zkUtils.getSortedActiveProcessors();
+
+ // get the current version
+ String currentJMVersion = zkUtils.getJobModelVersion();
+ String nextJMVersion;
+ if (currentJMVersion == null) {
+ log.info("pid=" + processorId + "generating first version of the model");
+ nextJMVersion = "1";
+ } else {
+ nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
+ }
+ log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
+
+ StringBuilder sb = new StringBuilder();
+ List<Integer> containerIds = new ArrayList<>();
+ for (String processor : currentProcessors) {
+ String zkProcessorId = keyBuilder.parseIdFromPath(processor);
+ sb.append(zkProcessorId).append(",");
+ containerIds.add(Integer.valueOf(zkProcessorId));
+ }
+ log.info("generate new job model: processorsIds: " + sb.toString());
+
+ jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null,
+ containerIds);
+ JobModel jobModel = jobModelManager.jobModel();
+
+ log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
+
+ // publish the new job model first
+ zkUtils.publishJobModel(nextJMVersion, jobModel);
+ log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel);
+
+ // start the barrier for the job model update
+ barrier.start(nextJMVersion, currentProcessors);
+
+ // publish new JobModel version
+ zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
+ log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
+ }
+}
\ No newline at end of file
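Note on `generateNewJobModel()` above: the leader derives the next JobModel version from the one currently in ZooKeeper, starting at "1" when none exists, then publishes the model, starts the barrier, and finally publishes the new version. The version arithmetic on its own can be sketched as follows; the helper class is hypothetical and not part of the commit.

```java
// Hypothetical helper that isolates the version arithmetic from generateNewJobModel().
final class JobModelVersions {
  private JobModelVersions() { }

  // The first version is "1"; otherwise the numeric successor of the current one.
  static String next(String currentVersion) {
    if (currentVersion == null) {
      return "1";
    }
    return Integer.toString(Integer.parseInt(currentVersion) + 1);
  }
}
```

`Integer.parseInt` is used here instead of `Integer.valueOf` only to avoid boxing; the result is the same.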
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
new file mode 100644
index 0000000..e211f70
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.processor.SamzaContainerController;
+
+public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
+ /**
+ * Method to instantiate an implementation of JobCoordinator
+ *
+ * @param processorId Indicates the StreamProcessor's id to which this Job Coordinator is associated with
+ * @param config Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+ * @return An instance of IJobCoordinator
+ */
+ @Override
+ public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+ JobConfig jobConfig = new JobConfig(config);
+ String groupName = String.format("%s-%s", jobConfig.getName(), jobConfig.getJobId());
+ ZkConfig zkConfig = new ZkConfig(config);
+ ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+ ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+ return new ZkJobCoordinator(
+ processorId,
+ config,
+ debounceTimer,
+ new ZkUtils(
+ String.valueOf(processorId),
+ new ZkKeyBuilder(groupName),
+ zkClient,
+ zkConfig.getZkConnectionTimeoutMs()
+ ),
+ containerController);
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 73376b1..e8170e3 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -64,12 +64,13 @@ public class ZkUtils {
private volatile String ephemeralPath = null;
private final ZkKeyBuilder keyBuilder;
private final int connectionTimeoutMs;
- private final String processorId = "TO BE PASSED IN THE CONSTRUCTOR"; //TODO
+ private final String processorId;
- public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
+ public ZkUtils(String processorId, ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
this.keyBuilder = zkKeyBuilder;
this.connectionTimeoutMs = connectionTimeoutMs;
this.zkClient = zkClient;
+ this.processorId = processorId;
}
public void connect() throws ZkInterruptedException {
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index e57372f..23a8cc1 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -48,19 +48,13 @@ public class TestScheduleAfterDebounceTime {
ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
- debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () ->
- {
+ debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () -> {
testObj.inc();
- }
- );
+ });
// action is delayed
Assert.assertEquals(0, i);
- try {
- Thread.sleep(DEBOUNCE_TIME + 10);
- } catch (InterruptedException e) {
- Assert.fail("Sleep was interrupted");
- }
+ TestZkUtils.sleepMs(DEBOUNCE_TIME + 10);
// debounce time passed
Assert.assertEquals(1, i);
@@ -85,11 +79,8 @@ public class TestScheduleAfterDebounceTime {
}
);
- try {
- Thread.sleep(DEBOUNCE_TIME + 10);
- } catch (InterruptedException e) {
- Assert.fail("Sleep was interrupted");
- }
+ TestZkUtils.sleepMs(DEBOUNCE_TIME + 10);
+
// still should be the old value
Assert.assertEquals(0, i);
@@ -100,11 +91,8 @@ public class TestScheduleAfterDebounceTime {
}
);
- try {
- Thread.sleep(3 * DEBOUNCE_TIME + 10);
- } catch (InterruptedException e) {
- Assert.fail("Sleep was interrupted");
- }
+ TestZkUtils.sleepMs(3 * DEBOUNCE_TIME + 10);
+
Assert.assertEquals(100, i);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 92cb2c9..f26d4d0 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -81,7 +81,7 @@ public class TestZkBarrierForVersionUpgrade {
}
final Status s = new Status();
- barrier.startBarrier(ver, processors);
+ barrier.start(ver, processors);
barrier.waitForBarrier(ver, "p1", new Runnable() {
@Override
@@ -117,7 +117,7 @@ public class TestZkBarrierForVersionUpgrade {
}
final Status s = new Status();
- barrier.startBarrier(ver, processors);
+ barrier.start(ver, processors);
barrier.waitForBarrier(ver, "p1", new Runnable() {
@Override
@@ -134,13 +134,62 @@ public class TestZkBarrierForVersionUpgrade {
});
Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 100));
+ }
+
+ @Test
+ public void testZkBarrierForVersionUpgradeWithTimeOut() {
+ ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+ ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer) {
+ @Override
+ protected long getBarrierTimeOutMs() {
+ return 200;
+ }
+ };
+ String ver = "1";
+ List<String> processors = new ArrayList<String>();
+ processors.add("p1");
+ processors.add("p2");
+ processors.add("p3");
+
+ class Status {
+ boolean p1 = false;
+ boolean p2 = false;
+ boolean p3 = false;
+ }
+ final Status s = new Status();
+
+ barrier.start(ver, processors);
+ barrier.waitForBarrier(ver, "p1", new Runnable() {
+ @Override
+ public void run() {
+ s.p1 = true;
+ }
+ });
+
+ barrier.waitForBarrier(ver, "p2", new Runnable() {
+ @Override
+ public void run() {
+ s.p2 = true;
+ }
+ });
+
+ // this node will join "too late"
+ barrier.waitForBarrier(ver, "p3", new Runnable() {
+ @Override
+ public void run() {
+ TestZkUtils.sleepMs(300);
+ s.p3 = true;
+ }
+ });
+ Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
}
private ZkUtils getZkUtilsWithNewClient() {
ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
return new ZkUtils(
+ "1",
KEY_BUILDER,
ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
CONNECTION_TIMEOUT_MS);
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index bfda464..12fa922 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -501,6 +501,7 @@ public class TestZkLeaderElector {
private ZkUtils getZkUtilsWithNewClient() {
ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
return new ZkUtils(
+ "processorId1",
KEY_BUILDER,
ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
CONNECTION_TIMEOUT_MS);
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 58c3ed6..913bd49 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -67,6 +67,7 @@ public class TestZkUtils {
}
zkUtils = new ZkUtils(
+ "testProcessorId",
KEY_BUILDER,
zkClient,
SESSION_TIMEOUT_MS);
@@ -209,4 +210,12 @@ public class TestZkUtils {
}
return false;
}
+
+ public static void sleepMs(long delay) {
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ Assert.fail("Sleep was interrupted");
+ }
+ }
}
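
The timeout behaviour exercised by testZkBarrierForVersionUpgradeWithTimeOut can be illustrated without ZooKeeper. What follows is a minimal in-memory sketch, not the ZkBarrierForVersionUpgrade implementation from this commit: the class TimedBarrierSketch, its State enum, and the explicit nowMs clock argument are all hypothetical names introduced for illustration. It assumes that a barrier which has not collected every participant by its deadline is cancelled, and that a participant arriving afterwards never has its callback run.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory stand-in for a barrier with a timeout; not Samza code. */
public class TimedBarrierSketch {
  public enum State { WAITING, DONE, TIMED_OUT }

  private final Set<String> expected;
  private final Set<String> joined = new HashSet<>();
  private final List<Runnable> callbacks = new ArrayList<>();
  private final long deadlineMs;
  private State state = State.WAITING;

  public TimedBarrierSketch(List<String> participants, long startMs, long timeoutMs) {
    this.expected = new HashSet<>(participants);
    this.deadlineMs = startMs + timeoutMs;
  }

  /** Registers a participant at time nowMs; callbacks run only if all join before the deadline. */
  public synchronized State join(String participant, long nowMs, Runnable callback) {
    expire(nowMs);
    if (state != State.WAITING || !expected.contains(participant)) {
      return state; // late or unknown participants are ignored
    }
    if (joined.add(participant)) {
      callbacks.add(callback);
    }
    if (joined.equals(expected)) {
      state = State.DONE;
      callbacks.forEach(Runnable::run);
      callbacks.clear();
    }
    return state;
  }

  /** Cancels the barrier once the deadline has passed; pending callbacks are dropped. */
  public synchronized State expire(long nowMs) {
    if (state == State.WAITING && nowMs >= deadlineMs) {
      state = State.TIMED_OUT;
      callbacks.clear();
    }
    return state;
  }

  public static void main(String[] args) {
    TimedBarrierSketch barrier =
        new TimedBarrierSketch(Arrays.asList("p1", "p2", "p3"), 0L, 200L);
    barrier.join("p1", 10L, () -> System.out.println("p1 released"));
    barrier.join("p2", 20L, () -> System.out.println("p2 released"));
    // p3 arrives after the 200 ms deadline, so nobody is released.
    State result = barrier.join("p3", 300L, () -> System.out.println("p3 released"));
    System.out.println(result); // prints TIMED_OUT
  }
}
```

Passing the clock in as an argument keeps the sketch deterministic. The test in the commit instead relies on real elapsed time, via TestZkUtils.sleepMs and an overridden getBarrierTimeOutMs().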