Posted to commits@samza.apache.org by xi...@apache.org on 2017/03/09 01:16:59 UTC

samza git commit: SAMZA-1124; Job coordinator with time out

Repository: samza
Updated Branches:
  refs/heads/master 6f811de30 -> a974b236a


SAMZA-1124; Job coordinator with time out

If a processor doesn't join the barrier within the timeout, the barrier is cancelled. All the processors should unsubscribe from it.
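In the diff below, cancellation is a race between the timeout and the final "done" write, both targeting the barrier_done node. It is resolved with ZooKeeper's conditional update: a write that carries an expected version fails if the node changed after that version was read. The diff does this with ZkClient.writeData(path, data, expectedVersion). The sketch below replaces ZooKeeper with a hypothetical in-memory VersionedNode to show the read / conditional-write / re-read loop. The names VersionedNode and BarrierTimeoutSketch are illustrative only.

```java
/** Hypothetical in-memory stand-in for a ZooKeeper node with versioned writes. */
final class VersionedNode {
  private String data;   // null until someone writes, like a freshly created node
  private int version;

  /** Read the data and report the version it was read at. */
  synchronized String read(int[] versionOut) {
    versionOut[0] = version;
    return data;
  }

  /** Unconditional write, as used for the "done" marker. */
  synchronized void write(String newData) {
    data = newData;
    version++;
  }

  /** Conditional write: fails if the node changed since expectedVersion was read. */
  synchronized boolean compareAndWrite(String newData, int expectedVersion) {
    if (version != expectedVersion) {
      return false;
    }
    data = newData;
    version++;
    return true;
  }
}

final class BarrierTimeoutSketch {
  static final String DONE = "done";
  static final String TIMED_OUT = "TIMED_OUT";

  /** Mark the barrier as timed out unless it already completed; returns the winning state. */
  static String timerOff(VersionedNode barrierDone) {
    int[] version = new int[1];
    String state = barrierDone.read(version);
    // Constant-first comparison so an empty (null) node is handled.
    while (!DONE.equals(state)) {
      if (barrierDone.compareAndWrite(TIMED_OUT, version[0])) {
        return TIMED_OUT;
      }
      // Someone wrote in between: re-read and decide again.
      state = barrierDone.read(version);
    }
    return DONE;
  }
}
```

If the barrier is already done, the timer leaves the node alone. Otherwise the timer writes TIMED_OUT at exactly the version it observed.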

Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: Boris Shkolnik <bo...@apache.org>
Author: navina <na...@apache.org>

Reviewers: Xinyu Liu <xi...@linkedin.com>

Closes #77 from sborya/JobCoordinatorWithTO and squashes the following commits:

ed055dd [Boris Shkolnik] checkstyle
1468902 [Boris Shkolnik] renambed a method
579d2e7 [Boris Shkolnik] Merge branch 'JobCoordinator' into JobCoordinatorWithTO
54ab688 [Boris Shkolnik] removed JavaJobConfig.java
3f95d46 [Boris Shkolnik] checkstyle
75f9a94 [Boris Shkolnik] added test for time out
b73ba32 [Boris Shkolnik] merge + test
5d67be0 [Boris Shkolnik] removed extra empty lines
9cf3c3e [Boris Shkolnik] addressed review comments
c47031d [Boris Shkolnik] added timeout for ZkBarrierForVersionUpgrade
93a55a9 [Boris Shkolnik] use write directly in the barrier when all joined
f89a037 [Boris Shkolnik] addressed some notes
4219720 [Boris Shkolnik] added JavaJobConfig
659ae7c [Boris Shkolnik] add ZkJobCoordinatorFactory
520a083 [Boris Shkolnik] added ZKJobCoordinatorFactory
a42218e [Boris Shkolnik] checkstyle
02ca658 [Boris Shkolnik] typo
c1fd0b2 [Boris Shkolnik] merge cleanup
1c8eef4 [Boris Shkolnik] merge cleanup
a7e014a [Boris Shkolnik] merged
b4a0642 [Boris Shkolnik] typo
33bacdd [Boris Shkolnik] merge
d1582a9 [Boris Shkolnik] missed method name change
4c481a2 [Boris Shkolnik] merge
b811f3d [Boris Shkolnik] changed to private final
5d43419 [Boris Shkolnik] addressed review comments
1a0c54d [Boris Shkolnik] fixed test
e19d77b [Boris Shkolnik] renamed method
0c5edab [Boris Shkolnik] makey tryBecomeALeader async
b34f6b7 [Boris Shkolnik] added java doc
c2305b6 [Boris Shkolnik] cleanup
f83fc57 [Boris Shkolnik] merge
f8c8a6d [Boris Shkolnik] make a smaller PR for publish functionality only
251aad7 [Boris Shkolnik] removed unneeded interface for real
9892dee [Boris Shkolnik] removed unneeded interface
f20d15f [Boris Shkolnik] some updates to JobCoordinator
bd53c07 [Boris Shkolnik] deleteing already committed files
18198d1 [Boris Shkolnik] added test for zk barrier
9dba992 [Boris Shkolnik] moved the Test in to test subdir
c16d864 [Boris Shkolnik] added test
817a7b6 [Boris Shkolnik] merge complete
1e5947f [Boris Shkolnik] merged
6506b48 [Boris Shkolnik] merged with latest
4290b13 [Boris Shkolnik] merged
43eb076 [Boris Shkolnik] checkstyle errors
e0c44fe [Boris Shkolnik] merged
8e8d833 [Boris Shkolnik] merge
e59d38c [Boris Shkolnik] merge with ZkController
6a71cf6 [Boris Shkolnik] renamed the listners
6cbcf6e [Boris Shkolnik] review comments
efbee84 [Boris Shkolnik] Checkstyle errors
132300c [Boris Shkolnik] converted ZkLeaderElector.tryBecomeLeader to async method
13a05d7 [Boris Shkolnik] merge
2d59e0c [Boris Shkolnik] merge
82c819b [Boris Shkolnik] merge
7ebe9a6 [Boris Shkolnik] check style
41b2e46 [Boris Shkolnik] refactoring to match the new ZkUtils constructor
b07d63a [Boris Shkolnik] merge JavaJobConfig
bdc953b [Boris Shkolnik] merged
4301372 [Boris Shkolnik] added tests
4d48d83 [Boris Shkolnik] merge
c9bb475 [Boris Shkolnik] added tests for ZkUtils
592e9bb [Boris Shkolnik] added missing functionality for ZkControllerImpl into zkUtils and zkKeyBuilder
b473a6e [Boris Shkolnik] Added the new file ZkControllerListener.java
3412ed4 [Boris Shkolnik] Renamed ZkListener to ZkControllerListener
fabddc9 [Boris Shkolnik] merge
ad9108a [Boris Shkolnik] merge with ScheduleAfterDebounceTime
ba583d6 [Boris Shkolnik] merge
3d6b993 [Boris Shkolnik] cleaned up
fe69e70 [Boris Shkolnik] merge
7f8125b [Boris Shkolnik] Merge branch 'master' into ZkTestUtils
eaf04bb [Boris Shkolnik] added more comments
358ae6b [Boris Shkolnik] Added test and addressed review comments
9b22eb6 [Boris Shkolnik] JobModelPublish
0ef90b6 [Boris Shkolnik] Merge branch 'JobModel' into JobModelPublish
9c59048 [Boris Shkolnik] merge
017fe79 [Boris Shkolnik] added tests
5c8aa20 [Boris Shkolnik] JobModel Generation using SimpleGroupByContainerCount
2c841e1 [Boris Shkolnik] added awaitStart
eeb69ca [Boris Shkolnik] Merge branch 'ZkBarrier' into JobCoordinator
dc26bd2 [Boris Shkolnik] added BarrierForVersionUpgrade
cfdb4c7 [Boris Shkolnik] Merge branch 'ZkController' into ZkBarrier
b28ba14 [Boris Shkolnik] ZkBarrier
c8d26ba [Boris Shkolnik] merged
efc4d03 [Boris Shkolnik] ZkController
c9b3fe4 [Boris Shkolnik] Merge branch 'ScheduleAfterDebounceTime' into ZkController
f0cae7b [Boris Shkolnik] Merge branch 'LeaderElector' into ZkController
3df0def [Boris Shkolnik] ZkControllerImpl
4801613 [Boris Shkolnik] cleanup
d32045b [Boris Shkolnik] Merge branch 'ScheduleAfterDebounceTime1' into JobCoordinator
32f96b4 [Boris Shkolnik] added ScheduleAfterDebounce
ce83409 [Boris Shkolnik] cleanup
d7a7ccb [Boris Shkolnik] Merge branch 'LeaderElector' into ScheduleAfterDebounceTime
9c6b20a [Boris Shkolnik] added ZkListener
5f867c0 [Boris Shkolnik] added Apache license info
d0687b9 [Boris Shkolnik] Merge branch 'LeaderElector' into JobCoordinator
372829f [Boris Shkolnik] Merge branch 'master' into JobCoordinator
14db43d [Boris Shkolnik] added TestZkStreamProcessor - main manual test
b3b27c6 [Boris Shkolnik] Merge branch 'LeaderElector' into TestZkStreamProcessor
6649c80 [navina] Fixing ZkUtils close(). No need to close underlying connection explicitly
7f17e26 [Boris Shkolnik] ZkTestUtils
f904cd3 [Boris Shkolnik] ScheduleAfterDebounceTime
d126b10 [Boris Shkolnik] ScheduleAfterDebounceTime
63d8d60 [Boris Shkolnik] JavaJobConfig
ff15501 [Boris Shkolnik] JavaJobConfig
d20bacf [Boris Shkolnik] ZkTestUtils
737eb2f [Boris Shkolnik] ZkTestUtils
8e2d6c1 [Boris Shkolnik] add main manual test
7a47f84 [Boris Shkolnik] add main manual test
fa2186b [Boris Shkolnik] added ZkController
a0a7409 [Boris Shkolnik] Merge branch 'master' of https://github.com/navina/samza
edda60d [navina] Removing an unintended change to the grouper
6dd6b8d [navina] Adding tests for ZkLeaderElector
1734f8f [navina] Adding tests for ZkUtils
317cf16 [navina] Adding tests for ZkKeyBuilder
aaaf24e [navina] Adding EmbeddedZookeeper for testing
37c2c8b [navina] Extracting files related to LeaderElection
76b5167 [Boris Shkolnik] added new line at then end


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/a974b236
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/a974b236
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/a974b236

Branch: refs/heads/master
Commit: a974b236a78826d993e50d8936a4cc393a97bfe4
Parents: 6f811de
Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Authored: Wed Mar 8 17:16:51 2017 -0800
Committer: Xinyu Liu <xi...@xiliu-ld.linkedin.biz>
Committed: Wed Mar 8 17:16:51 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 gradle/dependency-versions.gradle               |   1 +
 .../samza/zk/BarrierForVersionUpgrade.java      |   3 +-
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 114 ++++++----
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 221 +++++++++++++++++++
 .../samza/zk/ZkJobCoordinatorFactory.java       |  57 +++++
 .../main/java/org/apache/samza/zk/ZkUtils.java  |   5 +-
 .../samza/zk/TestScheduleAfterDebounceTime.java |  26 +--
 .../zk/TestZkBarrierForVersionUpgrade.java      |  53 ++++-
 .../apache/samza/zk/TestZkLeaderElector.java    |   1 +
 .../java/org/apache/samza/zk/TestZkUtils.java   |   9 +
 11 files changed, 422 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 400a913..ea1e3c2 100644
--- a/build.gradle
+++ b/build.gradle
@@ -160,6 +160,7 @@ project(":samza-core_$scalaVersion") {
     compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
     compile "org.eclipse.jetty:jetty-webapp:$jettyVersion"
     compile "com.101tec:zkclient:$zkClientVersion"
+    compile "org.apache.commons:commons-collections4:$apacheCommonsCollections4Version"
     testCompile project(":samza-api").sourceSets.test.output
     testCompile "junit:junit:$junitVersion"
     testCompile "org.mockito:mockito-all:$mockitoVersion"

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/gradle/dependency-versions.gradle
----------------------------------------------------------------------
diff --git a/gradle/dependency-versions.gradle b/gradle/dependency-versions.gradle
index 0a8542b..20af1da 100644
--- a/gradle/dependency-versions.gradle
+++ b/gradle/dependency-versions.gradle
@@ -39,4 +39,5 @@
   commonsCollectionVersion = "3.2.1"
   httpClientVersion="4.4.1"
   commonsLang3Version="3.4"
+  apacheCommonsCollections4Version="4.0"
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
index b2d80d0..2b785f0 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/BarrierForVersionUpgrade.java
@@ -32,12 +32,13 @@ public interface BarrierForVersionUpgrade {
    * @param version - for which the barrier is started.
    * @param processorsNames - list of processors available at the time of the JobModel generation.
    */
-  void startBarrier(String version,  List<String> processorsNames);
+  void start(String version, List<String> processorsNames);
 
   /**
    * Called by the processor.
    * Updates the processor readiness to use the new version and wait on the barrier, until all other processors
    * joined.
+   * The call is async. The callback will be invoked when the barrier is reached.
    * @param version of the jobModel this barrier is protecting.
    * @param processorsName as it appears in the list of processors.
    * @param callback  will be invoked, when barrier is reached.
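The interface splits the barrier into a leader-side start(version, processorsNames) and a processor-side waitForBarrier(version, processorsName, callback). The latter is documented as asynchronous. The sketch below illustrates that contract inside a single JVM, assuming a hypothetical InMemoryBarrier class that is not part of Samza. The ZooKeeper implementation instead tracks joins as children of barrier_processors and fires callbacks from watches.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical single-JVM stand-in for BarrierForVersionUpgrade; illustration only. */
final class InMemoryBarrier {
  private final Map<String, List<String>> expected = new HashMap<>();
  private final Map<String, Set<String>> joined = new HashMap<>();
  private final Map<String, List<Runnable>> waiting = new HashMap<>();

  /** Leader side: record which processors must join the barrier for this version. */
  synchronized void start(String version, List<String> processorsNames) {
    expected.put(version, new ArrayList<>(processorsNames));
    joined.put(version, new HashSet<>());
    waiting.put(version, new ArrayList<>());
  }

  /**
   * Processor side: register this processor and its callback, then return.
   * All callbacks run (on the caller's thread) when the last expected processor joins.
   */
  synchronized void waitForBarrier(String version, String processorsName, Runnable callback) {
    joined.get(version).add(processorsName);
    waiting.get(version).add(callback);
    if (joined.get(version).containsAll(expected.get(version))) {
      List<Runnable> ready = new ArrayList<>(waiting.get(version));
      waiting.get(version).clear();
      ready.forEach(Runnable::run);
    }
  }
}
```

The sketch requires start() to run before any waitForBarrier() for the same version. ZkJobCoordinator.generateNewJobModel respects that order: it calls barrier.start(...) before publishing the new version that processors react to.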

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 3ec87b0..f7efa48 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -19,10 +19,12 @@
 
 package org.apache.samza.zk;
 
+import java.util.Arrays;
 import java.util.List;
-
 import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,6 +33,8 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
   private final ZkUtils zkUtils;
   private final ZkKeyBuilder keyBuilder;
   private final static String BARRIER_DONE = "done";
+  private final static String BARRIER_TIMED_OUT = "TIMED_OUT";
+  private final static long BARRIER_TIMED_OUT_MS = 60 * 1000;
   private final static Logger LOG = LoggerFactory.getLogger(ZkBarrierForVersionUpgrade.class);
 
   private final ScheduleAfterDebounceTime debounceTimer;
@@ -45,36 +49,66 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
     this.debounceTimer = debounceTimer;
   }
 
+  /**
+   * set the barrier for the timer. If the timer is not achieved by the timeout - it will fail
+   * @param version for which the barrier is created
+   * @param timeout - time in ms to wait
+   */
+  public void setTimer(String version, long timeout) {
+    debounceTimer.scheduleAfterDebounceTime("VersionUpgradeTimeout", timeout, ()->timerOff(version));
+  }
+
+  protected long getBarrierTimeOutMs() {
+    return BARRIER_TIMED_OUT_MS;
+  }
+
+  private void timerOff(String version) {
+    // check if barrier has finished
+    final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    Stat stat = new Stat();
+    String done = zkUtils.getZkClient().<String>readData(barrierDonePath, stat);
+    if (done != null && done.equals(BARRIER_DONE))
+      return; //nothing to do
+
+    while (true) {
+      try {
+        // write a new value if no one else did, if the value was changed since previous reading - retry
+        zkUtils.getZkClient().writeData(barrierDonePath, "TIMED_OUT", stat.getVersion());
+        return;
+      } catch (Exception e) {
+        // failed to write, try read/write again
+        LOG.info("Barrier timeout write failed");
+        done = zkUtils.getZkClient().<String>readData(barrierDonePath, stat);
+        if (done.equals(BARRIER_DONE))
+          return; //nothing to do
+      }
+    }
+  }
+
   @Override
-  public void startBarrier(String version, List<String> processorsNames) {
-    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
-    String barrierDonePath = String.format("%s/barrier_done", barrierPath);
-    String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+  public void start(String version, List<String> processorsNames) {
+    final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    final String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
 
     zkUtils.makeSurePersistentPathsExists(new String[]{barrierPrefix, barrierPath, barrierProcessors, barrierDonePath});
 
-    // callback for when the barrier is reached
-    Runnable callback = new Runnable() {
-      @Override
-      public void run() {
-        LOG.info("Writing BARRIER DONE to " + barrierDonePath);
-        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
-      }
-    };
     // subscribe for processor's list changes
     LOG.info("Subscribing for child changes at " + barrierProcessors);
     zkUtils.getZkClient().subscribeChildChanges(barrierProcessors,
-        new ZkBarrierChangeHandler(callback, processorsNames));
+        new ZkBarrierChangeHandler(version, processorsNames));
+
+    setTimer(version, getBarrierTimeOutMs());
   }
 
   @Override
   public void waitForBarrier(String version, String processorsName, Runnable callback) {
     // if participant makes this call it means it has already stopped the old container and got the new job model.
-    String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
-    String barrierDonePath = String.format("%s/barrier_done", barrierPath);
-    String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
-    String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);
-
+    final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+    final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+    final String barrierProcessors = String.format("%s/barrier_processors", barrierPath);
+    final String barrierProcessorThis = String.format("%s/%s", barrierProcessors, processorsName);
 
     // update the barrier for this processor
     LOG.info("Creating a child for barrier at " + barrierProcessorThis);
@@ -88,47 +122,32 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
    * listener for the subscription.
    */
   class ZkBarrierChangeHandler implements IZkChildListener {
-    Runnable callback;
-    List<String> names;
+    private final String version;
+    private final List<String> names;
 
-    public ZkBarrierChangeHandler(Runnable callback, List<String> names) {
-      this.callback = callback;
+    public ZkBarrierChangeHandler(String version, List<String> names) {
+      this.version = version;
       this.names = names;
     }
 
     @Override
     public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
-      // Find out the event & Log
-      boolean allIn = true;
 
       if (currentChildren == null) {
         LOG.info("Got handleChildChange with null currentChildren");
         return;
       }
-      // debug
-      StringBuilder sb = new StringBuilder();
-      for (String child : currentChildren) {
-        sb.append(child).append(",");
-      }
-      LOG.info("list of children in the barrier = " + parentPath + ":" + sb.toString());
-      sb = new StringBuilder();
-      for (String child : names) {
-        sb.append(child).append(",");
-      }
-      LOG.info("list of children to compare against = " + parentPath + ":" + sb.toString());
 
+      LOG.info("list of children in the barrier = " + parentPath + ":" + Arrays.toString(currentChildren.toArray()));
+      LOG.info("list of children to compare against = " + parentPath + ":" + Arrays.toString(names.toArray()));
 
       // check if all the names are in
-      for (String n : names) {
-        if (!currentChildren.contains(n)) {
-          LOG.info("node " + n + " is still not in the list ");
-          allIn = false;
-          break;
-        }
-      }
-      if (allIn) {
+      if (CollectionUtils.containsAll(names, currentChildren)) {
         LOG.info("ALl nodes reached the barrier");
-        callback.run(); // all the names have registered
+        final String barrierPath = String.format("%s/barrier_%s", barrierPrefix, version);
+        final String barrierDonePath = String.format("%s/barrier_done", barrierPath);
+        LOG.info("Writing BARRIER DONE to " + barrierDonePath);
+        zkUtils.getZkClient().writeData(barrierDonePath, BARRIER_DONE);
       }
     }
   }
@@ -152,6 +171,11 @@ public class ZkBarrierForVersionUpgrade implements BarrierForVersionUpgrade {
       if (done.equals(BARRIER_DONE)) {
         zkUtils.unsubscribeDataChanges(barrierPathDone, this);
         debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, callback);
+      } else if (done.equals(BARRIER_TIMED_OUT)) {
+        // timed out
+        LOG.error("Barrier for " + dataPath + " timed out");
+        System.out.println("Barrier for " + dataPath + " timed out");
+        zkUtils.unsubscribeDataChanges(barrierPathDone, this);
       }
       // we do not need to resubscribe because, ZkClient library does it for us.
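A note on argument order in handleChildChange. Commons Collections 4 documents CollectionUtils.containsAll(coll1, coll2) as true iff every element of coll2 is also in coll1. The call containsAll(names, currentChildren) therefore checks that every current child is an expected name. The loop it replaces checked the reverse: every expected name is among the children. The removed loop is equivalent to CollectionUtils.containsAll(currentChildren, names). The sketch below states the intended check with the JDK's Collection.containsAll, which has the same receiver-contains-argument semantics. BarrierCheck is a hypothetical helper name.

```java
import java.util.List;

final class BarrierCheck {
  /** True once every expected processor has registered under barrier_processors. */
  static boolean allJoined(List<String> expectedNames, List<String> currentChildren) {
    // Collection.containsAll(c) is true iff this collection contains every element of c.
    return currentChildren.containsAll(expectedNames);
  }
}
```

With expected names ["0", "1"] and only "0" joined, allJoined returns false. The reversed call would already report the barrier as reached.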
 

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
new file mode 100644
index 0000000..1d16d4a
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JavaSystemConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobModelManager;
+import org.apache.samza.coordinator.JobModelManager$;
+import org.apache.samza.job.model.JobModel;
+import org.apache.samza.processor.SamzaContainerController;
+import org.apache.samza.system.StreamMetadataCache;
+import org.apache.samza.system.SystemAdmin;
+import org.apache.samza.system.SystemFactory;
+import org.apache.samza.util.SystemClock;
+import org.apache.samza.util.Util;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JobCoordinator for stand alone processor managed via Zookeeper.
+ */
+public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
+  private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class);
+
+  private final ZkUtils zkUtils;
+  private final int processorId;
+  private final ZkController zkController;
+  private final SamzaContainerController containerController;
+  private final BarrierForVersionUpgrade barrier;
+  private final ScheduleAfterDebounceTime debounceTimer;
+  private final StreamMetadataCache  streamMetadataCache;
+  private final ZkKeyBuilder keyBuilder;
+  private final Config config;
+
+  private JobModel newJobModel;
+  private String newJobModelVersion;  // version published in ZK (by the leader)
+  private JobModelManager jobModelManager;
+
+  public ZkJobCoordinator(int processorId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils, SamzaContainerController containerController) {
+    this.zkUtils = zkUtils;
+    this.keyBuilder = zkUtils.getKeyBuilder();
+    this.debounceTimer = debounceTimer;
+    this.processorId = processorId;
+    this.containerController = containerController;
+    this.zkController = new ZkControllerImpl(String.valueOf(processorId), zkUtils, debounceTimer, this);
+    this.config = config;
+
+
+    barrier = new ZkBarrierForVersionUpgrade(zkUtils, debounceTimer);
+    streamMetadataCache = getStreamMetadataCache();
+  }
+
+  private StreamMetadataCache getStreamMetadataCache() {
+    // model generation - NEEDS TO BE REVIEWED
+    JavaSystemConfig systemConfig = new JavaSystemConfig(this.config);
+    Map<String, SystemAdmin> systemAdmins = new HashMap<>();
+    for (String systemName: systemConfig.getSystemNames()) {
+      String systemFactoryClassName = systemConfig.getSystemFactory(systemName);
+      if (systemFactoryClassName == null) {
+        String msg = String.format("A stream uses system %s, which is missing from the configuration.", systemName);
+        log.error(String.format(msg));
+        throw new SamzaException(msg);
+      }
+      SystemFactory systemFactory = Util.getObj(systemFactoryClassName);
+      systemAdmins.put(systemName, systemFactory.getAdmin(systemName, this.config));
+    }
+
+    return new StreamMetadataCache(Util.<String, SystemAdmin>javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance());
+  }
+
+  @Override
+  public void start() {
+    zkController.register();
+  }
+
+  public void cleanupZk() {
+    zkUtils.deleteRoot();
+  }
+
+  @Override
+  public void stop() {
+    zkController.stop();
+  }
+
+  @Override
+  public boolean awaitStart(long timeoutMs)
+      throws InterruptedException {
+    return containerController.awaitStart(timeoutMs);
+  }
+
+  @Override
+  public int getProcessorId() {
+    return processorId;
+  }
+
+  @Override
+  public JobModel getJobModel() {
+    return newJobModel;
+  }
+
+  //////////////////////////////////////////////// LEADER stuff ///////////////////////////
+  @Override
+  public void onBecomeLeader() {
+    log.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
+
+    List<String> emptyList = new ArrayList<>();
+
+    // actual actions to do are the same as onProcessorChange()
+    debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
+        ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> onProcessorChange(emptyList));
+  }
+
+  @Override
+  public void onProcessorChange(List<String> processorIds) {
+    log.info("ZkJobCoordinator::onProcessorChange - Processors changed! List: " + Arrays.toString(processorIds.toArray()));
+    generateNewJobModel();
+  }
+
+  @Override
+  public void onNewJobModelAvailable(final String version) {
+    newJobModelVersion = version;
+    log.info("pid=" + processorId + "new JobModel available");
+    // stop current work
+    containerController.stopContainer();
+    log.info("pid=" + processorId + "new JobModel available.Container stopped.");
+    // get the new job model
+    newJobModel = zkUtils.getJobModel(version);
+    log.info("pid=" + processorId + "new JobModel available. ver=" + version + "; jm = " + newJobModel);
+
+    String currentPath = zkUtils.getEphemeralPath();
+    String zkProcessorId = keyBuilder.parseIdFromPath(currentPath);
+
+    // update ZK and wait for all the processors to get this new version
+    barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new Runnable() {
+      @Override
+      public void run() {
+        onNewJobModelConfirmed(version);
+      }
+    });
+  }
+
+  @Override
+  public void onNewJobModelConfirmed(String version) {
+    log.info("pid=" + processorId + "new version " + version + " of the job model got confirmed");
+    // get the new Model
+    JobModel jobModel = getJobModel();
+    log.info("pid=" + processorId + "got the new job model in JobModelConfirmed =" + jobModel);
+
+    // start the container with the new model
+    containerController.startContainer(jobModel.getContainers().get(processorId), jobModel.getConfig(),
+        jobModel.maxChangeLogStreamPartitions);
+  }
+
+  /**
+   * Generate new JobModel when becoming a leader or the list of processor changed.
+   */
+  private void generateNewJobModel() {
+    // get the current list of processors
+    List<String> currentProcessors = zkUtils.getSortedActiveProcessors();
+
+    // get the current version
+    String currentJMVersion  = zkUtils.getJobModelVersion();
+    String nextJMVersion;
+    if (currentJMVersion == null) {
+      log.info("pid=" + processorId + "generating first version of the model");
+      nextJMVersion = "1";
+    } else {
+      nextJMVersion = Integer.toString(Integer.valueOf(currentJMVersion) + 1);
+    }
+    log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
+
+    StringBuilder sb = new StringBuilder();
+    List<Integer> containerIds = new ArrayList<>();
+    for (String processor : currentProcessors) {
+      String zkProcessorId = keyBuilder.parseIdFromPath(processor);
+      sb.append(zkProcessorId).append(",");
+      containerIds.add(Integer.valueOf(zkProcessorId));
+    }
+    log.info("generate new job model: processorsIds: " + sb.toString());
+
+    jobModelManager = JobModelManager$.MODULE$.getJobCoordinator(this.config, null, null, streamMetadataCache, null,
+        containerIds);
+    JobModel jobModel = jobModelManager.jobModel();
+
+    log.info("pid=" + processorId + "Generated jobModel: " + jobModel);
+
+    // publish the new job model first
+    zkUtils.publishJobModel(nextJMVersion, jobModel);
+    log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel);
+
+    // start the barrier for the job model update
+    barrier.start(nextJMVersion, currentProcessors);
+
+    // publish new JobModel version
+    zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);
+    log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion);
+  }
+}
\ No newline at end of file
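generateNewJobModel performs three steps, in this order:

1. Derive the next version: "1" if none is published yet, otherwise the current version plus one.
2. Turn the parsed ZooKeeper processor ids into integer container ids.
3. Publish the model, then start the barrier, then publish the version.

Because of that ordering, any processor notified of the new version can already read the model and join an existing barrier. The two pure steps are isolated below as a minimal sketch. JobModelVersions is a hypothetical name, and the input is assumed to be ids already returned by keyBuilder.parseIdFromPath, whose exact format this diff does not show.

```java
import java.util.ArrayList;
import java.util.List;

final class JobModelVersions {
  /** Next job model version: "1" when nothing is published yet, otherwise current + 1. */
  static String nextVersion(String currentVersion) {
    return currentVersion == null ? "1" : Integer.toString(Integer.parseInt(currentVersion) + 1);
  }

  /** Convert parsed processor ids into integer container ids, preserving their order. */
  static List<Integer> toContainerIds(List<String> zkProcessorIds) {
    List<Integer> ids = new ArrayList<>();
    for (String id : zkProcessorIds) {
      ids.add(Integer.parseInt(id));
    }
    return ids;
  }
}
```

Integer.parseInt tolerates leading zeros, so zero-padded sequence suffixes such as "0000000003" map to 3.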

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
new file mode 100644
index 0000000..e211f70
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.JobCoordinator;
+import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.processor.SamzaContainerController;
+
+public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
+  /**
+   * Method to instantiate an implementation of JobCoordinator
+   *
+   * @param processorId Indicates the StreamProcessor's id to which this Job Coordinator is associated with
+   * @param config      Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+   * @return An instance of IJobCoordinator
+   */
+  @Override
+  public JobCoordinator getJobCoordinator(int processorId, Config config, SamzaContainerController containerController) {
+    JobConfig jobConfig = new JobConfig(config);
+    String groupName = String.format("%s-%s", jobConfig.getName(), jobConfig.getJobId());
+    ZkConfig zkConfig = new ZkConfig(config);
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
+    return new ZkJobCoordinator(
+        processorId,
+        config,
+        debounceTimer,
+        new ZkUtils(
+            String.valueOf(processorId),
+            new ZkKeyBuilder(groupName),
+            zkClient,
+            zkConfig.getZkConnectionTimeoutMs()
+            ),
+        containerController);
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 73376b1..e8170e3 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -64,12 +64,13 @@ public class ZkUtils {
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
   private final int connectionTimeoutMs;
-  private final String processorId = "TO BE PASSED IN THE CONSTRUCTOR"; //TODO
+  private final String processorId;
 
-  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
+  public ZkUtils(String processorId, ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
     this.keyBuilder = zkKeyBuilder;
     this.connectionTimeoutMs = connectionTimeoutMs;
     this.zkClient = zkClient;
+    this.processorId = processorId;
   }
 
   public void connect() throws ZkInterruptedException {

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
index e57372f..23a8cc1 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestScheduleAfterDebounceTime.java
@@ -48,19 +48,13 @@ public class TestScheduleAfterDebounceTime {
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
 
     final TestObj testObj = new TestScheduleAfterDebounceTime.TestObj();
-    debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () ->
-      {
+    debounceTimer.scheduleAfterDebounceTime("TEST1", DEBOUNCE_TIME, () -> {
         testObj.inc();
-      }
-    );
+      });
     // action is delayed
     Assert.assertEquals(0, i);
 
-    try {
-      Thread.sleep(DEBOUNCE_TIME + 10);
-    } catch (InterruptedException e) {
-      Assert.fail("Sleep was interrupted");
-    }
+    TestZkUtils.sleepMs(DEBOUNCE_TIME + 10);
 
     // debounce time passed
     Assert.assertEquals(1, i);
@@ -85,11 +79,8 @@ public class TestScheduleAfterDebounceTime {
       }
     );
 
-    try {
-      Thread.sleep(DEBOUNCE_TIME + 10);
-    } catch (InterruptedException e) {
-      Assert.fail("Sleep was interrupted");
-    }
+    TestZkUtils.sleepMs(DEBOUNCE_TIME + 10);
+
     // still should be the old value
     Assert.assertEquals(0, i);
 
@@ -100,11 +91,8 @@ public class TestScheduleAfterDebounceTime {
       }
     );
 
-    try {
-      Thread.sleep(3 * DEBOUNCE_TIME + 10);
-    } catch (InterruptedException e) {
-      Assert.fail("Sleep was interrupted");
-    }
+    TestZkUtils.sleepMs(3 * DEBOUNCE_TIME + 10);
+
     Assert.assertEquals(100, i);
   }
 }

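The test above relies on debounce semantics: an action scheduled under a key runs only after the debounce delay, and scheduling the same key again before it fires replaces the pending action. A minimal sketch of that behavior is shown below, assuming a ScheduledExecutorService and one pending future per key. DebounceSketch is hypothetical and is not the actual ScheduleAfterDebounceTime implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical keyed debounce scheduler; not Samza's ScheduleAfterDebounceTime.
class DebounceSketch {
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  // Most recently scheduled (possibly already completed) action per key.
  private final Map<String, ScheduledFuture<?>> pending = new HashMap<>();

  // Runs action after delayMs unless the same key is rescheduled first.
  synchronized void schedule(String key, long delayMs, Runnable action) {
    ScheduledFuture<?> previous = pending.get(key);
    if (previous != null) {
      previous.cancel(false); // drop the earlier action if it has not started yet
    }
    pending.put(key, executor.schedule(action, delayMs, TimeUnit.MILLISECONDS));
  }

  void shutdown() {
    executor.shutdownNow();
  }
}
```

A single-threaded executor keeps actions serialized, which matches the way the test checks one shared counter.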
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 92cb2c9..f26d4d0 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -81,7 +81,7 @@ public class TestZkBarrierForVersionUpgrade {
     }
     final Status s = new Status();
 
-    barrier.startBarrier(ver, processors);
+    barrier.start(ver, processors);
 
     barrier.waitForBarrier(ver, "p1", new Runnable() {
       @Override
@@ -117,7 +117,7 @@ public class TestZkBarrierForVersionUpgrade {
     }
     final Status s = new Status();
 
-    barrier.startBarrier(ver, processors);
+    barrier.start(ver, processors);
 
     barrier.waitForBarrier(ver, "p1", new Runnable() {
       @Override
@@ -134,13 +134,62 @@ public class TestZkBarrierForVersionUpgrade {
     });
 
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 100));
+  }
+
+  @Test
+  public void testZkBarrierForVersionUpgradeWithTimeOut() {
+    ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
+    ZkBarrierForVersionUpgrade barrier = new ZkBarrierForVersionUpgrade(testZkUtils, debounceTimer) {
+      @Override
+      protected long getBarrierTimeOutMs() {
+        return 200;
+      }
+    };
+    String ver = "1";
+    List<String> processors = new ArrayList<String>();
+    processors.add("p1");
+    processors.add("p2");
+    processors.add("p3");
+
+    class Status {
+      boolean p1 = false;
+      boolean p2 = false;
+      boolean p3 = false;
+    }
+    final Status s = new Status();
+
+    barrier.start(ver, processors);
 
+    barrier.waitForBarrier(ver, "p1", new Runnable() {
+      @Override
+      public void run() {
+        s.p1 = true;
+      }
+    });
+
+    barrier.waitForBarrier(ver, "p2", new Runnable() {
+      @Override
+      public void run() {
+        s.p2 = true;
+      }
+    });
+
+    // this node will join "too late"
+    barrier.waitForBarrier(ver, "p3", new Runnable() {
+      @Override
+      public void run() {
+        TestZkUtils.sleepMs(300);
+        s.p3 = true;
+      }
+    });
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> s.p1 && s.p2 && s.p3, 2, 400));
   }
 
 
   private ZkUtils getZkUtilsWithNewClient() {
     ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
     return new ZkUtils(
+        "1",
         KEY_BUILDER,
         ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
         CONNECTION_TIMEOUT_MS);

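The new testZkBarrierForVersionUpgradeWithTimeOut exercises the behavior described in the commit message: if a processor does not join within the time out, the barrier is cancelled. The in-memory sketch below illustrates that state machine under simplifying assumptions: no ZooKeeper, no version argument, one barrier per instance, and late or unknown joins silently ignored. TimedBarrierSketch and its 30-second default are hypothetical; only the idea of an overridable protected getBarrierTimeOutMs() is taken from the test.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory sketch; not ZkBarrierForVersionUpgrade and no ZooKeeper involved.
class TimedBarrierSketch {
  enum State { WAITING, DONE, TIMED_OUT }

  private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
  private final Set<String> expected = new HashSet<>();
  private final Set<String> joined = new HashSet<>();
  private final List<Runnable> callbacks = new ArrayList<>();
  private State state = State.WAITING;
  private ScheduledFuture<?> timeout;

  // Arbitrary default; tests can override it, as the Samza test does with getBarrierTimeOutMs().
  protected long getBarrierTimeOutMs() {
    return 30_000;
  }

  // Registers the expected participants and arms the timeout.
  synchronized void start(List<String> participants) {
    expected.addAll(participants);
    timeout = timer.schedule(this::expire, getBarrierTimeOutMs(), TimeUnit.MILLISECONDS);
  }

  // Records a join; once everyone has joined in time, runs every registered callback.
  void waitForBarrier(String participant, Runnable callback) {
    List<Runnable> toRun = new ArrayList<>();
    synchronized (this) {
      if (state != State.WAITING || !expected.contains(participant)) {
        return; // joins after completion or timeout, and unknown ids, are ignored
      }
      joined.add(participant);
      callbacks.add(callback);
      if (joined.containsAll(expected)) {
        state = State.DONE;
        timeout.cancel(false);
        toRun.addAll(callbacks);
      }
    }
    toRun.forEach(Runnable::run); // run callbacks outside the lock
  }

  // Fired by the timer: cancels the barrier if it is still waiting.
  private synchronized void expire() {
    if (state == State.WAITING) {
      state = State.TIMED_OUT;
    }
  }

  synchronized State state() {
    return state;
  }

  void shutdown() {
    timer.shutdownNow();
  }
}
```

Running the callbacks outside the synchronized block avoids holding the barrier's lock while arbitrary user code executes.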
http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index bfda464..12fa922 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -501,6 +501,7 @@ public class TestZkLeaderElector {
   private ZkUtils getZkUtilsWithNewClient() {
     ZkConnection zkConnection = ZkUtils.createZkConnection(testZkConnectionString, SESSION_TIMEOUT_MS);
     return new ZkUtils(
+        "processorId1",
         KEY_BUILDER,
         ZkUtils.createZkClient(zkConnection, CONNECTION_TIMEOUT_MS),
         CONNECTION_TIMEOUT_MS);

http://git-wip-us.apache.org/repos/asf/samza/blob/a974b236/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 58c3ed6..913bd49 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -67,6 +67,7 @@ public class TestZkUtils {
     }
 
     zkUtils = new ZkUtils(
+        "testProcessorId",
         KEY_BUILDER,
         zkClient,
         SESSION_TIMEOUT_MS);
@@ -209,4 +210,12 @@ public class TestZkUtils {
     }
     return false;
   }
+
+  public static void sleepMs(long delay) {
+    try {
+      Thread.sleep(delay);
+    } catch (InterruptedException e) {
+      Assert.fail("Sleep was interrupted");
+    }
+  }
 }
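The tests poll a condition through TestZkUtils.testWithDelayBackOff, whose body is not shown in this diff, and now sleep through the new sleepMs helper. The sketch below shows one way such a polling helper can work: check the condition, then sleep with a doubling delay until a single sleep would exceed the maximum. BackOffPoller and its parameter semantics are assumptions, not the actual TestZkUtils code. Unlike sleepMs, which fails the test on interruption, this sketch restores the thread's interrupt flag and returns false.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper; not the actual TestZkUtils.testWithDelayBackOff.
final class BackOffPoller {
  private BackOffPoller() { }

  // Polls condition, doubling the sleep from initialDelayMs until a sleep would exceed maxDelayMs.
  static boolean pollWithBackOff(BooleanSupplier condition, long initialDelayMs, long maxDelayMs) {
    long delay = Math.max(1, initialDelayMs); // guard against a zero delay that would never grow
    while (delay <= maxDelayMs) {
      if (condition.getAsBoolean()) {
        return true;
      }
      try {
        Thread.sleep(delay);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status for the caller
        return false;
      }
      delay *= 2;
    }
    return condition.getAsBoolean(); // final check after the last sleep
  }
}
```

With an initial delay of 2 ms and a maximum of 400 ms, the sketch sleeps 2, 4, 8, ..., 256 ms, about half a second in total, before giving up.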