You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/04/27 21:37:18 UTC

samza git commit: SAMZA-1210: Fixing merge issue and container id generations.

Repository: samza
Updated Branches:
  refs/heads/master a2220ed23 -> 05060edfe


SAMZA-1210: Fixing merge issue and container id generations.

This PR is just for fixing issues introduced by a merge and changing container id generation.

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #122 from sborya/mergeFixes


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/05060edf
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/05060edf
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/05060edf

Branch: refs/heads/master
Commit: 05060edfe18a47075dfe80059b0e83102063f587
Parents: a2220ed
Author: Boris Shkolnik <bo...@apache.org>
Authored: Thu Apr 27 14:37:08 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Thu Apr 27 14:37:08 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/config/ApplicationConfig.java  |  8 +++
 .../samza/runtime/LocalApplicationRunner.java   |  3 +-
 .../java/org/apache/samza/zk/ProcessorData.java | 54 +++++++++++++++++
 .../org/apache/samza/zk/ZkControllerImpl.java   | 14 +++--
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 61 +++++++++++--------
 .../samza/zk/ZkJobCoordinatorFactory.java       | 17 +-----
 .../java/org/apache/samza/zk/ZkKeyBuilder.java  | 16 +++--
 .../org/apache/samza/zk/ZkLeaderElector.java    |  4 +-
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 62 ++++++++++++++++----
 .../org/apache/samza/zk/TestZkKeyBuilder.java   |  9 +--
 .../apache/samza/zk/TestZkLeaderElector.java    | 30 +++++-----
 .../java/org/apache/samza/zk/TestZkUtils.java   | 30 ++++++++--
 12 files changed, 218 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
index 9eb4161..ea0f999 100644
--- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java
@@ -72,6 +72,14 @@ public class ApplicationConfig extends MapConfig {
     return get(APP_CLASS, null);
   }
 
