You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by jm...@apache.org on 2017/03/07 00:18:51 UTC

[2/9] samza git commit: SAMZA-1102: Zk controller

SAMZA-1102: Zk controller

SAMZA-1102: Added ZKController and ZkControllerImpl

Author: Boris Shkolnik <bs...@bshkolni-ld1.linkedin.biz>
Author: navina <na...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>, Fred Ji <fj...@apache.org>, Xinyu Liu <xi...@linkedin.com>

Closes #50 from sborya/ZkController


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f1bc1d0b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f1bc1d0b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f1bc1d0b

Branch: refs/heads/samza-fluent-api-v1
Commit: f1bc1d0b36242170930c0001c9efa7e5c24f8dd0
Parents: e6147fd
Author: Boris Shkolnik <bo...@apache.org>
Authored: Thu Feb 23 14:02:05 2017 -0800
Committer: navina <na...@apache.org>
Committed: Thu Feb 23 14:02:05 2017 -0800

----------------------------------------------------------------------
 .../processor/SamzaContainerController.java     |   1 +
 .../apache/samza/processor/StreamProcessor.java |  10 +-
 .../java/org/apache/samza/zk/ZkController.java  |  32 ++++
 .../org/apache/samza/zk/ZkControllerImpl.java   | 163 +++++++++++++++++++
 .../apache/samza/zk/ZkControllerListener.java   |  34 ++++
 .../java/org/apache/samza/zk/ZkKeyBuilder.java  |  22 ++-
 .../org/apache/samza/zk/ZkLeaderElector.java    |  36 ++--
 .../main/java/org/apache/samza/zk/ZkUtils.java  |  49 ++++++
 .../org/apache/samza/zk/TestZkKeyBuilder.java   |   4 +-
 .../apache/samza/zk/TestZkLeaderElector.java    | 152 ++++++++++++++---
 .../java/org/apache/samza/zk/TestZkUtils.java   | 105 ++++++++++--
 11 files changed, 549 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
index d448d30..76e2053 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/SamzaContainerController.java
@@ -60,6 +60,7 @@ public class SamzaContainerController {
    * @param taskFactory         Factory that be used create instances of {@link org.apache.samza.task.StreamTask} or
    *                            {@link org.apache.samza.task.AsyncStreamTask}
    * @param containerShutdownMs How long the Samza container should wait for an orderly shutdown of task instances
+   * @param processorId         Id of the processor
    * @param metricsReporterMap  Map of metric reporter name and {@link MetricsReporter} instance
    */
   public SamzaContainerController(

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 5e90c56..4d3e8ab 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -94,9 +94,14 @@ public class StreamProcessor {
     this(processorId, config, customMetricsReporters, (Object) asyncStreamTaskFactory);
   }
 
+
   /**
-   * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
+   *Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
    * using the provided {@link StreamTaskFactory}.
+   * @param processorId - this processor Id
+   * @param config - config
+   * @param customMetricsReporters metric Reporter
+   * @param streamTaskFactory task factory to instantiate the Task
    */
   public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters,
                          StreamTaskFactory streamTaskFactory) {
@@ -106,6 +111,9 @@ public class StreamProcessor {
   /**
    * Same as {@link #StreamProcessor(int, Config, Map, AsyncStreamTaskFactory)}, except task instances are created
    * using the "task.class" configuration instead of a task factory.
+   * @param processorId - this processor Id
+   * @param config - config
+   * @param customMetricsReporters metrics
    */
   public StreamProcessor(int processorId, Config config, Map<String, MetricsReporter> customMetricsReporters) {
     this(processorId, config, customMetricsReporters, (Object) null);

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkController.java b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
new file mode 100644
index 0000000..20c62cf
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkController.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.samza.zk;
+
+/**
+ * API for ZooKeeper-based coordination: leader election and change notifications.
+ */
+public interface ZkController {
+  void register(); // join leader election and subscribe to JobModel version updates (per ZkControllerImpl)
+  boolean isLeader(); // true when this processor is the leader
+  void notifyJobModelChange(String version); // report that the given JobModel version is available
+  void stop(); // resign leadership if held and close the ZK connection (per ZkControllerImpl)
+  void listenToProcessorLiveness(); // subscribe to changes in the list of processors
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
new file mode 100644
index 0000000..70c8a37
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.apache.samza.SamzaException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+
+public class ZkControllerImpl implements ZkController { // ZkController built on ZkUtils and ZkLeaderElector
+  private static final Logger LOG = LoggerFactory.getLogger(ZkControllerImpl.class);
+
+  private final String processorIdStr;
+  private final ZkUtils zkUtils;
+  private final ZkControllerListener zkControllerListener;
+  private final ZkLeaderElector leaderElector;
+  private final ScheduleAfterDebounceTime debounceTimer;
+
+  public ZkControllerImpl(String processorIdStr, ZkUtils zkUtils, ScheduleAfterDebounceTime debounceTimer,
+      ZkControllerListener zkControllerListener) {
+    this.processorIdStr = processorIdStr;
+    this.zkUtils = zkUtils;
+    this.zkControllerListener = zkControllerListener;
+    this.leaderElector = new ZkLeaderElector(processorIdStr, zkUtils,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            onBecomeLeader(); // forward the elector's callback to this controller
+          }
+        }
+    );
+    this.debounceTimer = debounceTimer;
+
+    init(); // make sure the ZK paths this controller relies on exist
+  }
+
+  private void init() { // ensures the processors, JobModel-version and JobModel-prefix paths exist
+    ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
+    zkUtils.makeSurePersistentPathsExists(
+        new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
+            .getJobModelPathPrefix()});
+  }
+
+  private void onBecomeLeader() { // leader-election callback, wired up in the constructor
+
+    listenToProcessorLiveness(); // subscribe to changes in the processor list
+
+    // inform the caller that this processor is now the leader
+    zkControllerListener.onBecomeLeader();
+
+  }
+
+  @Override
+  public void register() {
+
+    // TODO - make a loop here with some number of attempts.
+    // possibly split into two methods - becomeLeader() and becomeParticipant()
+    leaderElector.tryBecomeLeader();
+
+    // subscribe to JobModel version updates
+    zkUtils.subscribeToJobModelVersionChange(new ZkJobModelVersionChangeHandler(debounceTimer));
+  }
+
+  @Override
+  public boolean isLeader() {
+    return leaderElector.amILeader();
+  }
+
+  @Override
+  public void notifyJobModelChange(String version) {
+    zkControllerListener.onNewJobModelAvailable(version);
+  }
+
+  @Override
+  public void stop() { // NOTE(review): closes zkUtils but, unlike shutdown(), does not stop debounceTimer - confirm
+    if (isLeader()) {
+      leaderElector.resignLeadership();
+    }
+    zkUtils.close();
+  }
+
+  @Override
+  public void listenToProcessorLiveness() { // registers a child listener on the processors path
+    zkUtils.subscribeToProcessorChange(new ZkProcessorChangeHandler(debounceTimer));
+  }
+
+  // Intended for the leader only: debounces processor-list changes before notifying the listener
+  class ZkProcessorChangeHandler  implements IZkChildListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    public ZkProcessorChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
+      this.debounceTimer = debounceTimer;
+    }
+    /**
+     * Called when the children of the given path changed; schedules onProcessorChange after DEBOUNCE_TIME_MS.
+     *
+     * @param parentPath    The parent path
+     * @param currentChilds The children or null if the root node (parent path) was deleted.
+     * @throws Exception declared by {@link IZkChildListener}; not thrown explicitly here
+     */
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+      LOG.info(
+          "ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - Path: " + parentPath + "  Current Children: "
+              + currentChilds);
+      debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE, // NOTE(review): currentChilds may be null
+          ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChilds));
+    }
+  }
+
+  class ZkJobModelVersionChangeHandler implements IZkDataListener {
+    private final ScheduleAfterDebounceTime debounceTimer;
+    public ZkJobModelVersionChangeHandler(ScheduleAfterDebounceTime debounceTimer) {
+      this.debounceTimer = debounceTimer;
+    }
+    /**
+     * Called when the JobModel version node's data changes; schedules notifyJobModelChange with a delay argument of 0.
+     * @param dataPath path of the node whose data changed
+     * @param data new node data; cast to String and passed on as the JobModel version
+     * @throws Exception declared by {@link IZkDataListener}; not thrown explicitly here
+     */
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws Exception {
+      LOG.info("pid=" + processorIdStr + ". Got notification on version update change. path=" + dataPath + "; data="
+          + (String) data);
+
+      debounceTimer
+          .scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () -> notifyJobModelChange((String) data));
+    }
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception { // deletion of the version path is reported as an error
+      throw new SamzaException("version update path has been deleted!");
+    }
+  }
+
+  public void shutdown() { // stops the debounce scheduler and closes zkUtils; not declared on ZkController
+    if (debounceTimer != null)
+      debounceTimer.stopScheduler();
+
+    if (zkUtils != null)
+      zkUtils.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
new file mode 100644
index 0000000..f7fedd7
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerListener.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import java.util.List;
+
+
+/**
+ * Callbacks through which a ZkController reports events to its caller.
+ */
+public interface ZkControllerListener {
+  void onBecomeLeader(); // this processor has become the leader
+  void onProcessorChange(List<String> processorIds); // current children of the processors path, as reported by ZK
+
+  void onNewJobModelAvailable(String version); // start job model update (stop current work)
+  void onNewJobModelConfirmed(String version); // start new work according to the new model
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 28344e9..d6cb9f3 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -19,8 +19,8 @@
 
 package org.apache.samza.zk;
 
-import com.google.common.base.Strings;
 import org.apache.samza.SamzaException;
+import com.google.common.base.Strings;
 
 /**
  * The following ZK hierarchy is maintained for Standalone jobs:
@@ -44,7 +44,7 @@ public class ZkKeyBuilder {
   private final String pathPrefix;
 
   static final String PROCESSORS_PATH = "processors";
-  static final String PROCESSOR_ID_PREFIX = "processor-";
+  public static final String JOBMODEL_VERSION_PATH = "jobModelVersion";
 
   public ZkKeyBuilder(String pathPrefix) {
     if (Strings.isNullOrEmpty(pathPrefix)) {
@@ -53,6 +53,10 @@ public class ZkKeyBuilder {
     this.pathPrefix = pathPrefix.trim();
   }
 
+  public String getRootPath() { // root node for this key builder: "/<pathPrefix>"
+    return "/" + pathPrefix;
+  }
+
   public String getProcessorsPath() {
     return String.format("/%s/%s", pathPrefix, PROCESSORS_PATH);
   }
@@ -71,4 +75,18 @@ public class ZkKeyBuilder {
       return path.substring(path.lastIndexOf("/") + 1);
     return null;
   }
+
+  public String getJobModelVersionPath() { // "/<pathPrefix>/jobModelVersion"
+    return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH);
+  }
+
+  public String getJobModelPathPrefix() { // "/<pathPrefix>/jobModels", parent of the per-version paths
+    return String.format("/%s/jobModels", pathPrefix);
+  }
+
+  public String getJobModelPath(String jobModelVersion) { // "/<pathPrefix>/jobModels/<jobModelVersion>"
+    return String.format("%s/%s", getJobModelPathPrefix(), jobModelVersion);
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index 8cdf8fc..b9bdf11 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -50,25 +50,30 @@ public class ZkLeaderElector implements LeaderElector {
   private final String hostName;
 
   private AtomicBoolean isLeader = new AtomicBoolean(false);
-  private final IZkDataListener zkLeaderElectionListener;
+  private final IZkDataListener previousProcessorChangeListener;
+  ZkLeaderElectorListener zkLeaderElectorListener;
   private String currentSubscription = null;
   private final Random random = new Random();
 
   @VisibleForTesting
-  ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, IZkDataListener leaderElectionListener) {
+  ZkLeaderElector(String processorIdStr,
+      ZkUtils zkUtils,
+      ZkLeaderElectorListener zkLeaderElectorListener,
+      IZkDataListener previousProcessorChangeListener) {
     this.processorIdStr = processorIdStr;
     this.zkUtils = zkUtils;
-    this.zkLeaderElectionListener = leaderElectionListener;
     this.keyBuilder = this.zkUtils.getKeyBuilder();
     this.hostName = getHostName();
+    this.zkLeaderElectorListener = zkLeaderElectorListener; // listener to inform the caller that they have become the leader
+    if (previousProcessorChangeListener == null)
+      this.previousProcessorChangeListener =  new PreviousProcessorChangeListener();
+    else
+      this.previousProcessorChangeListener = previousProcessorChangeListener;
   }
 
-  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils) {
-    this.zkLeaderElectionListener = new ZkLeaderElectionListener();
-    this.processorIdStr = processorIdStr;
-    this.zkUtils = zkUtils;
-    this.keyBuilder = this.zkUtils.getKeyBuilder();
-    this.hostName = getHostName();
+  public ZkLeaderElector(String processorIdStr, ZkUtils zkUtils, ZkLeaderElectorListener zkLeaderElectorListener) {
+    this(processorIdStr, zkUtils, zkLeaderElectorListener, null);
+
   }
 
   // TODO: This should go away once we integrate with Zk based Job Coordinator
@@ -81,6 +86,10 @@ public class ZkLeaderElector implements LeaderElector {
     }
   }
 
+  public interface ZkLeaderElectorListener { // callback used to tell the caller it has become the leader
+    void onBecomingLeader(); // invoked by tryBecomeLeader() once this processor is found eligible to lead
+  }
+
   @Override
   public boolean tryBecomeLeader() {
     String currentPath = zkUtils.registerProcessorAndGetId(hostName);
@@ -96,6 +105,7 @@ public class ZkLeaderElector implements LeaderElector {
     if (index == 0) {
       isLeader.getAndSet(true);
       LOGGER.info(zLog("Eligible to become the leader!"));
+      zkLeaderElectorListener.onBecomingLeader(); // inform the caller
       return true;
     }
 
@@ -105,11 +115,13 @@ public class ZkLeaderElector implements LeaderElector {
     if (!predecessor.equals(currentSubscription)) {
       if (currentSubscription != null) {
         LOGGER.debug(zLog("Unsubscribing data change for " + currentSubscription));
-        zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
+        zkUtils.unsubscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
+            previousProcessorChangeListener);
       }
       currentSubscription = predecessor;
       LOGGER.info(zLog("Subscribing data change for " + predecessor));
-      zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription, zkLeaderElectionListener);
+      zkUtils.subscribeDataChanges(keyBuilder.getProcessorsPath() + "/" + currentSubscription,
+          previousProcessorChangeListener);
     }
     /**
      * Verify that the predecessor still exists. This step is needed because the ZkClient subscribes for data changes
@@ -146,7 +158,7 @@ public class ZkLeaderElector implements LeaderElector {
   }
 
   // Only by non-leaders
-  class ZkLeaderElectionListener implements IZkDataListener {
+  class PreviousProcessorChangeListener implements IZkDataListener {
 
     @Override
     public void handleDataChange(String dataPath, Object data) throws Exception {

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index d0a269d..b11e02f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.zk;
 
+import org.I0Itec.zkclient.IZkChildListener;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
@@ -58,6 +59,7 @@ public class ZkUtils {
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
   private final int connectionTimeoutMs;
+  private final String processorId = "TO BE PASSED IN THE CONSTRUCTOR"; //TODO
 
   public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
     this.keyBuilder = zkKeyBuilder;
@@ -143,4 +145,51 @@ public class ZkUtils {
   public void close() throws ZkInterruptedException {
     zkClient.close();
   }
+
+  /**
+   * Subscribes the given listener to data changes on the JobModel version path.
+   * @param dataListener listener to register with the ZK client for the JobModel version path
+   */
+  public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
+    LOG.info("pid=" + processorId + " subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
+    zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
+  }
+
+  /**
+   * Reads the data stored at the JobModel version path in ZK.
+   * @return the JobModel version as a string
+   */
+  public String getJobModelVersion() {
+    return zkClient.<String>readData(keyBuilder.getJobModelVersionPath());
+  }
+
+  /**
+   * Ensures that the given paths exist in ZK, creating each missing one as a persistent node.
+   * @param paths ZK paths to check; a path that does not exist is created via createPersistent(path, true)
+   */
+  public void makeSurePersistentPathsExists(String[] paths) {
+    for (String path : paths) {
+      if (!zkClient.exists(path)) {
+        zkClient.createPersistent(path, true);
+      }
+    }
+  }
+
+  /**
+   * Subscribes the given listener to child changes on the processors path in ZK.
+   * @param listener child listener to register with the ZK client for the processors path
+   */
+  public void subscribeToProcessorChange(IZkChildListener listener) {
+    LOG.info("pid=" + processorId + " subscribing for child change at:" + keyBuilder.getProcessorsPath());
+    zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
+  }
+
+  public void deleteRoot() { // recursively deletes the key builder's root path, if it exists
+    String rootPath = keyBuilder.getRootPath();
+    if (rootPath != null && !rootPath.isEmpty() && zkClient.exists(rootPath)) {
+      LOG.info("pid=" + processorId + " Deleting root: " + rootPath);
+      zkClient.deleteRecursive(rootPath);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index e04f7c9..8e048b2 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -45,8 +45,8 @@ public class TestZkKeyBuilder {
   @Test
   public void testParseIdFromPath() {
     Assert.assertEquals(
-        ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1",
-        ZkKeyBuilder.parseIdFromPath("/test/processors/" + ZkKeyBuilder.PROCESSOR_ID_PREFIX + "1"));
+        "1",
+        ZkKeyBuilder.parseIdFromPath("/test/processors/" + "1"));
     Assert.assertNull(ZkKeyBuilder.parseIdFromPath(null));
     Assert.assertNull(ZkKeyBuilder.parseIdFromPath(""));
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index b999ec5..6342fde 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -70,9 +70,13 @@ public class TestZkLeaderElector {
     }
   }
 
+  public static class BooleanResult { // mutable flag holder that anonymous listeners in these tests can set
+    public boolean res = false; // set to true by a listener's onBecomingLeader()
+  }
 
   @After
   public void testTeardown() {
+    testZkUtils.deleteRoot();
     testZkUtils.close();
   }
 
@@ -94,8 +98,17 @@ public class TestZkLeaderElector {
         thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
     when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors);
 
-    ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils);
-    Assert.assertTrue(leaderElector.tryBecomeLeader());
+    BooleanResult isLeader = new BooleanResult();
+    ZkLeaderElector leaderElector = new ZkLeaderElector("1", mockZkUtils,
+      new ZkLeaderElector.ZkLeaderElectorListener() {
+        @Override
+        public void onBecomingLeader() {
+          isLeader.res = true;
+        }
+      }
+    );
+    leaderElector.tryBecomeLeader();
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader.res, 2, 100));
   }
 
   @Test