+  /**
+   * Returns the full application id, built as "app-" + app name + "-" + app id.
+   * @return full app id
+   */
+  public String getGlobalAppId() {
+    return String.format("app-%s-%s", getAppName(), getAppId());
+  }
+
   @Deprecated
   public String getProcessorId() {
     return get(PROCESSOR_ID, null);

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index adc76d5..5e83c3c 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -161,8 +161,7 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
     String clazz = appConfig.getCoordinationServiceFactoryClass();
     if (clazz != null) {
       CoordinationServiceFactory factory = ClassLoaderHelper.fromClassName(clazz);
-      String groupId = String.format("app-%s-%s", appConfig.getAppName(), appConfig.getAppId());
-      return factory.getCoordinationService(groupId, uid, config);
+      return factory.getCoordinationService(appConfig.getGlobalAppId(), uid, config);
     } else {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
new file mode 100644
index 0000000..3f4fd0b
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ProcessorData.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+import org.apache.samza.SamzaException;
+
+/** Host name and processor id of one processor, with a "host processorId" string form (see toString()). */
+public class ProcessorData {
+  private final String processorId;
+  private final String host;
+
+  public ProcessorData(String host, String processorId) {
+    this.processorId = processorId;
+    this.host = host;
+  }
+
+  public ProcessorData(String data) {  // parses the string form produced by toString()
+    String[] splt = data.split(" ");
+    if (splt.length != 2) {  // must split into exactly two parts: host first, then processor id
+      throw new SamzaException("incorrect processor data format = " + data);
+    }
+    host = splt[0];
+    processorId = splt[1];
+  }
+
+  public String toString() {  // "<host> <processorId>", the layout the String constructor parses
+    return host + " " + processorId;
+  }
+
+  public String getHost() {
+    return host;
+  }
+
+  public String getProcessorId() {
+    return processorId;
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
index 4570a62..61f7876 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkControllerImpl.java
@@ -52,8 +52,10 @@ public class ZkControllerImpl implements ZkController {
   private void init() {
     ZkKeyBuilder keyBuilder = zkUtils.getKeyBuilder();
     zkUtils.makeSurePersistentPathsExists(
-        new String[]{keyBuilder.getProcessorsPath(), keyBuilder.getJobModelVersionPath(), keyBuilder
-            .getJobModelPathPrefix()});
+        new String[]{
+            keyBuilder.getProcessorsPath(),
+            keyBuilder.getJobModelVersionPath(),
+            keyBuilder.getJobModelPathPrefix()});
   }
 
   private void onBecomeLeader() {
@@ -114,16 +116,16 @@ public class ZkControllerImpl implements ZkController {
      * Called when the children of the given path changed.
      *
      * @param parentPath    The parent path
-     * @param currentChilds The children or null if the root node (parent path) was deleted.
+     * @param currentChildren The children or null if the root node (parent path) was deleted.
      * @throws Exception
      */
     @Override
-    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+    public void handleChildChange(String parentPath, List<String> currentChildren) throws Exception {
       LOG.info(
           "ZkControllerImpl::ZkProcessorChangeHandler::handleChildChange - Path: " + parentPath + "  Current Children: "
-              + currentChilds);
+              + currentChildren);
       debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
-          ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChilds));
+          ScheduleAfterDebounceTime.DEBOUNCE_TIME_MS, () -> zkControllerListener.onProcessorChange(currentChildren));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index 884c471..2535654 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -25,9 +25,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JavaSystemConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.coordinator.BarrierForVersionUpgrade;
 import org.apache.samza.coordinator.CoordinationServiceFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.JobCoordinator;
@@ -47,7 +49,7 @@ import org.slf4j.LoggerFactory;
  */
 public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private static final Logger log = LoggerFactory.getLogger(ZkJobCoordinator.class);
-  private static final String JOB_MODEL_VERSION_BARRIER = "JobModelVersion";
+  private static final String JOB_MODEL_UPGRADE_BARRIER = "jobModelUpgradeBarrier";
 
   private final ZkUtils zkUtils;
   private final String processorId;
@@ -61,23 +63,24 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   private final CoordinationUtils coordinationUtils;
 
   private JobModel newJobModel;
-  private String newJobModelVersion;  // version published in ZK (by the leader)
   private JobModel jobModel;
 
-  public ZkJobCoordinator(String processorId, String groupId, Config config, ScheduleAfterDebounceTime debounceTimer, ZkUtils zkUtils,
+  public ZkJobCoordinator(String processorId, Config config, ScheduleAfterDebounceTime debounceTimer,
                           SamzaContainerController containerController) {
-    this.processorId = processorId;
-    this.zkUtils = zkUtils;
-    this.keyBuilder = zkUtils.getKeyBuilder();
     this.debounceTimer = debounceTimer;
     this.containerController = containerController;
-    this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this);
     this.config = config;
+    this.processorId = processorId;
+
     this.coordinationUtils = Util.
         <CoordinationServiceFactory>getObj(
             new JobCoordinatorConfig(config)
                 .getJobCoordinationServiceFactoryClassName())
-        .getCoordinationService(groupId, String.valueOf(processorId), config);
+        .getCoordinationService(new ApplicationConfig(config).getGlobalAppId(), String.valueOf(processorId), config);
+
+    this.zkUtils = ((ZkCoordinationUtils) coordinationUtils).getZkUtils();
+    this.keyBuilder = zkUtils.getKeyBuilder();
+    this.zkController = new ZkControllerImpl(processorId, zkUtils, debounceTimer, this);
 
     streamMetadataCache = getStreamMetadataCache();
   }
@@ -141,28 +144,27 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   }
 
   @Override
-  public void onProcessorChange(List<String> processorIds) {
-    log.info("ZkJobCoordinator::onProcessorChange - Processors changed! List: " + Arrays.toString(processorIds.toArray()));
-    generateNewJobModel();
+  public void onProcessorChange(List<String> processors) {
+    log.info("ZkJobCoordinator::onProcessorChange - list of processors changed! List size=" + processors.size());
+    // if list of processors is empty - it means we are called from 'onBecomeLeader'
+    generateNewJobModel(processors);
   }
 
   @Override
   public void onNewJobModelAvailable(final String version) {
-    newJobModelVersion = version;
     log.info("pid=" + processorId + "new JobModel available");
     // stop current work
     containerController.stopContainer();
     log.info("pid=" + processorId + "new JobModel available.Container stopped.");
     // get the new job model
     newJobModel = zkUtils.getJobModel(version);
-    log.info("pid=" + processorId + "new JobModel available. ver=" + version + "; jm = " + newJobModel);
 
-    String currentPath = zkUtils.getEphemeralPath();
-    String zkProcessorId = keyBuilder.parseIdFromPath(currentPath);
+    log.info("pid=" + processorId + ": new JobModel available. ver=" + version + "; jm = " + newJobModel);
 
     // update ZK and wait for all the processors to get this new version
-    ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_VERSION_BARRIER);
-    barrier.waitForBarrier(version, String.valueOf(zkProcessorId), new Runnable() {
+    ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(
+        JOB_MODEL_UPGRADE_BARRIER);
+    barrier.waitForBarrier(version, processorId, new Runnable() {
       @Override
       public void run() {
         onNewJobModelConfirmed(version);
@@ -185,9 +187,16 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   /**
    * Generate new JobModel when becoming a leader or the list of processor changed.
    */
-  private void generateNewJobModel() {
-    // get the current list of processors
-    List<String> currentProcessors = zkUtils.getSortedActiveProcessors();
+  private void generateNewJobModel(List<String> processors) {
+    List<String> currentProcessorsIds;
+    if (processors.size() > 0) {
+      // we should use this list
+      // but it needs to be converted into PIDs, which is part of the data
+      currentProcessorsIds = zkUtils.getActiveProcessorsIDs(processors);
+    } else {
+      // get the current list of processors
+      currentProcessorsIds = zkUtils.getSortedActiveProcessorsIDs();
+    }
 
     // get the current version
     String currentJMVersion  = zkUtils.getJobModelVersion();
@@ -200,10 +209,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     }
     log.info("pid=" + processorId + "generating new model. Version = " + nextJMVersion);
 
-    List<String> containerIds = new ArrayList<>();
-    for (String processor : currentProcessors) {
-      String zkProcessorId = ZkKeyBuilder.parseIdFromPath(processor);
-      containerIds.add(zkProcessorId);
+    List<String> containerIds = new ArrayList<>(currentProcessorsIds.size());
+    for (String processorPid : currentProcessorsIds) {
+      containerIds.add(processorPid);
     }
     log.info("generate new job model: processorsIds: " + Arrays.toString(containerIds.toArray()));
 
@@ -217,8 +225,9 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     log.info("pid=" + processorId + "published new JobModel ver=" + nextJMVersion + ";jm=" + jobModel);
 
     // start the barrier for the job model update
-    ZkBarrierForVersionUpgrade barrier = (ZkBarrierForVersionUpgrade) coordinationUtils.getBarrier(JOB_MODEL_VERSION_BARRIER);
-    barrier.start(nextJMVersion, currentProcessors);
+    BarrierForVersionUpgrade barrier = coordinationUtils.getBarrier(
+        JOB_MODEL_UPGRADE_BARRIER);
+    barrier.start(nextJMVersion, currentProcessorsIds);
 
     // publish new JobModel version
     zkUtils.publishJobModelVersion(currentJMVersion, nextJMVersion);

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index fb63236..a44565c 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -19,10 +19,7 @@
 
 package org.apache.samza.zk;
 
-import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.Config;
-import org.apache.samza.config.JobConfig;
-import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.processor.SamzaContainerController;
@@ -31,27 +28,19 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
   /**
    * Method to instantiate an implementation of JobCoordinator
    *
-   * @param config  Configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+   * @param processorId - id of this processor
+   * @param config - configs relevant for the JobCoordinator TODO: Separate JC related configs into a "JobCoordinatorConfig"
+   * @param containerController - controller to allow JobCoordinator control the SamzaContainer.
    * @return An instance of IJobCoordinator
    */
   @Override
   public JobCoordinator getJobCoordinator(String processorId, Config config, SamzaContainerController containerController) {
-    JobConfig jobConfig = new JobConfig(config);
-    String groupName = String.format("%s-%s", jobConfig.getName().get(), jobConfig.getJobId().get());
-    ZkConfig zkConfig = new ZkConfig(config);
     ScheduleAfterDebounceTime debounceTimer = new ScheduleAfterDebounceTime();
-    ZkClient zkClient = new ZkClient(zkConfig.getZkConnect(), zkConfig.getZkSessionTimeoutMs(), zkConfig.getZkConnectionTimeoutMs());
 
     return new ZkJobCoordinator(
         processorId,
-        groupName,
         config,
         debounceTimer,
-        new ZkUtils(
-            new ZkKeyBuilder(groupName),
-            zkClient,
-            zkConfig.getZkConnectionTimeoutMs()
-            ),
         containerController);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
index 44f83e4..7452a97 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkKeyBuilder.java
@@ -26,7 +26,13 @@ import com.google.common.base.Strings;
  * The following ZK hierarchy is maintained for Standalone jobs:
  * <pre>
  *   - /
- *      |- jobName-jobId/
+ *      |- groupId/
+ *          |- JobModelGeneration/
+ *              |- jobModelVersion (data contains the version)
+ *              |- jobModelUpgradeBarrier/ (contains barrier related data)
+ *              |- jobModels/
+ *                 |- 1 (contains job model version 1 as data)
+ *                 |- 2
  *          |- processors/
  *              |- 00000001
  *              |- 00000002
@@ -44,7 +50,7 @@ public class ZkKeyBuilder {
   private final String pathPrefix;
 
   static final String PROCESSORS_PATH = "processors";
-  public static final String JOBMODEL_VERSION_PATH = "jobModelVersion";
+  static final String JOBMODEL_GENERATION_PATH = "JobModelGeneration";
 
   public ZkKeyBuilder(String pathPrefix) {
     if (Strings.isNullOrEmpty(pathPrefix)) {
@@ -77,11 +83,11 @@ public class ZkKeyBuilder {
   }
 
   public String getJobModelVersionPath() {
-    return String.format("/%s/%s", pathPrefix, JOBMODEL_VERSION_PATH);
+    return String.format("%s/%s/jobModelVersion", getRootPath(), JOBMODEL_GENERATION_PATH);
   }
 
   public String getJobModelPathPrefix() {
-    return String.format("/%s/jobModels", pathPrefix);
+    return String.format("%s/%s/jobModels", getRootPath(), JOBMODEL_GENERATION_PATH);
   }
 
   public String getJobModelPath(String jobModelVersion) {
@@ -89,6 +95,6 @@ public class ZkKeyBuilder {
   }
 
   public String getJobModelVersionBarrierPrefix(String barrierId) {
-    return String.format("/%s/%s/versionBarriers", pathPrefix, barrierId);
+    return String.format("%s/%s/%s/versionBarriers", getRootPath(), JOBMODEL_GENERATION_PATH, barrierId);
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
index c2a8901..644864a 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkLeaderElector.java
@@ -83,9 +83,9 @@ public class ZkLeaderElector implements LeaderElector {
 
   @Override
   public void tryBecomeLeader(LeaderElectorListener leaderElectorListener) {
-    String currentPath = zkUtils.registerProcessorAndGetId(hostName + " " + processorIdStr);
+    String currentPath = zkUtils.registerProcessorAndGetId(new ProcessorData(hostName, processorIdStr));
 
-    List<String> children = zkUtils.getSortedActiveProcessors();
+    List<String> children = zkUtils.getSortedActiveProcessorsZnodes();
     LOG.debug(zLog("Current active processors - " + children));
     int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(currentPath));
 

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index d77aab2..fee8405 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -20,6 +20,7 @@
 package org.apache.samza.zk;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -103,12 +104,11 @@ public class ZkUtils {
    * @param data Object that should be written as data in the registered ephemeral ZK node
    * @return String representing the absolute ephemeralPath of this client in the current session
    */
-  public synchronized String registerProcessorAndGetId(final Object data) {
+  public synchronized String registerProcessorAndGetId(final ProcessorData data) {
     if (ephemeralPath == null) {
-      // TODO: Data should be more than just the hostname. Use Json serialized data
       ephemeralPath =
           zkClient.createEphemeralSequential(
-              keyBuilder.getProcessorsPath() + "/", data);
+              keyBuilder.getProcessorsPath() + "/", data.toString());
 
       LOG.info("newly generated path for " + data +  " is " +  ephemeralPath);
       return ephemeralPath;
@@ -123,17 +123,59 @@ public class ZkUtils {
   }
 
   /**
-   * Method is used to get the <i>sorted</i> list of currently active/registered processors
+   * Method is used to get the <i>sorted</i> list of currently active/registered processors (znodes)
    *
    * @return List of absolute ZK node paths
    */
-  public List<String> getSortedActiveProcessors() {
-    List<String> children = zkClient.getChildren(keyBuilder.getProcessorsPath());
-    if (children.size() > 0) {
-      Collections.sort(children);
-      LOG.info("Found these children - " + children);
+  public List<String> getSortedActiveProcessorsZnodes() {
+    List<String> znodeIds = zkClient.getChildren(keyBuilder.getProcessorsPath());
+    if (znodeIds.size() > 0) {
+      Collections.sort(znodeIds);
+      LOG.info("Found these children - " + znodeIds);
     }
-    return children;
+    return znodeIds;
+  }
+
+  /**
+   * Method is used to read processor's data from the znode
+   * @param fullPath absolute path to the znode
+   * @return processor's data, never null (a SamzaException naming the path is thrown when the read yields null)
+   */
+  String readProcessorData(String fullPath) {
+    String data = zkClient.<String>readData(fullPath, true);
+    if (data == null) {
+      throw new SamzaException(String.format("Cannot read ZK node: %s", fullPath));
+    }
+    return data;
+  }
+
+  /**
+   * Method is used to get the list of currently active/registered processor ids
+   * @return List of processorIds, in the sorted order of their znodes (the ids themselves are not sorted)
+   */
+  public List<String> getSortedActiveProcessorsIDs() {
+    return getActiveProcessorsIDs(getSortedActiveProcessorsZnodes());
+  }
+
+  /**
+   * Method is used to get the list of processors ids for a given list of znodes, in the order of that list
+   * @param znodeIds - list of relative paths of the children's znodes
+   * @return List of processor ids for a given list of znodes
+   */
+  public List<String> getActiveProcessorsIDs(List<String> znodeIds) {
+    String processorPath = keyBuilder.getProcessorsPath();
+    List<String> processorIds = new ArrayList<>(znodeIds.size());
+    if (znodeIds.size() > 0) {
+      // each znode stores ProcessorData.toString(), i.e. "<host> <processorId>"; keep only the id part
+      for (String child : znodeIds) {
+        String fullPath = String.format("%s/%s", processorPath, child);
+        processorIds.add(new ProcessorData(readProcessorData(fullPath)).getProcessorId());
+      }
+
+      LOG.info("Found these children - " + znodeIds);
+      LOG.info("Found these processorIds - " + processorIds);
+    }
+    return processorIds;
   }
 
   /* Wrapper for standard I0Itec methods */

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
index ef271f0..3134cfa 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkKeyBuilder.java
@@ -56,10 +56,11 @@ public class TestZkKeyBuilder {
 
     ZkKeyBuilder builder = new ZkKeyBuilder("test");
 
-    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_VERSION_PATH, builder.getJobModelVersionPath());
-    Assert.assertEquals("/test/jobModels", builder.getJobModelPathPrefix());
+    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModelVersion", builder.getJobModelVersionPath());
+    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels", builder.getJobModelPathPrefix());
     String version = "2";
-    Assert.assertEquals("/test/jobModels/" + version, builder.getJobModelPath(version));
-    Assert.assertEquals("/test/testBarrier/versionBarriers", builder.getJobModelVersionBarrierPrefix("testBarrier"));
+    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/jobModels/" + version, builder.getJobModelPath(version));
+    Assert.assertEquals("/test/" + ZkKeyBuilder.JOBMODEL_GENERATION_PATH + "/testBarrier/versionBarriers", builder.getJobModelVersionBarrierPrefix(
+        "testBarrier"));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index c2a5a71..5aaee2a 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -100,7 +100,7 @@ public class TestZkLeaderElector {
     ZkUtils mockZkUtils = mock(ZkUtils.class);
     when(mockZkUtils.registerProcessorAndGetId(any())).
         thenReturn(KEY_BUILDER.getProcessorsPath() + "/0000000000");
-    when(mockZkUtils.getSortedActiveProcessors()).thenReturn(activeProcessors);
+    when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(activeProcessors);
     Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
 
     ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
@@ -123,7 +123,7 @@ public class TestZkLeaderElector {
   public void testUnregisteredProcessorInLeaderElection() {
     String processorId = "1";
     ZkUtils mockZkUtils = mock(ZkUtils.class);
-    when(mockZkUtils.getSortedActiveProcessors()).thenReturn(new ArrayList<String>());
+    when(mockZkUtils.getSortedActiveProcessorsZnodes()).thenReturn(new ArrayList<String>());
     Mockito.doNothing().when(mockZkUtils).makeSurePersistentPathsExists(any(String[].class));
 
     ZkKeyBuilder kb = mock(ZkKeyBuilder.class);
@@ -167,7 +167,7 @@ public class TestZkLeaderElector {
     ZkUtils zkUtils3 = getZkUtilsWithNewClient("3");
     ZkLeaderElector leaderElector3 = new ZkLeaderElector("3", zkUtils3, null);
 
-    Assert.assertEquals(0, testZkUtils.getSortedActiveProcessors().size());
+    Assert.assertEquals(0, testZkUtils.getSortedActiveProcessorsZnodes().size());
 
     leaderElector1.tryBecomeLeader(new LeaderElectorListener() {
       @Override
@@ -192,14 +192,14 @@ public class TestZkLeaderElector {
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
 
-    Assert.assertEquals(3, testZkUtils.getSortedActiveProcessors().size());
+    Assert.assertEquals(3, testZkUtils.getSortedActiveProcessorsZnodes().size());
 
     // Clean up
     zkUtils1.close();
     zkUtils2.close();
     zkUtils3.close();
 
-    Assert.assertEquals(new ArrayList<String>(), testZkUtils.getSortedActiveProcessors());
+    Assert.assertEquals(new ArrayList<String>(), testZkUtils.getSortedActiveProcessorsZnodes());
 
   }
 
@@ -223,7 +223,7 @@ public class TestZkLeaderElector {
 
     // Processor-1
     ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
-    zkUtils1.registerProcessorAndGetId("processor1");
+    zkUtils1.registerProcessorAndGetId(new ProcessorData("processor1", "1"));
     ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
 
     leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -241,7 +241,7 @@ public class TestZkLeaderElector {
 
     // Processor-2
     ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
-    final String path2 = zkUtils2.registerProcessorAndGetId("processor2");
+    final String path2 = zkUtils2.registerProcessorAndGetId(new ProcessorData("processor2", "2"));
     ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
 
     leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -273,7 +273,7 @@ public class TestZkLeaderElector {
 
     // Processor-3
     ZkUtils zkUtils3  = getZkUtilsWithNewClient("processor3");
-    zkUtils3.registerProcessorAndGetId("processor3");
+    zkUtils3.registerProcessorAndGetId(new ProcessorData("processor3", "3"));
     ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
 
     leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -317,7 +317,7 @@ public class TestZkLeaderElector {
     Assert.assertFalse(leaderElector2.amILeader());
     Assert.assertFalse(leaderElector3.amILeader());
 
-    List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
+    List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessorsZnodes();
     Assert.assertEquals(3, currentActiveProcessors.size());
 
     // Leader Failure
@@ -331,7 +331,7 @@ public class TestZkLeaderElector {
     }
 
     Assert.assertEquals(1, count.get());
-    Assert.assertEquals(currentActiveProcessors, zkUtils2.getSortedActiveProcessors());
+    Assert.assertEquals(currentActiveProcessors, zkUtils2.getSortedActiveProcessorsZnodes());
 
     // Clean up
     zkUtils2.close();
@@ -358,7 +358,7 @@ public class TestZkLeaderElector {
 
     // Processor-1
     ZkUtils zkUtils1 = getZkUtilsWithNewClient("processor1");
-    zkUtils1.registerProcessorAndGetId("processor1");
+    zkUtils1.registerProcessorAndGetId(new ProcessorData("processor1", "1"));
     ZkLeaderElector leaderElector1 = new ZkLeaderElector("processor1", zkUtils1, null);
 
     leaderElector1.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -378,7 +378,7 @@ public class TestZkLeaderElector {
 
     // Processor-2
     ZkUtils zkUtils2 = getZkUtilsWithNewClient("processor2");
-    zkUtils2.registerProcessorAndGetId("processor2");
+    zkUtils2.registerProcessorAndGetId(new ProcessorData("processor2", "2"));
     ZkLeaderElector leaderElector2 = new ZkLeaderElector("processor2", zkUtils2, null);
 
     leaderElector2.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -397,7 +397,7 @@ public class TestZkLeaderElector {
 
     // Processor-3
     ZkUtils zkUtils3  = getZkUtilsWithNewClient("processor3");
-    final String path3 = zkUtils3.registerProcessorAndGetId("processor3");
+    final String path3 = zkUtils3.registerProcessorAndGetId(new ProcessorData("processor3", "3"));
     ZkLeaderElector leaderElector3 = new ZkLeaderElector("processor3", zkUtils3, null);
 
     leaderElector3.setPreviousProcessorChangeListener(new IZkDataListener() {
@@ -451,7 +451,7 @@ public class TestZkLeaderElector {
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader2.res, 2, 100));
     Assert.assertFalse(TestZkUtils.testWithDelayBackOff(() -> isLeader3.res, 2, 100));
 
-    List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessors();
+    List<String> currentActiveProcessors = zkUtils1.getSortedActiveProcessorsZnodes();
     Assert.assertEquals(3, currentActiveProcessors.size());
 
     zkUtils2.close();
@@ -464,7 +464,7 @@ public class TestZkLeaderElector {
     }
 
     Assert.assertEquals(1, count.get());
-    Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessors());
+    Assert.assertEquals(currentActiveProcessors, zkUtils1.getSortedActiveProcessorsZnodes());
 
     // Clean up
     zkUtils1.close();

http://git-wip-us.apache.org/repos/asf/samza/blob/05060edf/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index 010fc19..b8dc295 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.samza.zk;
 
+import java.util.List;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.ZkConnection;
@@ -73,7 +74,6 @@ public class TestZkUtils {
         SESSION_TIMEOUT_MS);
 
     zkUtils.connect();
-
   }
 
   @After
@@ -89,19 +89,37 @@ public class TestZkUtils {
 
   @Test
   public void testRegisterProcessorId() {
-    String assignedPath = zkUtils.registerProcessorAndGetId("0.0.0.0");
+    String assignedPath = zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1"));
     Assert.assertTrue(assignedPath.startsWith(KEY_BUILDER.getProcessorsPath()));
 
     // Calling registerProcessorId again should return the same ephemeralPath as long as the session is valid
-    Assert.assertTrue(zkUtils.registerProcessorAndGetId("0.0.0.0").equals(assignedPath));
+    Assert.assertTrue(zkUtils.registerProcessorAndGetId(new ProcessorData("host", "1")).equals(assignedPath));
 
   }
 
   @Test
   public void testGetActiveProcessors() {
-    Assert.assertEquals(0, zkUtils.getSortedActiveProcessors().size());
-    zkUtils.registerProcessorAndGetId("processorData");
-    Assert.assertEquals(1, zkUtils.getSortedActiveProcessors().size());
+    Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsZnodes().size());
+    zkUtils.registerProcessorAndGetId(new ProcessorData("processorData", "1"));
+    Assert.assertEquals(1, zkUtils.getSortedActiveProcessorsZnodes().size());
+  }
+
+  @Test
+  public void testGetProcessorsIDs() {
+    Assert.assertEquals(0, zkUtils.getSortedActiveProcessorsIDs().size());
+    zkUtils.registerProcessorAndGetId(new ProcessorData("host1", "1"));
+    List<String> l = zkUtils.getSortedActiveProcessorsIDs();
+    Assert.assertEquals(1, l.size());
+    new ZkUtils(KEY_BUILDER, zkClient, SESSION_TIMEOUT_MS).registerProcessorAndGetId(new ProcessorData("host2", "2"));
+    l = zkUtils.getSortedActiveProcessorsIDs();
+    Assert.assertEquals(2, l.size());
+
+    ProcessorData pd = new ProcessorData(l.get(0));
+    Assert.assertEquals(" ID1 didn't match", "1", pd.getProcessorId());
+    Assert.assertEquals(" Host1 didn't match", "host1", pd.getHost());
+    pd = new ProcessorData(l.get(1));
+    Assert.assertEquals(" ID2 didn't match", "2", pd.getProcessorId());
+    Assert.assertEquals(" Host2 didn't match", "host2", pd.getHost());
   }
   
   @Test