@@ -104,7 +117,13 @@ public class TestZkLeaderElector {
     ZkUtils mockZkUtils = mock(ZkUtils.class);
     when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>());
 
-    ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils);
+    ZkLeaderElector leaderElector = new ZkLeaderElector(processorId, mockZkUtils,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+          }
+        }
+    );
     try {
       leaderElector.tryBecomeLeader();
       Assert.fail("Was expecting leader election to fail!");
@@ -118,29 +137,50 @@ public class TestZkLeaderElector {
    */
   @Test
   public void testLeaderElection() {
+    BooleanResult isLeader1 = new BooleanResult();
+    BooleanResult isLeader2 = new BooleanResult();
+    BooleanResult isLeader3 = new BooleanResult();
     // Processor-1
     ZkUtils zkUtils1 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector1 = new ZkLeaderElector(
-        "1",
-        zkUtils1);
+    ZkLeaderElector leaderElector1 = new ZkLeaderElector("1", zkUtils1,
+      new ZkLeaderElector.ZkLeaderElectorListener() {
+        @Override
+        public void onBecomingLeader() {
+          isLeader1.res = true;
+        }
+      }
+    );
 
     // Processor-2
     ZkUtils zkUtils2 = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector2 = new ZkLeaderElector(
-        "2",
-        zkUtils2);
+    ZkLeaderElector leaderElector2 = new ZkLeaderElector("2", zkUtils2,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader2.res = true;
+          }
+        }
+    );
 
     // Processor-3
     ZkUtils zkUtils3  = getZkUtilsWithNewClient();
-    ZkLeaderElector leaderElector3 = new ZkLeaderElector(
-        "3",
-        zkUtils3);
+    ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader3.res = true;
+          }
+        });
 
     Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size());
 
-    Assert.assertTrue(leaderElector1.tryBecomeLeader());
-    Assert.assertFalse(leaderElector2.tryBecomeLeader());
-    Assert.assertFalse(leaderElector3.tryBecomeLeader());
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+    leaderElector3.tryBecomeLeader();
+
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
 
     Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size());
 
@@ -166,16 +206,26 @@ public class TestZkLeaderElector {
     final CountDownLatch electionLatch = new CountDownLatch(1);
     final AtomicInteger count = new AtomicInteger(0);
 
+    BooleanResult isLeader1 = new BooleanResult();
+    BooleanResult isLeader2 = new BooleanResult();
+    BooleanResult isLeader3 = new BooleanResult();
+
+
     // Processor-1
     ZkUtils zkUtils1 = getZkUtilsWithNewClient();
     zkUtils1.registerProcessorAndGetId("processor1");
     ZkLeaderElector leaderElector1 = new ZkLeaderElector(
         "1",
         zkUtils1,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader1.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
-
           }
 
           @Override
@@ -191,6 +241,12 @@ public class TestZkLeaderElector {
     ZkLeaderElector leaderElector2 = new ZkLeaderElector(
         "2",
         zkUtils2,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader2.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -223,6 +279,12 @@ public class TestZkLeaderElector {
     ZkLeaderElector leaderElector3 = new ZkLeaderElector(
         "3",
         zkUtils3,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader3.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -236,9 +298,12 @@ public class TestZkLeaderElector {
         });
 
     // Join Leader Election
-    Assert.assertTrue(leaderElector1.tryBecomeLeader());
-    Assert.assertFalse(leaderElector2.tryBecomeLeader());
-    Assert.assertFalse(leaderElector3.tryBecomeLeader());
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+    leaderElector3.tryBecomeLeader();
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
 
     Assert.assertTrue(leaderElector1.amILeader());
     Assert.assertFalse(leaderElector2.amILeader());
@@ -278,12 +343,22 @@ public class TestZkLeaderElector {
     final CountDownLatch electionLatch = new CountDownLatch(1);
     final AtomicInteger count = new AtomicInteger(0);
 
+    BooleanResult isLeader1 = new BooleanResult();
+    BooleanResult isLeader2 = new BooleanResult();
+    BooleanResult isLeader3 = new BooleanResult();
+
     // Processor-1
     ZkUtils zkUtils1 = getZkUtilsWithNewClient();
     zkUtils1.registerProcessorAndGetId("processor1");
     ZkLeaderElector leaderElector1 = new ZkLeaderElector(
         "1",
         zkUtils1,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader1.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -302,6 +377,12 @@ public class TestZkLeaderElector {
     ZkLeaderElector leaderElector2 = new ZkLeaderElector(
         "2",
         zkUtils2,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader2.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -320,6 +401,12 @@ public class TestZkLeaderElector {
     ZkLeaderElector leaderElector3 = new ZkLeaderElector(
         "3",
         zkUtils3,
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader3.res = true;
+          }
+        },
         new IZkDataListener() {
           @Override
           public void handleDataChange(String dataPath, Object data) throws Exception {
@@ -347,9 +434,12 @@ public class TestZkLeaderElector {
         });
 
     // Join Leader Election
-    Assert.assertTrue(leaderElector1.tryBecomeLeader());
-    Assert.assertFalse(leaderElector2.tryBecomeLeader());
-    Assert.assertFalse(leaderElector3.tryBecomeLeader());
+    leaderElector1.tryBecomeLeader();
+    leaderElector2.tryBecomeLeader();
+    leaderElector3.tryBecomeLeader();
+    Assert.assertTrue(TestZkUtils.testWithDelayBackOff(()->isLeader1.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader2.res, 2, 100));
+    Assert.assertFalse(TestZkUtils.testWithDelayBackOff(()->isLeader3.res, 2, 100));
 
     List<String> currentActiveProcessors = testZkUtils.getSortedActiveProcessors();
     Assert.assertEquals(3, currentActiveProcessors.size());
@@ -373,15 +463,29 @@ public class TestZkLeaderElector {
 
   @Test
   public void testAmILeader() {
+    BooleanResult isLeader1 = new BooleanResult();
+    BooleanResult isLeader2 = new BooleanResult();
     // Processor-1
     ZkLeaderElector leaderElector1 = new ZkLeaderElector(
         "1",
-        getZkUtilsWithNewClient());
+        getZkUtilsWithNewClient(),
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader1.res = true;
+          }
+        });
 
     // Processor-2
     ZkLeaderElector leaderElector2 = new ZkLeaderElector(
         "2",
-        getZkUtilsWithNewClient());
+        getZkUtilsWithNewClient(),
+        new ZkLeaderElector.ZkLeaderElectorListener() {
+          @Override
+          public void onBecomingLeader() {
+            isLeader2.res = true;
+          }
+        });
 
     // Before Leader Election
     Assert.assertFalse(leaderElector1.amILeader());

http://git-wip-us.apache.org/repos/asf/samza/blob/f1bc1d0b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 855d29d..b719e28 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,6 +18,8 @@
  */
 package org.apache.samza.zk;
 
+import java.util.function.BooleanSupplier;
+import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
@@ -32,10 +34,10 @@ import org.junit.Test;
 public class TestZkUtils {
   private static EmbeddedZookeeper zkServer = null;
   private static final ZkKeyBuilder KEY_BUILDER = new ZkKeyBuilder("test");
-  private ZkConnection zkConnection = null;
   private ZkClient zkClient = null;
   private static final int SESSION_TIMEOUT_MS = 20000;
   private static final int CONNECTION_TIMEOUT_MS = 10000;
+  private ZkUtils zkUtils;
 
   @BeforeClass
   public static void setup() throws InterruptedException {
@@ -57,11 +59,21 @@ public class TestZkUtils {
     } catch (ZkNodeExistsException e) {
       // Do nothing
     }
+
+
+    zkUtils = new ZkUtils(
+        KEY_BUILDER,
+        zkClient,
+        SESSION_TIMEOUT_MS);
+
+    zkUtils.connect();
+
   }
 
 
   @After
   public void testTeardown() {
+    zkUtils.close();
     zkClient.close();
   }
 
@@ -72,34 +84,91 @@ public class TestZkUtils {
 
   @Test
   public void testRegisterProcessorId() {
-    ZkUtils utils = new ZkUtils(
-        KEY_BUILDER,
-        zkClient,
-        SESSION_TIMEOUT_MS);
-    utils.connect();
-    String assignedPath = utils.registerProcessorAndGetId("0.0.0.0");
+    String assignedPath = zkUtils.registerProcessorAndGetId("0.0.0.0");
     Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath()));
 
     // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
-    Assert.assertTrue(utils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath));
+    Assert.assertTrue(zkUtils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath));
 
-    utils.close();
   }
 
   @Test
   public void testGetActiveProcessors() {
-    ZkUtils utils = new ZkUtils(
-        KEY_BUILDER,
-        zkClient,
-        SESSION_TIMEOUT_MS);
-    utils.connect();
+    Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size());
+    zkUtils.registerProcessorAndGetId("processorData");
 
-    Assert.assertEquals(0, utils.getSortedActiveProcessors().size());
-    utils.registerProcessorAndGetId("processorData");
+    Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size());
 
-    Assert.assertEquals(1, utils.getSortedActiveProcessors().size());
+  }
 
-    utils.close();
+  @Test
+  public void testSubscribeToJobModelVersionChange() {
+    // A data listener subscribed through zkClient must see writes to the version and processors paths.
+    ZkKeyBuilder keyBuilder = new ZkKeyBuilder("test");
+    String root = keyBuilder.getRootPath();
+    zkClient.deleteRecursive(root);
+    // Holder for the most recent value handed to the listener; the test polls it through getRes().
+    class Result {
+      String res = ""; // NOTE(review): plain field; if listeners run on another thread, confirm visibility
+      public String getRes() {
+        return res;
+      }
+      public void updateRes(String newRes) {
+        res = newRes;
+      }
+    }
+    // The recursive delete above is expected to have removed the root, so it must not exist here.
+    Assert.assertFalse(zkUtils.exists(root));
+
+    // create the root, job model version and processors paths, then check that each one exists
+    zkUtils.makeSurePersistentPathsExists(
+        new String[]{root, keyBuilder.getJobModelVersionPath(), keyBuilder.getProcessorsPath()});
+    Assert.assertTrue(zkUtils.exists(root));
+    Assert.assertTrue(zkUtils.exists(keyBuilder.getJobModelVersionPath()));
+    Assert.assertTrue(zkUtils.exists(keyBuilder.getProcessorsPath()));
+
+    final Result res = new Result();
+    // define the callback: store each changed value in res; a deletion is not expected in this test
+    IZkDataListener dataListener = new IZkDataListener() {
+      @Override
+      public void handleDataChange(String dataPath, Object data)
+          throws Exception {
+        res.updateRes((String) data); // the test only writes String payloads to these paths
+      }
+
+      @Override
+      public void handleDataDeleted(String dataPath)
+          throws Exception {
+        Assert.fail("Data wasn't deleted;"); // NOTE(review): may run off the test thread; confirm it is reported
+      }
+    };
+    // subscribe the same listener to both paths, so a write to either one updates res
+    zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
+    zkClient.subscribeDataChanges(keyBuilder.getProcessorsPath(), dataListener);
+    // update the job model version path
+    zkClient.writeData(keyBuilder.getJobModelVersionPath(), "newVersion");
+
+    // verify: poll with back-off until the listener has recorded the new value
+    Assert.assertTrue(testWithDelayBackOff(() -> "newVersion".equals(res.getRes()), 2, 1000));
+
+    // update the processors path; the shared listener should now record this value instead
+    zkClient.writeData(keyBuilder.getProcessorsPath(), "newProcessor");
+
+    Assert.assertTrue(testWithDelayBackOff(() -> "newProcessor".equals(res.getRes()), 2, 1000));
   }
 
+  /** Polls cond, sleeping startDelayMs and doubling the delay until it reaches maxDelayMs. */
+  public static boolean testWithDelayBackOff(BooleanSupplier cond, long startDelayMs, long maxDelayMs) {
+    boolean met = cond.getAsBoolean();
+    for (long delay = Math.max(1, startDelayMs); !met && delay < maxDelayMs; delay *= 2) { // 0 would never grow
+      try {
+        Thread.sleep(delay);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
+        return false;
+      }
+      met = cond.getAsBoolean(); // re-check after every sleep, including the last one
+    }
+    return met;
+  }
 }