You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2019/05/22 21:09:16 UTC

[samza] branch master updated: SAMZA-1531: Support run.id in standalone for batch processing.

This is an automated email from the ASF dual-hosted git repository.

jagadish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d5f446  SAMZA-1531: Support run.id in standalone for batch processing.
6d5f446 is described below

commit 6d5f4461d0fa2485ea1274d051f57c5983df5f2e
Author: Manasa <mg...@linkedin.com>
AuthorDate: Wed May 22 14:09:08 2019 -0700

    SAMZA-1531: Support run.id in standalone for batch processing.
    
    Adds run.id in standalone: only for BATCH mode and no changes for STREAM mode.
    
    Known issues:
    
    - No support for Azure in this PR; a follow-up PR will be made for it. However, this PR does not break Azure/Passthrough and retains the current behavior for Azure/Passthrough.
    
    - Upon an unclean shutdown of the job, a restart before the ZK server session timeout would result in reuse of the old run.id.
    
    Author: Manasa <mg...@linkedin.com>
    
    Reviewers: Jagadish <ja...@apache.org>, Bharath <bk...@linkedin.com>
    
    Closes #938 from lakshmi-manasa-g/standaloneRunid
---
 .../samza/coordinator/AzureCoordinationUtils.java  |  19 +-
 .../org/apache/samza/coordinator/AzureLock.java    |  11 +-
 .../samza/coordinator/ClusterMembership.java       |  60 ++++
 ...ckWithState.java => CoordinationConstants.java} |  26 +-
 .../samza/coordinator/CoordinationUtils.java       |   9 +-
 .../apache/samza/coordinator/DistributedLock.java  |  11 +-
 .../apache/samza/coordinator/RunIdGenerator.java   | 110 +++++++
 .../apache/samza/execution/LocalJobPlanner.java    | 126 ++++++--
 .../samza/runtime/LocalApplicationRunner.java      |  93 +++++-
 .../org/apache/samza/zk/ZkClusterMembership.java   |  71 +++++
 .../org/apache/samza/zk/ZkCoordinationUtils.java   |  10 +-
 .../samza/zk/ZkCoordinationUtilsFactory.java       |   4 +-
 .../org/apache/samza/zk/ZkDistributedLock.java     |  94 +++---
 .../apache/samza/zk/ZkMetadataStoreFactory.java    |   6 +-
 .../samza/coordinator/TestRunIdGenerator.java      |  93 ++++++
 .../samza/execution/TestLocalJobPlanner.java       |  34 ++-
 .../samza/runtime/TestLocalApplicationRunner.java  | 122 +++++++-
 .../apache/samza/zk/TestZkClusterMembership.java   | 133 +++++++++
 .../org/apache/samza/zk/TestZkDistributedLock.java | 126 ++++++++
 .../org/apache/samza/zk/TestZkMetadataStore.java   |   2 +-
 .../processor/TestZkLocalApplicationRunner.java    | 321 +++++++++++++++++++--
 21 files changed, 1323 insertions(+), 158 deletions(-)

diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
index f50ab72..eaac8e5 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -22,7 +22,6 @@ package org.apache.samza.coordinator;
 import org.apache.samza.AzureClient;
 import org.apache.samza.config.AzureConfig;
 import org.apache.samza.config.Config;
-import org.apache.samza.util.BlobUtils;
 
 
 public class AzureCoordinationUtils implements CoordinationUtils {
@@ -45,11 +44,21 @@ public class AzureCoordinationUtils implements CoordinationUtils {
     return null;
   }
 
+  /**
+   * Supporting DistributedLock in Azure would also require an Azure implementation of
+   * {@link org.apache.samza.metadatastore.MetadataStore}, because both are used together in
+   * {@link org.apache.samza.execution.LocalJobPlanner} for intermediate stream creation.
+   * Currently MetadataStore defaults to ZkMetadataStore in LocalJobPlanner because `metadata.store.factory` is not exposed.
+   * To avoid coupling AzureLock with ZkMetadataStore, DistributedLock is not supported for Azure. See SAMZA-2180 for more details.
+   */
   @Override
-  public DistributedLockWithState getLockWithState(String lockId) {
-    BlobUtils blob = new BlobUtils(client, azureConfig.getAzureContainerName(),
-        azureConfig.getAzureBlobName() + lockId, azureConfig.getAzureBlobLength());
-    return new AzureLock(blob);
+  public DistributedLock getLock(String lockId) {
+    throw new UnsupportedOperationException("DistributedLock not supported in Azure!");
+  }
+
+  @Override
+  public ClusterMembership getClusterMembership() {
+    throw new UnsupportedOperationException("ClusterMembership not supported in Azure!");
   }
 
   @Override
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
index 8cddc4c..22cdf2e 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
@@ -19,8 +19,8 @@
 
 package org.apache.samza.coordinator;
 
+import java.time.Duration;
 import java.util.Random;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.samza.AzureException;
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Distributed lock primitive for Azure.
  */
-public class AzureLock implements DistributedLockWithState {
+public class AzureLock implements DistributedLock {
 
   private static final Logger LOG = LoggerFactory.getLogger(AzureLock.class);
   private static final int LEASE_TIME_IN_SEC = 60;
@@ -51,14 +51,13 @@ public class AzureLock implements DistributedLockWithState {
    * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out.
    * The lock is acquired when the blob is leased successfully.
    * @param timeout Duration after which timeout occurs.
-   * @param unit Time Unit of the timeout defined above.
    * @return true if the lock was acquired successfully, false if lock acquire operation is unsuccessful even after subsequent tries within the timeout range.
    */
   @Override
-  public boolean lockIfNotSet(long timeout, TimeUnit unit) {
+  public boolean lock(Duration timeout) {
     //Start timer for timeout
     long startTime = System.currentTimeMillis();
-    long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+    long lockTimeout = timeout.toMillis();
     Random random = new Random();
 
     while ((System.currentTimeMillis() - startTime) < lockTimeout) {
@@ -87,7 +86,7 @@ public class AzureLock implements DistributedLockWithState {
    * Unlocks, by releasing the lease on the blob.
    */
   @Override
-  public void unlockAndSet() {
+  public void unlock() {
     boolean status = leaseBlobManager.releaseLease(leaseId.get());
     if (status) {
       LOG.info("Unlocked successfully.");
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java b/samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java
new file mode 100644
index 0000000..45b6fd3
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/ClusterMembership.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import org.apache.samza.annotation.InterfaceStability;
+
+/**
+ * Coordination Primitive to maintain the list of processors in the quorum
+ *
+ * Guarantees:
+ * 1. operations are linearizable
+ * 2. registration persistence in the absence of connection errors
+ *
+ * Non-guarantees:
+ * 1. thread safe
+ * 2. concurrent access of the list of processors in the quorum
+ * 3. persistence of registration across connection errors
+ * 4. processorId as indicator of registration order
+ *
+ *  Implementor responsibilities:
+ * 1. registerProcessor returns a unique processorId
+ * 2. getNumberOfProcessors by a processor should reflect at least its own registration status
+ * 3. unregisterProcessor for a null or unregistered processorId is a no-op
+ */
+@InterfaceStability.Evolving
+public interface ClusterMembership {
+  /**
+   * add processor to the list of processors in the quorum
+   * @return unique id of the processor registration
+   */
+  String registerProcessor();
+
+  /**
+   * @return number of processors in the list
+   */
+  int getNumberOfProcessors();
+
+  /**
+   * remove processor from the list of processors in the quorum
+   * @param processorId to be removed from the list
+   */
+  void unregisterProcessor(String processorId);
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
similarity index 54%
rename from samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
rename to samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
index c8e9033..d7a648b 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationConstants.java
@@ -19,24 +19,12 @@
 
 package org.apache.samza.coordinator;
 
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
+public final class CoordinationConstants {
+  private CoordinationConstants() {}
 
-public interface DistributedLockWithState {
-
-  /**
-   * Try to acquire the lock, but first check if the state flag is set. If it is set, return false.
-   * If the flag is not set, and lock is acquired - return true.
-   * @param timeout Duration of lock acquiring timeout.
-   * @param unit Time Unit of the timeout defined above.
-   * @return true if lock is acquired successfully, false if state is already set.
-   * @throws TimeoutException if could not acquire the lock.
-   */
-  boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException;
-
-  /**
-   * Release the lock and set the state
-   */
-  void unlockAndSet();
-}
\ No newline at end of file
+  public static final String RUNID_STORE_KEY = "runId";
+  public static final String APPLICATION_RUNNER_PATH_SUFFIX = "ApplicationRunnerData";
+  public static final String RUNID_LOCK_ID = "runId";
+  public static final int LOCK_TIMEOUT_MS = 300000;
+}
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
index 9ebd2e2..81507c9 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
@@ -24,10 +24,11 @@ import org.apache.samza.annotation.InterfaceStability;
  *
  * Coordination service provides synchronization primitives.
  * The actual implementation (for example ZK based) is left to each implementation class.
- * This service provide three primitives:
+ * This service provides the following primitives:
  *   - LeaderElection
  *   - Latch
- *   - LockWithState (does not lock if state is set)
+ *   - Lock
+ *   - ClusterMembership (to check number of processors in quorum)
  */
 @InterfaceStability.Evolving
 public interface CoordinationUtils {
@@ -37,7 +38,9 @@ public interface CoordinationUtils {
 
   Latch getLatch(int size, String latchId);
 
-  DistributedLockWithState getLockWithState(String lockId);
+  DistributedLock getLock(String lockId);
+
+  ClusterMembership getClusterMembership();
 
   /**
    * utilites cleanup
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java
similarity index 78%
rename from samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
rename to samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java
index 6972cd9..a605656 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/DistributedLock.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLock.java
@@ -19,21 +19,20 @@
 
 package org.apache.samza.coordinator;
 
-import java.util.concurrent.TimeUnit;
+import java.time.Duration;
 
 
 public interface DistributedLock {
 
   /**
-   * Tries to acquire the lock
+   * Try to acquire the lock
    * @param timeout Duration of lock acquiring timeout.
-   * @param unit Time Unit of the timeout defined above.
-   * @return true if lock is acquired successfully, false if it times out.
+   * @return true if lock is acquired successfully else returns false if failed to acquire within timeout
    */
-  boolean lock(long timeout, TimeUnit unit);
+  boolean lock(Duration timeout);
 
   /**
-   * Releases the lock
+   * Release the lock
    */
   void unlock();
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
new file mode 100644
index 0000000..284c0bf
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/RunIdGenerator.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import com.google.common.base.Preconditions;
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.samza.SamzaException;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generates RunId for Standalone use case
+ * If there is only one processor in the quorum (registered with ClusterMembership) then create new runid and add to store
+ * Else read runid from the store
+ *
+ * Steps to generate:
+ * 1. acquire lock
+ * 2. add self to quorum (register itself with ClusterMembership)
+ * 3. get number of processors in quorum
+ * 4. if quorum size is 1 (only self) then create new runid and write to store
+ * 5. if quorum size is greater than 1 then read runid from store
+ * 6. unlock
+ */
+public class RunIdGenerator {
+  private static final Logger LOG = LoggerFactory.getLogger(RunIdGenerator.class);
+
+  private final CoordinationUtils coordinationUtils;
+  private final MetadataStore metadataStore;
+  private final ClusterMembership clusterMembership;
+  private String processorId = null;
+  private volatile boolean closed = false;
+
+  public RunIdGenerator(CoordinationUtils coordinationUtils, MetadataStore metadataStore) {
+    Preconditions.checkNotNull(coordinationUtils, "CoordinationUtils cannot be null");
+    Preconditions.checkNotNull(metadataStore, "MetadataStore cannot be null");
+    this.coordinationUtils = coordinationUtils;
+    this.metadataStore = metadataStore;
+    this.clusterMembership = coordinationUtils.getClusterMembership();
+    Preconditions.checkNotNull(this.clusterMembership, "Failed to create utils for run id generation");
+  }
+
+  public Optional<String> getRunId() {
+    DistributedLock runIdLock;
+    String runId = null;
+
+    runIdLock = coordinationUtils.getLock(CoordinationConstants.RUNID_LOCK_ID);
+    if (runIdLock == null) {
+      throw new SamzaException("Failed to create utils for run id generation");
+    }
+
+    try {
+      // acquire lock to write or read run.id
+      if (runIdLock.lock(Duration.ofMillis(CoordinationConstants.LOCK_TIMEOUT_MS))) {
+        LOG.info("lock acquired for run.id generation by this processor");
+        processorId = clusterMembership.registerProcessor();
+        int numberOfActiveProcessors = clusterMembership.getNumberOfProcessors();
+        if (numberOfActiveProcessors == 0) {
+          String msg = String.format("Processor failed to fetch number of processors for run.id generation");
+          throw new SamzaException(msg);
+        }
+        if (numberOfActiveProcessors == 1) {
+          runId =
+              String.valueOf(System.currentTimeMillis()) + "-" + UUID.randomUUID().toString().substring(0, 8);
+          LOG.info("Writing the run id for this run as {}", runId);
+          metadataStore.put(CoordinationConstants.RUNID_STORE_KEY, runId.getBytes("UTF-8"));
+        } else {
+          runId = new String(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY));
+          LOG.info("Read the run id for this run as {}", runId);
+        }
+        runIdLock.unlock();
+      } else {
+        throw new SamzaException("Processor timed out waiting to acquire lock for run.id generation");
+      }
+    } catch (UnsupportedEncodingException e) {
+      throw new SamzaException("Processor could not serialize/deserialize string for run.id generation", e);
+    }
+    return Optional.ofNullable(runId);
+  }
+
+  /**
+   * might be called several times and hence should be idempotent
+   */
+  public void close() {
+    if (!closed && processorId != null) {
+      closed = true;
+      clusterMembership.unregisterProcessor(processorId);
+    }
+  }
+}
diff --git a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
index 5cce6c5..48a4a3e 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/LocalJobPlanner.java
@@ -18,8 +18,9 @@
  */
 package org.apache.samza.execution;
 
+import java.io.UnsupportedEncodingException;
+import java.time.Duration;
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import org.apache.samza.SamzaException;
@@ -28,9 +29,15 @@ import org.apache.samza.application.descriptors.ApplicationDescriptorImpl;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.coordinator.CoordinationUtils;
-import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.coordinator.DistributedLock;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.util.Util;
+import org.apache.samza.zk.ZkMetadataStoreFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,19 +50,35 @@ import org.slf4j.LoggerFactory;
  */
 public class LocalJobPlanner extends JobPlanner {
   private static final Logger LOG = LoggerFactory.getLogger(LocalJobPlanner.class);
-  private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData";
+  private static final String STREAM_CREATION_METADATA_STORE = "StreamCreationCoordinationStore";
+  private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory";
+  public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();
+  private static final String STREAM_CREATED_STATE_KEY = "StreamCreated_%s";
 
-  private final String uid = UUID.randomUUID().toString();
+  private final String processorId;
+  private final CoordinationUtils coordinationUtils;
+  private final String runId;
 
-  public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor) {
+  public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor, String processorId) {
     super(descriptor);
+    this.processorId = processorId;
+    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(userConfig);
+    this.coordinationUtils = jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, processorId, userConfig);
+    this.runId = null;
+  }
+
+  public LocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> descriptor, CoordinationUtils coordinationUtils, String processorId, String runId) {
+    super(descriptor);
+    this.coordinationUtils = coordinationUtils;
+    this.processorId = processorId;
+    this.runId = runId;
   }
 
   @Override
   public List<JobConfig> prepareJobs() {
     // for high-level DAG, generating the plan and job configs
     // 1. initialize and plan
-    ExecutionPlan plan = getExecutionPlan();
+    ExecutionPlan plan = getExecutionPlan(runId);
 
     String executionPlanJson = "";
     try {
@@ -104,36 +127,89 @@ public class LocalJobPlanner extends JobPlanner {
       LOG.info("Set of intermediate streams is empty. Nothing to create.");
       return;
     }
-    LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
+    LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", processorId);
     // Move the scope of coordination utils within stream creation to address long idle connection problem.
     // Refer SAMZA-1385 for more details
-    JobCoordinatorConfig jcConfig = new JobCoordinatorConfig(userConfig);
-    String coordinationId = new ApplicationConfig(userConfig).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
-    CoordinationUtils coordinationUtils =
-        jcConfig.getCoordinationUtilsFactory().getCoordinationUtils(coordinationId, uid, userConfig);
     if (coordinationUtils == null) {
-      LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
+      LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", processorId);
       // each application process will try creating the streams, which
       // requires stream creation to be idempotent
       streamManager.createStreams(intStreams);
       return;
     }
 
-    DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId);
+    // If BATCH, then need to create new intermediate streams every run.
+    // planId does not change every run and hence, need to use runid
+    // as the lockId to create a new lock with state each run
+    // to create new streams each run.
+    // If run.id is null, defaults to old behavior of using planId
+    boolean isAppModeBatch = new ApplicationConfig(userConfig).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    String lockId = planId;
+    if (isAppModeBatch && runId != null) {
+      lockId = runId;
+    }
     try {
-      // check if the processor needs to go through leader election and stream creation
-      if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
-        LOG.info("lock acquired for streams creation by " + uid);
-        streamManager.createStreams(intStreams);
-        lockWithState.unlockAndSet();
-      } else {
-        LOG.info("Processor {} did not obtain the lock for streams creation. They must've been created by another processor.", uid);
-      }
-    } catch (TimeoutException e) {
-      String msg = String.format("Processor {} failed to get the lock for stream initialization", uid);
-      throw new SamzaException(msg, e);
+      checkAndCreateStreams(lockId, intStreams, streamManager);
+    } catch (TimeoutException te) {
+      throw new SamzaException(String.format("Processor {} failed to get the lock for stream initialization within timeout.", processorId), te);
     } finally {
-      coordinationUtils.close();
+      if (!isAppModeBatch && coordinationUtils != null) {
+        coordinationUtils.close();
+      }
+    }
+  }
+
+  private void checkAndCreateStreams(String lockId, List<StreamSpec> intStreams, StreamManager streamManager) throws TimeoutException {
+    MetadataStore metadataStore = getMetadataStore();
+    DistributedLock distributedLock = coordinationUtils.getLock(lockId);
+    if (distributedLock == null || metadataStore == null) {
+      LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", processorId);
+      // each application process will try creating the streams, which requires stream creation to be idempotent
+      streamManager.createStreams(intStreams);
+      return;
+    }
+    //Start timer for timeout
+    long startTime = System.currentTimeMillis();
+    long lockTimeout = TimeUnit.MILLISECONDS.convert(CoordinationConstants.LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+    // If "stream created state" exists in store then skip stream creation
+    // Else acquire lock, create streams, set state in store and unlock
+    // Checking for state before acquiring lock to prevent all processors from acquiring lock
+    // This runs in a while loop so that, if two processors check the state simultaneously,
+    // the processor that fails to acquire the lock does not die with a timeout exception;
+    // instead it loops back, re-checks the state and proceeds
+    while ((System.currentTimeMillis() - startTime) < lockTimeout) {
+      if (metadataStore.get(String.format(STREAM_CREATED_STATE_KEY, lockId)) != null) {
+        LOG.info("Processor {} found streams created state data. They must've been created by another processor.", processorId);
+        break;
+      }
+      try {
+        if (distributedLock.lock(Duration.ofMillis(10000))) {
+          LOG.info("lock acquired for streams creation by Processor " + processorId);
+          streamManager.createStreams(intStreams);
+          String streamCreatedMessage = "Streams created by processor " + processorId;
+          metadataStore.put(String.format(STREAM_CREATED_STATE_KEY, lockId), streamCreatedMessage.getBytes("UTF-8"));
+          distributedLock.unlock();
+          break;
+        } else {
+          LOG.info("Processor {} failed to get the lock for stream initialization. Will try again until time out", processorId);
+        }
+      } catch (UnsupportedEncodingException e) {
+        String msg = String.format("Processor {} failed to encode string for stream initialization", processorId);
+        throw new SamzaException(msg, e);
+      }
+    }
+    if ((System.currentTimeMillis() - startTime) >= lockTimeout) {
+      throw new TimeoutException(String.format("Processor {} failed to get the lock for stream initialization within {} milliseconds.", processorId, CoordinationConstants.LOCK_TIMEOUT_MS));
+    }
+  }
+
+  private MetadataStore getMetadataStore() {
+    String metadataStoreFactoryClass = appDesc.getConfig().get(METADATA_STORE_FACTORY_CONFIG);
+    if (metadataStoreFactoryClass == null) {
+      metadataStoreFactoryClass = DEFAULT_METADATA_STORE_FACTORY;
     }
+    MetadataStoreFactory metadataStoreFactory = Util.getObj(metadataStoreFactoryClass, MetadataStoreFactory.class);
+    return metadataStoreFactory.getMetadataStore(STREAM_CREATION_METADATA_STORE, appDesc.getConfig(), new MetricsRegistryMap());
   }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index 7da3369..c0b85f2 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -42,14 +43,22 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.context.ExternalContext;
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.RunIdGenerator;
 import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.job.ApplicationStatus;
+import org.apache.samza.metadatastore.MetadataStore;
+import org.apache.samza.metadatastore.MetadataStoreFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.task.TaskFactory;
 import org.apache.samza.task.TaskFactoryUtil;
 import org.apache.samza.util.Util;
+import org.apache.samza.zk.ZkMetadataStoreFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,13 +68,20 @@ import org.slf4j.LoggerFactory;
 public class LocalApplicationRunner implements ApplicationRunner {
 
   private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
+  private static final String PROCESSOR_ID = UUID.randomUUID().toString();
+  private final  static String RUN_ID_METADATA_STORE = "RunIdCoordinationStore";
+  private static final String METADATA_STORE_FACTORY_CONFIG = "metadata.store.factory";
+  public final static String DEFAULT_METADATA_STORE_FACTORY = ZkMetadataStoreFactory.class.getName();
 
   private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc;
-  private final LocalJobPlanner planner;
   private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
   private final CountDownLatch shutdownLatch = new CountDownLatch(1);
   private final AtomicInteger numProcessorsToStart = new AtomicInteger();
   private final AtomicReference<Throwable> failure = new AtomicReference<>();
+  private final boolean isAppModeBatch;
+  private final Optional<CoordinationUtils> coordinationUtils;
+  private Optional<String> runId = Optional.empty();
+  private Optional<RunIdGenerator> runIdGenerator = Optional.empty();
 
   private ApplicationStatus appStatus = ApplicationStatus.New;
 
@@ -77,20 +93,74 @@ public class LocalApplicationRunner implements ApplicationRunner {
    */
   public LocalApplicationRunner(SamzaApplication app, Config config) {
     this.appDesc = ApplicationDescriptorUtil.getAppDescriptor(app, config);
-    this.planner = new LocalJobPlanner(appDesc);
+    isAppModeBatch = new ApplicationConfig(config).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    coordinationUtils = getCoordinationUtils(config);
   }
 
   /**
    * Constructor only used in unit test to allow injection of {@link LocalJobPlanner}
    */
   @VisibleForTesting
-  LocalApplicationRunner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, LocalJobPlanner planner) {
+  LocalApplicationRunner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc, Optional<CoordinationUtils> coordinationUtils) {
     this.appDesc = appDesc;
-    this.planner = planner;
+    isAppModeBatch = new ApplicationConfig(appDesc.getConfig()).getAppMode() == ApplicationConfig.ApplicationMode.BATCH;
+    this.coordinationUtils = coordinationUtils;
+  }
+
+  /**
+   * Builds the coordination utils used by this runner.
+   * Coordination utils are only created for BATCH mode applications; for any other mode,
+   * or when the configured factory returns null, the result is empty.
+   *
+   * @param config config used to look up the coordination utils factory and passed on to it
+   * @return the coordination utils, or {@link Optional#empty()} if none are available
+   */
+  private Optional<CoordinationUtils> getCoordinationUtils(Config config) {
+    if (isAppModeBatch) {
+      CoordinationUtils utils = new JobCoordinatorConfig(config)
+          .getCoordinationUtilsFactory()
+          .getCoordinationUtils(CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX, PROCESSOR_ID, config);
+      return Optional.ofNullable(utils);
+    }
+    return Optional.empty();
+  }
+
+  /**
+   * Creates the {@link LocalJobPlanner} for this run.
+   * For non-BATCH applications the planner is built from the app descriptor and the processor id only.
+   * For BATCH applications it additionally receives the coordination utils and the run id,
+   * each of which is passed as null when absent.
+   *
+   * @return LocalJobPlanner created
+   */
+  @VisibleForTesting
+  LocalJobPlanner getPlanner() {
+    ApplicationConfig.ApplicationMode appMode = new ApplicationConfig(appDesc.getConfig()).getAppMode();
+    if (appMode == ApplicationConfig.ApplicationMode.BATCH) {
+      return new LocalJobPlanner(appDesc, coordinationUtils.orElse(null), PROCESSOR_ID, runId.orElse(null));
+    }
+    return new LocalJobPlanner(appDesc, PROCESSOR_ID);
+  }
+
+
+  /**
+   * Generates the run id for this run and stores it in the {@code runId} field.
+   * Nothing is generated when the application is not in BATCH mode or when no coordination utils
+   * are available. Any failure is logged and swallowed so that execution continues without a run id.
+   */
+  private void initializeRunId() {
+    if (!isAppModeBatch) {
+      LOG.info("Not BATCH mode and hence not generating run id");
+      return;
+    }
+
+    if (!coordinationUtils.isPresent()) {
+      LOG.warn("Coordination utils not present. Aborting run id generation. Will continue execution without a run id.");
+      return;
+    }
+
+    try {
+      MetadataStore metadataStore = getMetadataStore();
+      runIdGenerator = Optional.of(new RunIdGenerator(coordinationUtils.get(), metadataStore));
+      runId = runIdGenerator.flatMap(RunIdGenerator::getRunId);
+    } catch (Exception e) {
+      // The exception is the trailing argument with no placeholder, so SLF4J logs it with its stack trace.
+      LOG.warn("Failed to generate run id. Will continue execution without a run id.", e);
+    }
+  }
+
+  /**
+   * Returns the run id generated for this run.
+   *
+   * @return the run id; empty until one has been generated by {@code run}, and empty when generation
+   *         was skipped or failed
+   */
+  public Optional<String> getRunId() {
+    return runId;
+  }
 
   @Override
   public void run(ExternalContext externalContext) {
+    initializeRunId();
+
+    LocalJobPlanner planner = getPlanner();
+
     try {
       List<JobConfig> jobConfigs = planner.prepareJobs();
 
@@ -109,6 +179,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
       // start the StreamProcessors
       processors.forEach(StreamProcessor::start);
     } catch (Throwable throwable) {
+      cleanup();
       appStatus = ApplicationStatus.unsuccessfulFinish(throwable);
       shutdownLatch.countDown();
       throw new SamzaException(String.format("Failed to start application: %s",
@@ -119,6 +190,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
   @Override
   public void kill() {
     processors.forEach(StreamProcessor::stop);
+    cleanup();
   }
 
   @Override
@@ -151,6 +223,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
       throw new SamzaException(e);
     }
 
+    cleanup();
     return finished;
   }
 
@@ -200,6 +273,17 @@ public class LocalApplicationRunner implements ApplicationRunner {
     }
   }
 
+  /**
+   * Closes the run id generator and then the coordination utils, for whichever of them is present.
+   */
+  private void cleanup() {
+    runIdGenerator.ifPresent(generator -> generator.close());
+    coordinationUtils.ifPresent(utils -> utils.close());
+  }
+
+  /**
+   * Instantiates the metadata store used for the run id.
+   * The factory class name is read from the "metadata.store.factory" config and falls back to
+   * {@link #DEFAULT_METADATA_STORE_FACTORY} when the config is not set.
+   *
+   * @return metadata store created by the factory for the run id namespace
+   */
+  private MetadataStore getMetadataStore() {
+    Config config = appDesc.getConfig();
+    String factoryClassName = config.getOrDefault(METADATA_STORE_FACTORY_CONFIG, DEFAULT_METADATA_STORE_FACTORY);
+    MetadataStoreFactory factory = Util.getObj(factoryClassName, MetadataStoreFactory.class);
+    return factory.getMetadataStore(RUN_ID_METADATA_STORE, config, new MetricsRegistryMap());
+  }
+
   /**
    * Defines a specific implementation of {@link ProcessorLifecycleListener} for local {@link StreamProcessor}s.
    */
@@ -262,6 +346,7 @@ public class LocalApplicationRunner implements ApplicationRunner {
         userDefinedProcessorLifecycleListener.afterStop();
       }
       if (processors.isEmpty()) {
+        cleanup();
         // no processor is still running. Notify callers waiting on waitForFinish()
         shutdownLatch.countDown();
       }
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkClusterMembership.java b/samza-core/src/main/java/org/apache/samza/zk/ZkClusterMembership.java
new file mode 100644
index 0000000..0690821
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkClusterMembership.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import com.google.common.base.Preconditions;
+import org.apache.samza.coordinator.ClusterMembership;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ClusterMembership} implementation on top of ZooKeeper.
+ * Every registered processor is represented by an ephemeral sequential node created under the
+ * "processors" node below the root path of the {@link ZkKeyBuilder}.
+ */
+public class ZkClusterMembership implements ClusterMembership {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ZkClusterMembership.class);
+  public static final String PROCESSORS_PATH = "processors";
+  private final ZkUtils zkUtils;
+  private final String processorsPath;
+  private final String participantId;
+  private final ZkKeyBuilder keyBuilder;
+
+  /**
+   * Derives the processors path from the key builder of the given {@link ZkUtils} and hands it to
+   * {@code zkUtils.validatePaths}.
+   *
+   * @param participantId data written into the node created by {@link #registerProcessor()}; must not be null
+   * @param zkUtils ZooKeeper utilities used for all ZooKeeper access; must not be null
+   */
+  public ZkClusterMembership(String participantId, ZkUtils zkUtils) {
+    this.participantId = Preconditions.checkNotNull(participantId, "ParticipantId cannot be null");
+    this.zkUtils = Preconditions.checkNotNull(zkUtils, "ZkUtils cannot be null");
+    this.keyBuilder = zkUtils.getKeyBuilder();
+    this.processorsPath = String.format("%s/%s", keyBuilder.getRootPath(), PROCESSORS_PATH);
+    zkUtils.validatePaths(new String[] {processorsPath});
+  }
+
+  /**
+   * Registers this processor by creating an ephemeral sequential node under the processors path.
+   *
+   * @return the id parsed from the path of the created node
+   */
+  @Override
+  public String registerProcessor() {
+    String createdPath = zkUtils.getZkClient().createEphemeralSequential(processorsPath + "/", participantId);
+    LOG.info("created ephemeral node. Registered the processor in the cluster.");
+    return ZkKeyBuilder.parseIdFromPath(createdPath);
+  }
+
+  /**
+   * @return the number of children currently present under the processors path
+   */
+  @Override
+  public int getNumberOfProcessors() {
+    return zkUtils.getZkClient().getChildren(processorsPath).size();
+  }
+
+  /**
+   * Unregisters a processor by deleting its node under the processors path.
+   * A null id or a node that does not exist is only logged as a warning.
+   *
+   * @param processorId id returned by {@link #registerProcessor()}
+   */
+  @Override
+  public void unregisterProcessor(String processorId) {
+    if (processorId == null) {
+      LOG.warn("Can not unregister processor with null processorId");
+      return;
+    }
+    String registeredPath = processorsPath + "/" + processorId;
+    if (!zkUtils.exists(registeredPath)) {
+      LOG.warn("Ephemeral node you want to delete doesnt exist. Processor with id {} is not currently registered.", processorId);
+      return;
+    }
+    zkUtils.getZkClient().delete(registeredPath);
+    LOG.info("Ephemeral node deleted. Unregistered the processor from cluster membership.");
+  }
+}
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 3d4a2d1..1e3b58f 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -20,8 +20,9 @@ package org.apache.samza.zk;
 
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.samza.config.ZkConfig;
+import org.apache.samza.coordinator.ClusterMembership;
 import org.apache.samza.coordinator.CoordinationUtils;
-import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.coordinator.DistributedLock;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
 import org.slf4j.Logger;
@@ -52,7 +53,7 @@ public class ZkCoordinationUtils implements CoordinationUtils {
   }
 
   @Override
-  public DistributedLockWithState getLockWithState(String lockId) {
+  public DistributedLock getLock(String lockId) {
     return new ZkDistributedLock(processorIdStr, zkUtils, lockId);
   }
 
@@ -70,4 +71,9 @@ public class ZkCoordinationUtils implements CoordinationUtils {
   public ZkUtils getZkUtils() {
     return zkUtils;
   }
+
+  /**
+   * Creates a ZooKeeper-backed {@link ClusterMembership} for this processor id.
+   * A new instance is created on every call.
+   */
+  @Override
+  public ClusterMembership getClusterMembership() {
+    ClusterMembership membership = new ZkClusterMembership(processorIdStr, zkUtils);
+    return membership;
+  }
 }
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
index e93f290..0cf93c9 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
@@ -21,6 +21,7 @@ package org.apache.samza.zk;
 import com.google.common.base.Strings;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.SamzaException;
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
@@ -34,7 +35,8 @@ import org.slf4j.LoggerFactory;
 public class ZkCoordinationUtilsFactory implements CoordinationUtilsFactory {
   private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinationUtilsFactory.class);
 
-  public CoordinationUtils getCoordinationUtils(String groupId, String participantId, Config config) {
+  public CoordinationUtils getCoordinationUtils(String coordinationId, String participantId, Config config) {
+    String groupId = new ApplicationConfig(config).getGlobalAppId() + "/" + coordinationId;
     ZkConfig zkConfig = new ZkConfig(config);
 
     ZkClient zkClient =
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
index cfb4641..24aa26e 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
@@ -18,12 +18,10 @@
  */
 package org.apache.samza.zk;
 
+import java.time.Duration;
 import java.util.List;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import org.apache.samza.SamzaException;
-import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.coordinator.DistributedLock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -31,7 +29,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Distributed lock primitive for Zookeeper.
  */
-public class ZkDistributedLock implements DistributedLockWithState {
+public class ZkDistributedLock implements DistributedLock {
 
   public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class);
   private static final String STATE_INITED = "sate_initialized";
@@ -39,72 +37,65 @@ public class ZkDistributedLock implements DistributedLockWithState {
   private final String lockPath;
   private final String participantId;
   private final ZkKeyBuilder keyBuilder;
-  private final Random random = new Random();
   private String nodePath = null;
-  private final String statePath;
+  private Object mutex;
 
   public ZkDistributedLock(String participantId, ZkUtils zkUtils, String lockId) {
     this.zkUtils = zkUtils;
     this.participantId = participantId;
     this.keyBuilder = zkUtils.getKeyBuilder();
-    lockPath = String.format("%s/stateLock_%s", keyBuilder.getRootPath(), lockId);
-    statePath = String.format("%s/%s_%s", lockPath, STATE_INITED, lockId);
+    lockPath = String.format("%s/lock_%s", keyBuilder.getRootPath(), lockId);
     zkUtils.validatePaths(new String[] {lockPath});
+    mutex = new Object();
+    zkUtils.getZkClient().subscribeChildChanges(lockPath, new ParticipantChangeHandler(zkUtils));
   }
 
   /**
    * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out.
    * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock.
+   * While waiting, the thread blocks on a mutex that is notified whenever the children of the lock path change.
+   * If the lock is not acquired, the ephemeral node is deleted again so that it cannot block other participants.
    * @param timeout Duration of lock acquiring timeout.
-   * @param unit Unit of the timeout defined above.
-   * @return true if lock is acquired successfully, false if it times out.
+   * @return true if lock is acquired successfully else returns false if failed to acquire within timeout
+   *         or if the waiting thread was interrupted (its interrupt status is restored before returning)
    */
   @Override
-  public boolean lockIfNotSet(long timeout, TimeUnit unit)
-      throws TimeoutException {
+  public boolean lock(Duration timeout) {
 
     nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId);
 
     //Start timer for timeout
     long startTime = System.currentTimeMillis();
-    long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+    long lockTimeout = timeout.toMillis();
+    boolean interrupted = false;
 
     while ((System.currentTimeMillis() - startTime) < lockTimeout) {
+      synchronized (mutex) {
+        List<String> children = zkUtils.getZkClient().getChildren(lockPath);
+        int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath));
 
-      if (zkUtils.getZkClient().exists(statePath)) {
-        // state already set, no point locking
-        return false;
-      }
-
-      List<String> children = zkUtils.getZkClient().getChildren(lockPath);
-      int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath));
-
-      if (children.size() == 0 || index == -1) {
-        throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
-      }
-      // Acquires lock when the node has the lowest sequence number and returns.
-      if (index == 0) {
-        LOG.info("Acquired lock for participant id: {}", participantId);
-        return true;
-      } else {
-        try {
-          Thread.sleep(random.nextInt(1000));
-        } catch (InterruptedException e) {
-          Thread.interrupted();
+        if (children.size() == 0 || index == -1) {
+          throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
+        }
+        // Acquires lock when the node has the lowest sequence number and returns.
+        if (index == 0) {
+          LOG.info("Acquired lock for participant id: {}", participantId);
+          return true;
+        } else {
+          // Wait only for what is left of the timeout. wait(0) would block forever, so stop once it is used up.
+          long remainingMs = lockTimeout - (System.currentTimeMillis() - startTime);
+          if (remainingMs <= 0) {
+            break;
+          }
+          try {
+            mutex.wait(remainingMs);
+          } catch (InterruptedException e) {
+            // Stop waiting; the interrupt status is restored after the lock node has been removed.
+            interrupted = true;
+            break;
+          }
+          LOG.info("Trying to acquire lock again...");
         }
-        LOG.info("Trying to acquire lock again...");
       }
     }
-    throw new TimeoutException("could not acquire lock for " + timeout + " " + unit.toString());
+    // Give up our place in the queue; otherwise the abandoned node could block other participants.
+    zkUtils.getZkClient().delete(nodePath);
+    nodePath = null;
+    if (interrupted) {
+      // Restored only now so that the ZooKeeper call above is not affected by the pending interrupt.
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted while waiting to acquire lock.");
+      return false;
+    }
+    LOG.info("Failed to acquire lock within {} milliseconds.", lockTimeout);
+    return false;
   }
 
   /**
    * Unlocks, by deleting the ephemeral sequential node created to acquire the lock.
    */
   @Override
-  public void unlockAndSet() {
-    // set state
-    zkUtils.getZkClient().createPersistent(statePath, true);
+  public void unlock() {
 
     if (nodePath != null) {
       zkUtils.getZkClient().delete(nodePath);
@@ -114,4 +105,29 @@ public class ZkDistributedLock implements DistributedLockWithState {
       LOG.warn("Ephemeral lock node you want to delete doesn't exist");
     }
   }
+
+  /**
+   * Child-change listener for the lock path, whose children are the ephemeral nodes created to
+   * acquire the lock. Every change with a non-null child list notifies one thread waiting on the
+   * mutex of the enclosing lock.
+   */
+  class ParticipantChangeHandler extends ZkUtils.GenerationAwareZkChildListener {
+
+    public ParticipantChangeHandler(ZkUtils zkUtils) {
+      super(zkUtils, "ParticipantChangeHandler");
+    }
+
+    /**
+     * Invoked when the children of the given path changed.
+     *
+     * @param parentPath path whose children changed
+     * @param currentChildren children now present under the path; a null list is only logged
+     */
+    @Override
+    public void doHandleChildChange(String parentPath, List<String> currentChildren)
+        throws Exception {
+      synchronized (mutex) {
+        if (currentChildren != null) {
+          LOG.info("ParticipantChangeHandler::handleChildChange - Path: {} Current Children: {} ", parentPath, currentChildren);
+          mutex.notify();
+        } else {
+          LOG.warn("handleChildChange on path " + parentPath + " was invoked with NULL list of children");
+        }
+      }
+    }
+  }
 }
\ No newline at end of file
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
index a9c979d..5aaa261 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkMetadataStoreFactory.java
@@ -18,7 +18,9 @@
  */
 package org.apache.samza.zk;
 
+import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
+import org.apache.samza.coordinator.CoordinationConstants;
 import org.apache.samza.metadatastore.MetadataStore;
 import org.apache.samza.metadatastore.MetadataStoreFactory;
 import org.apache.samza.metrics.MetricsRegistry;
@@ -31,6 +33,8 @@ public class ZkMetadataStoreFactory implements MetadataStoreFactory {
 
   @Override
   public MetadataStore getMetadataStore(String namespace, Config config, MetricsRegistry metricsRegistry) {
-    return new ZkMetadataStore(namespace, config, metricsRegistry);
+    String globalAppId = new ApplicationConfig(config).getGlobalAppId();
+    String metadataStoreBaseDir = "/" + globalAppId + "/" + CoordinationConstants.APPLICATION_RUNNER_PATH_SUFFIX + "/" + namespace;
+    return new ZkMetadataStore(metadataStoreBaseDir, config, metricsRegistry);
   }
 }
diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
new file mode 100644
index 0000000..39bc583
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/coordinator/TestRunIdGenerator.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.coordinator;
+
+import org.apache.samza.metadatastore.MetadataStore;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.*;
+
+/**
+ * Unit tests for {@link RunIdGenerator} using mocked coordination utils, lock, cluster membership
+ * and metadata store.
+ */
+public class TestRunIdGenerator {
+
+  private static final String FAKE_RUNID = "FAKE_RUNID";
+  private RunIdGenerator runIdGenerator;
+  private CoordinationUtils coordinationUtils;
+  private DistributedLock distributedLock;
+  private ClusterMembership membership;
+  private MetadataStore metadataStore;
+
+  @Test
+  public void testSingleProcessorWriteRunId() throws Exception {
+    // When there is a single processor registered with ClusterMembership
+    // RunIdGenerator should write a new run id to the MetadataStore
+
+    prepareRunIdGenerator(1);
+
+    runIdGenerator.getRunId();
+
+    verify(coordinationUtils, Mockito.times(1)).getClusterMembership();
+    verify(coordinationUtils, Mockito.times(1)).getLock(anyString());
+    verify(distributedLock, Mockito.times(1)).lock(anyObject());
+    verify(distributedLock, Mockito.times(1)).unlock();
+    verify(membership, Mockito.times(1)).registerProcessor();
+    verify(membership, Mockito.times(1)).getNumberOfProcessors();
+    verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+  }
+
+  @Test
+  public void testTwoProcessorsReadRunId() throws Exception {
+    // When there are two processors registered with ClusterMembership
+    // RunIdGenerator should read run id from the MetadataStore
+
+    prepareRunIdGenerator(2);
+
+    String runId = runIdGenerator.getRunId().get();
+
+    // JUnit expects (message, expected, actual); this order keeps a failure report the right way round.
+    assertEquals("Runid was not read from store", FAKE_RUNID, runId);
+
+    verify(coordinationUtils, Mockito.times(1)).getClusterMembership();
+    verify(coordinationUtils, Mockito.times(1)).getLock(anyString());
+    verify(distributedLock, Mockito.times(1)).lock(anyObject());
+    verify(distributedLock, Mockito.times(1)).unlock();
+    verify(membership, Mockito.times(1)).registerProcessor();
+    verify(membership, Mockito.times(1)).getNumberOfProcessors();
+    verify(metadataStore, Mockito.times(1)).get(CoordinationConstants.RUNID_STORE_KEY);
+  }
+
+  /**
+   * Wires a {@link RunIdGenerator} to mocks: the lock is always acquired, the cluster membership
+   * reports the given number of processors, and the metadata store returns FAKE_RUNID for the run id key.
+   *
+   * @param numberOfProcessors value reported by the mocked cluster membership
+   */
+  private void prepareRunIdGenerator(int numberOfProcessors) throws Exception {
+
+    coordinationUtils = mock(CoordinationUtils.class);
+
+    distributedLock = mock(DistributedLock.class);
+    when(distributedLock.lock(anyObject())).thenReturn(true);
+    when(coordinationUtils.getLock(anyString())).thenReturn(distributedLock);
+
+    membership = mock(ClusterMembership.class);
+    when(membership.getNumberOfProcessors()).thenReturn(numberOfProcessors);
+    when(coordinationUtils.getClusterMembership()).thenReturn(membership);
+
+    metadataStore = mock(MetadataStore.class);
+    when(metadataStore.get(CoordinationConstants.RUNID_STORE_KEY)).thenReturn(FAKE_RUNID.getBytes("UTF-8"));
+
+    runIdGenerator = spy(new RunIdGenerator(coordinationUtils, metadataStore));
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
index 5a9b634..7e2ca08 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestLocalJobPlanner.java
@@ -30,8 +30,10 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.JobCoordinatorConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.CoordinationUtilsFactory;
-import org.apache.samza.coordinator.DistributedLockWithState;
+import org.apache.samza.coordinator.DistributedLock;
 import org.apache.samza.system.StreamSpec;
+import org.apache.samza.zk.ZkMetadataStore;
+import org.apache.samza.zk.ZkMetadataStoreFactory;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
@@ -42,7 +44,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -58,7 +59,7 @@ import static org.mockito.Mockito.when;
  * TODO: consolidate this with unit tests for ExecutionPlanner after SAMZA-1811
  */
 @RunWith(PowerMockRunner.class)
-@PrepareForTest({LocalJobPlanner.class, JobCoordinatorConfig.class})
+@PrepareForTest({LocalJobPlanner.class, JobCoordinatorConfig.class, ZkMetadataStoreFactory.class})
 public class TestLocalJobPlanner {
 
   private static final String PLAN_JSON =
@@ -73,7 +74,9 @@ public class TestLocalJobPlanner {
   @Test
   public void testStreamCreation()
       throws Exception {
-    localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+    StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
+    doReturn(mock(Config.class)).when(appDesc).getConfig();
+    localPlanner = createLocalJobPlanner(appDesc);
     StreamManager streamManager = mock(StreamManager.class);
     doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class));
 
@@ -102,7 +105,9 @@ public class TestLocalJobPlanner {
   @Test
   public void testStreamCreationWithCoordination()
       throws Exception {
-    localPlanner = createLocalJobPlanner(mock(StreamApplicationDescriptorImpl.class));
+    StreamApplicationDescriptorImpl appDesc = mock(StreamApplicationDescriptorImpl.class);
+    doReturn(mock(Config.class)).when(appDesc).getConfig();
+    localPlanner = createLocalJobPlanner(appDesc);
     StreamManager streamManager = mock(StreamManager.class);
     doReturn(streamManager).when(localPlanner).buildAndStartStreamManager(any(Config.class));
 
@@ -118,9 +123,9 @@ public class TestLocalJobPlanner {
     when(mockJcConfig.getCoordinationUtilsFactory()).thenReturn(coordinationUtilsFactory);
     PowerMockito.whenNew(JobCoordinatorConfig.class).withAnyArguments().thenReturn(mockJcConfig);
 
-    DistributedLockWithState lock = mock(DistributedLockWithState.class);
-    when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true);
-    when(coordinationUtils.getLockWithState(anyString())).thenReturn(lock);
+    DistributedLock lock = mock(DistributedLock.class);
+    when(lock.lock(anyObject())).thenReturn(true);
+    when(coordinationUtils.getLock(anyString())).thenReturn(lock);
     when(coordinationUtilsFactory.getCoordinationUtils(anyString(), anyString(), anyObject()))
         .thenReturn(coordinationUtils);
 
@@ -191,8 +196,17 @@ public class TestLocalJobPlanner {
         planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
   }
 
-  private LocalJobPlanner createLocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) {
-    return spy(new LocalJobPlanner(appDesc));
+  private LocalJobPlanner createLocalJobPlanner(ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) throws  Exception {
+    CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
+    DistributedLock distributedLock = mock(DistributedLock.class);
+    when(distributedLock.lock(anyObject())).thenReturn(true);
+    when(coordinationUtils.getLock(anyString())).thenReturn(distributedLock);
+
+    ZkMetadataStore zkMetadataStore = mock(ZkMetadataStore.class);
+    when(zkMetadataStore.get(any())).thenReturn(null);
+    PowerMockito.whenNew(ZkMetadataStore.class).withAnyArguments().thenReturn(zkMetadataStore);
+
+    return spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID"));
   }
 
   private String getExecutionPlanId(List<StreamSpec> updatedStreamSpecs) {
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 5e91a2a..3ccf587 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -37,42 +37,54 @@ import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.context.ExternalContext;
+import org.apache.samza.coordinator.ClusterMembership;
+import org.apache.samza.coordinator.CoordinationConstants;
+import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.DistributedLock;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
 import org.apache.samza.execution.LocalJobPlanner;
 import org.apache.samza.task.IdentityStreamTask;
+import org.apache.samza.zk.ZkMetadataStore;
+import org.apache.samza.zk.ZkMetadataStoreFactory;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.*;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.*;
 
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({LocalJobPlanner.class, LocalApplicationRunner.class, ZkMetadataStoreFactory.class})
 public class TestLocalApplicationRunner {
 
   private Config config;
   private SamzaApplication mockApp;
   private LocalApplicationRunner runner;
   private LocalJobPlanner localPlanner;
+  private CoordinationUtils coordinationUtils;
+  private ZkMetadataStore metadataStore;
+  private ClusterMembership clusterMembership;
 
   @Before
-  public void setUp() {
+  public void setUp() throws Exception {
     config = new MapConfig();
     mockApp = mock(StreamApplication.class);
     prepareTest();
   }
 
   @Test
-  public void testRunStreamTask() {
+  public void testRunStreamTask() throws Exception {
     final Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
     cfgs.put(ApplicationConfig.APP_NAME, "test-app");
@@ -105,7 +117,7 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
-  public void testRunStreamTaskWithoutExternalContext() {
+  public void testRunStreamTaskWithoutExternalContext() throws Exception {
     final Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
     cfgs.put(ApplicationConfig.APP_NAME, "test-app");
@@ -136,7 +148,7 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
-  public void testRunComplete() {
+  public void testRunComplete() throws Exception {
     Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
     config = new MapConfig(cfgs);
@@ -172,7 +184,7 @@ public class TestLocalApplicationRunner {
   }
 
   @Test
-  public void testRunFailure() {
+  public void testRunFailure() throws Exception {
     Map<String, String> cfgs = new HashMap<>();
     cfgs.put(ApplicationConfig.PROCESSOR_ID, "0");
     config = new MapConfig(cfgs);
@@ -247,11 +259,97 @@ public class TestLocalApplicationRunner {
     LocalApplicationRunner.createProcessorId(mockConfig);
   }
 
-  private void prepareTest() {
+  private void prepareTest() throws Exception {
+    CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
+
+    DistributedLock distributedLock = mock(DistributedLock.class);
+    when(distributedLock.lock(anyObject())).thenReturn(true);
+    when(coordinationUtils.getLock(anyString())).thenReturn(distributedLock);
+
+    ZkMetadataStore zkMetadataStore = mock(ZkMetadataStore.class);
+    when(zkMetadataStore.get(any())).thenReturn(null);
+    PowerMockito.whenNew(ZkMetadataStore.class).withAnyArguments().thenReturn(zkMetadataStore);
+
+    ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
+        ApplicationDescriptorUtil.getAppDescriptor(mockApp, config);
+    localPlanner = spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID"));
+    runner = spy(new LocalApplicationRunner(appDesc, Optional.of(coordinationUtils)));
+    doReturn(localPlanner).when(runner).getPlanner();
+  }
+
+  /**
+   * For app.mode=BATCH ensure that the run.id generation utils --
+   * DistributedLock, ClusterMembership and MetadataStore are created.
+   * Also ensure that metadataStore.put is invoked (to write the run.id)
+   * @throws Exception
+   */
+  @Test
+  public void testRunIdForBatch() throws Exception {
+    final Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.APP_MODE, "BATCH");
+    cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+    cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
+    cfgs.put(JobConfig.JOB_ID(), "jobId");
+    config = new MapConfig(cfgs);
+    mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
+
+    prepareTestForRunId();
+    runner.run();
+
+    verify(coordinationUtils, Mockito.times(1)).getLock(CoordinationConstants.RUNID_LOCK_ID);
+    verify(clusterMembership, Mockito.times(1)).getNumberOfProcessors();
+    verify(metadataStore, Mockito.times(1)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+  }
+
+  /**
+   * For app.mode=STREAM ensure that the run.id generation utils --
+   * DistributedLock, ClusterMembership and MetadataStore are NOT created.
+   * Also ensure that metadataStore.put is NOT invoked
+   * @throws Exception
+   */
+  @Test
+  public void testRunIdForStream() throws Exception {
+    final Map<String, String> cfgs = new HashMap<>();
+    cfgs.put(ApplicationConfig.APP_MODE, "STREAM");
+    cfgs.put(ApplicationConfig.APP_PROCESSOR_ID_GENERATOR_CLASS, UUIDGenerator.class.getName());
+    cfgs.put(JobConfig.JOB_NAME(), "test-task-job");
+    cfgs.put(JobConfig.JOB_ID(), "jobId");
+    config = new MapConfig(cfgs);
+    mockApp = new LegacyTaskApplication(IdentityStreamTask.class.getName());
+
+    prepareTestForRunId();
+
+    runner.run();
+
+
+    verify(coordinationUtils, Mockito.times(0)).getLock(CoordinationConstants.RUNID_LOCK_ID);
+    verify(coordinationUtils, Mockito.times(0)).getClusterMembership();
+    verify(clusterMembership, Mockito.times(0)).getNumberOfProcessors();
+    verify(metadataStore, Mockito.times(0)).put(eq(CoordinationConstants.RUNID_STORE_KEY), any(byte[].class));
+  }
+
+  private void prepareTestForRunId() throws Exception {
+    coordinationUtils = mock(CoordinationUtils.class);
+
+    DistributedLock lock = mock(DistributedLock.class);
+    when(lock.lock(anyObject())).thenReturn(true);
+    when(coordinationUtils.getLock(anyString())).thenReturn(lock);
+
+    clusterMembership = mock(ClusterMembership.class);
+    when(clusterMembership.getNumberOfProcessors()).thenReturn(1);
+    when(coordinationUtils.getClusterMembership()).thenReturn(clusterMembership);
+
+    metadataStore = mock(ZkMetadataStore.class);
+    when(metadataStore.get(any())).thenReturn(null);
+    PowerMockito.whenNew(ZkMetadataStore.class).withAnyArguments().thenReturn(metadataStore);
+
     ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc =
         ApplicationDescriptorUtil.getAppDescriptor(mockApp, config);
-    localPlanner = spy(new LocalJobPlanner(appDesc));
-    runner = spy(new LocalApplicationRunner(appDesc, localPlanner));
+    runner = spy(new LocalApplicationRunner(appDesc, Optional.of(coordinationUtils)));
+    localPlanner = spy(new LocalJobPlanner(appDesc, coordinationUtils, "FAKE_UID", "FAKE_RUNID"));
+    doReturn(localPlanner).when(runner).getPlanner();
+    StreamProcessor sp = mock(StreamProcessor.class);
+    doReturn(sp).when(runner).createStreamProcessor(anyObject(), anyObject(), anyObject(), anyObject());
   }
 
 }
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
new file mode 100644
index 0000000..1775db5
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkClusterMembership.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import org.junit.Test;
+
+import static junit.framework.Assert.*;
+
+public class TestZkClusterMembership {
+  private static EmbeddedZookeeper zkServer = null;
+  private static String testZkConnectionString = null;
+  private ZkUtils zkUtils1;
+  private ZkUtils zkUtils2;
+
+  @BeforeClass
+  public static void test() {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+    testZkConnectionString = String.format("127.0.0.1:%d", zkServer.getPort());
+  }
+
+  @Before
+  public void testSetup() {
+    ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+    this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+    ZkClient zkClient2 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+    this.zkUtils2 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient2, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+  }
+
+  @After
+  public void testTearDown() {
+    zkUtils1.close();
+    zkUtils2.close();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  @Test
+  public void testMembershipSingleProcessor() {
+    // happy path for single processor
+    ZkClusterMembership clusterMembership = new ZkClusterMembership("p1", zkUtils1);
+    String processorId;
+
+    assertEquals("ClusterMembership has participants before any processor registered.", 0, clusterMembership.getNumberOfProcessors());
+
+    processorId = clusterMembership.registerProcessor();
+
+    assertEquals("ClusterMembership does not have participants after a processor registered.", 1, clusterMembership.getNumberOfProcessors());
+
+    clusterMembership.unregisterProcessor(processorId);
+
+    assertEquals("ClusterMembership has participants after the single processor unregistered.", 0, clusterMembership.getNumberOfProcessors());
+  }
+
+  @Test
+  public void testMembershipTwoProcessors() {
+    // Two processors register. Check if second processor registering gets 2 as number of processors.
+    ZkClusterMembership clusterMembership1 = new ZkClusterMembership("p1", zkUtils1);
+    ZkClusterMembership clusterMembership2 = new ZkClusterMembership("p2", zkUtils1);
+
+    String processorId1;
+    String processorId2;
+
+    assertEquals("ClusterMembership has participants before any processor registered.", 0, clusterMembership1.getNumberOfProcessors());
+
+    processorId1 = clusterMembership1.registerProcessor();
+
+    assertEquals("ClusterMembership does not have participants after one processor registered.", 1, clusterMembership1.getNumberOfProcessors());
+
+    processorId2 = clusterMembership2.registerProcessor();
+
+    assertEquals("ClusterMembership does not have 2 participants after two processor registered.", 2, clusterMembership2.getNumberOfProcessors());
+
+    clusterMembership1.unregisterProcessor(processorId1);
+    clusterMembership2.unregisterProcessor(processorId2);
+
+    assertEquals("ClusterMembership has participants after both processors unregistered.", 0, clusterMembership1.getNumberOfProcessors());
+  }
+
+  @Test
+  public void testMembershipFirstProcessorUnregister() {
+    // First processor unregisters. Check if second processor registering gets 1 as number of processors.
+    ZkClusterMembership clusterMembership1 = new ZkClusterMembership("p1", zkUtils1);
+    ZkClusterMembership clusterMembership2 = new ZkClusterMembership("p2", zkUtils1);
+
+    String processorId1;
+    String processorId2;
+
+    assertEquals("ClusterMembership has participants before any processor registered.", 0, clusterMembership1.getNumberOfProcessors());
+
+    processorId1 = clusterMembership1.registerProcessor();
+
+    assertEquals("ClusterMembership does not have participants after one processor registered.", 1, clusterMembership1.getNumberOfProcessors());
+
+    clusterMembership1.unregisterProcessor(processorId1);
+
+    processorId2 = clusterMembership2.registerProcessor();
+
+    assertEquals("ClusterMembership does not have 1 participant1 after second processor registered.", 1, clusterMembership2.getNumberOfProcessors());
+
+    clusterMembership2.unregisterProcessor(processorId2);
+
+    assertEquals("ClusterMembership has participants after both processors unregistered.", 0, clusterMembership2.getNumberOfProcessors());
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
new file mode 100644
index 0000000..b5d85aa
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkDistributedLock.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.time.Duration;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.config.ZkConfig;
+import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
+
+import java.util.List;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static junit.framework.Assert.*;
+
+public class TestZkDistributedLock {
+  private static EmbeddedZookeeper zkServer = null;
+  private static String testZkConnectionString = null;
+  private ZkUtils zkUtils1;
+  private ZkUtils zkUtils2;
+
+  @BeforeClass
+  public static void test() {
+    zkServer = new EmbeddedZookeeper();
+    zkServer.setup();
+    testZkConnectionString = String.format("127.0.0.1:%d", zkServer.getPort());
+  }
+
+  @Before
+  public void testSetup() {
+    ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+    this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+    ZkClient zkClient2 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+    this.zkUtils2 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient2, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, new NoOpMetricsRegistry());
+  }
+
+  @After
+  public void testTearDown() {
+    zkUtils1.close();
+    zkUtils2.close();
+  }
+
+  @AfterClass
+  public static void teardown() {
+    zkServer.teardown();
+  }
+
+  private List<String> getParticipants(ZkUtils zkUtils, String lockId) {
+    String lockPath = String.format("%s/lock_%s", zkUtils.getKeyBuilder().getRootPath(), lockId); // use the caller's client, not zkUtils1
+    return zkUtils.getZkClient().getChildren(lockPath);
+  }
+
+  @Test
+  public void testLockSingleProcessor() {
+    String lockId = "FAKE_LOCK_ID_1";
+    ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId);
+
+
+    assertEquals("Lock has participants before any processor tried to lock.", 0, getParticipants(zkUtils1, lockId).size());
+
+    boolean lock1Status = lock1.lock(Duration.ofMillis(10000));
+    assertEquals("Lock does not have 1 participant after first processor tries to lock.", 1, getParticipants(zkUtils1, lockId).size());
+    assertEquals("1st processor requesting to lock did not acquire the lock.", true, lock1Status);
+    lock1.unlock();
+    assertEquals("Lock does have 1 participant after first processor tries to unlock.", 0, getParticipants(zkUtils1, lockId).size());
+  }
+
+  @Test
+  public void testLockTwoProcessors() {
+    // second processor should acquire lock after first one unlocks
+    String lockId = "FAKE_LOCK_ID_2";
+    ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId);
+    ZkDistributedLock lock2 = new ZkDistributedLock("p2", zkUtils2, lockId);
+
+    assertEquals("Lock has participants before any processor tried to lock.", 0, getParticipants(zkUtils1, lockId).size());
+
+    boolean lock1Status = lock1.lock(Duration.ofMillis(10000));
+    assertEquals("First processor requesting to lock did not acquire the lock.", true, lock1Status);
+    lock1.unlock();
+    boolean lock2Status = lock2.lock(Duration.ofMillis(10000));
+    assertEquals("Second processor requesting to lock did not acquire the lock.", true, lock2Status);
+    lock2.unlock();
+    assertEquals("Lock does have participants after processors unlocked.", 0, getParticipants(zkUtils1, lockId).size());
+  }
+
+  @Test
+  public void testLockFirstProcessorClosing() {
+    // first processor dies before unlock then second processor should acquire
+    String lockId = "FAKE_LOCK_ID_3";
+    ZkDistributedLock lock1 = new ZkDistributedLock("p1", zkUtils1, lockId);
+    ZkDistributedLock lock2 = new ZkDistributedLock("p2", zkUtils2, lockId);
+
+
+    assertEquals("Lock has participants before any processor tried to lock!", 0, getParticipants(zkUtils1, lockId).size());
+
+    boolean lock1Status = lock1.lock(Duration.ofMillis(10000));
+    assertEquals("First processor requesting to lock did not acquire the lock.", true, lock1Status);
+    // first processor dies before unlock
+    zkUtils1.close();
+
+    boolean lock2Status = lock2.lock(Duration.ofMillis(10000));
+    assertEquals("Second processor requesting to lock did not acquire the lock.", true, lock2Status);
+    lock2.unlock();
+    assertEquals("Lock does have participants after processors unlocked.", 0, getParticipants(zkUtils2, lockId).size());
+  }
+}
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
index 3d5f3b3..4d53222 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkMetadataStore.java
@@ -57,7 +57,7 @@ public class TestZkMetadataStore {
   public void beforeTest() {
     String testZkConnectionString = String.format("%s:%s", LOCALHOST, zkServer.getPort());
     Config zkConfig = new MapConfig(ImmutableMap.of(ZkConfig.ZK_CONNECT, testZkConnectionString));
-    zkMetadataStore = new ZkMetadataStoreFactory().getMetadataStore(String.format("/%s", RandomStringUtils.randomAlphabetic(5)), zkConfig, new MetricsRegistryMap());
+    zkMetadataStore = new ZkMetadataStoreFactory().getMetadataStore(String.format("%s", RandomStringUtils.randomAlphabetic(5)), zkConfig, new MetricsRegistryMap());
   }
 
   @After
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index e7c3ef8..e40bbc3 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -65,6 +65,7 @@ import org.apache.samza.metadatastore.MetadataStoreFactory;
 import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.runtime.ApplicationRunner;
 import org.apache.samza.runtime.ApplicationRunners;
+import org.apache.samza.runtime.LocalApplicationRunner;
 import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.test.StandaloneTestUtils;
 import org.apache.samza.test.harness.IntegrationTestHarness;
@@ -189,40 +190,48 @@ public class TestZkLocalApplicationRunner extends IntegrationTestHarness {
     }
   }
 
-  private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId) {
+  private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId, boolean isBatch) {
     List<String> inputSystemStreams = inputTopics.stream()
                                                  .map(topic -> String.format("%s.%s", TestZkLocalApplicationRunner.TEST_SYSTEM, topic))
                                                  .collect(Collectors.toList());
     String coordinatorSystemName = "coordinatorSystem";
-    Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder()
-        .put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS)
-        .put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputSystemStreams))
-        .put(JobConfig.JOB_DEFAULT_SYSTEM(), TestZkLocalApplicationRunner.TEST_SYSTEM)
-        .put(TaskConfig.IGNORED_EXCEPTIONS(), "*")
-        .put(ZkConfig.ZK_CONNECT, zkConnect())
-        .put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY)
-        .put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY)
-        .put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY)
-        .put(ApplicationConfig.APP_NAME, appName)
-        .put(ApplicationConfig.APP_ID, appId)
-        .put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner")
-        .put(String.format("systems.%s.samza.factory", TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY)
-        .put(JobConfig.JOB_NAME(), appName)
-        .put(JobConfig.JOB_ID(), appId)
-        .put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS)
-        .put(TaskConfig.DROP_PRODUCER_ERRORS(), "true")
-        .put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS)
-        .put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000")
-        .put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true")
-        .put("job.coordinator.system", coordinatorSystemName)
-        .put("job.coordinator.replication.factor", "1")
-        .build();
+    Map<String, String> config = new HashMap<>();
+    config.put(ZkConfig.ZK_CONSENSUS_TIMEOUT_MS, BARRIER_TIMEOUT_MS);
+    config.put(TaskConfig.INPUT_STREAMS(), Joiner.on(',').join(inputSystemStreams));
+    config.put(JobConfig.JOB_DEFAULT_SYSTEM(), TestZkLocalApplicationRunner.TEST_SYSTEM);
+    config.put(TaskConfig.IGNORED_EXCEPTIONS(), "*");
+    config.put(ZkConfig.ZK_CONNECT, zkConnect());
+    config.put(JobConfig.SSP_GROUPER_FACTORY(), TEST_SSP_GROUPER_FACTORY);
+    config.put(TaskConfig.GROUPER_FACTORY(), TEST_TASK_GROUPER_FACTORY);
+    config.put(JobCoordinatorConfig.JOB_COORDINATOR_FACTORY, TEST_JOB_COORDINATOR_FACTORY);
+    config.put(ApplicationConfig.APP_NAME, appName);
+    config.put(ApplicationConfig.APP_ID, appId);
+    config.put("app.runner.class", "org.apache.samza.runtime.LocalApplicationRunner");
+    config.put(String.format("systems.%s.samza.factory", TestZkLocalApplicationRunner.TEST_SYSTEM), TEST_SYSTEM_FACTORY);
+    config.put(JobConfig.JOB_NAME(), appName);
+    config.put(JobConfig.JOB_ID(), appId);
+    config.put(TaskConfigJava.TASK_SHUTDOWN_MS, TASK_SHUTDOWN_MS);
+    config.put(TaskConfig.DROP_PRODUCER_ERRORS(), "true");
+    config.put(JobConfig.JOB_DEBOUNCE_TIME_MS(), JOB_DEBOUNCE_TIME_MS);
+    config.put(JobConfig.MONITOR_PARTITION_CHANGE_FREQUENCY_MS(), "1000");
+    config.put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "true");
+    config.put("job.coordinator.system", coordinatorSystemName);
+    config.put("job.coordinator.replication.factor", "1");
+    if (isBatch) {
+      config.put(ApplicationConfig.APP_MODE, "BATCH");
+      config.put(ClusterManagerConfig.HOST_AFFINITY_ENABLED, "false");
+    }
+    Map<String, String> samzaContainerConfig = ImmutableMap.<String, String>builder().putAll(config).build();
     Map<String, String> applicationConfig = Maps.newHashMap(samzaContainerConfig);
     applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(coordinatorSystemName, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
     applicationConfig.putAll(StandaloneTestUtils.getKafkaSystemConfigs(TestZkLocalApplicationRunner.TEST_SYSTEM, bootstrapServers(), zkConnect(), null, StandaloneTestUtils.SerdeAlias.STRING, true));
     return applicationConfig;
   }
 
+  private Map<String, String> buildStreamApplicationConfigMap(List<String> inputTopics, String appName, String appId) {
+    return buildStreamApplicationConfigMap(inputTopics, appName, appId, false);
+  }
+
   /**
    * sspGrouper is set to GroupBySystemStreamPartitionFactory.
    * Run a stream application(appRunner1) consuming messages from input topic(effectively one container).
@@ -981,6 +990,270 @@ public class TestZkLocalApplicationRunner extends IntegrationTestHarness {
     return taskAssignments;
   }
 
+  /**
+   * Test if two processors coming up at the same time agree on a single runid
+   * 1. bring up two processors
+   * 2. wait till they start consuming messages
+   * 3. check if first processor run.id matches that of second processor
+   */
+  @Test
+  public void testAgreeingOnSameRunIdForBatch() throws InterruptedException {
+    publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    Map<String, String> configMap =
+        buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+    applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+    applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+
+
+    // Create StreamApplication from configuration.
+    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+    ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null,
+        applicationConfig1), applicationConfig1);
+    ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null,
+        applicationConfig2), applicationConfig2);
+
+    executeRun(appRunner1, applicationConfig1);
+    executeRun(appRunner2, applicationConfig2);
+
+    processedMessagesLatch1.await();
+    processedMessagesLatch2.await();
+
+    // At this stage, both the processors are running.
+    // check if their runId matches
+
+    LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1;
+    LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2;
+
+    assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId());
+
+    appRunner1.kill();
+    appRunner1.waitForFinish();
+    appRunner2.kill();
+    appRunner2.waitForFinish();
+  }
+
+
+  /**
+   * Test if a new processor joining an existing quorum gets the same run.id
+   * 1. bring up two processors
+   * 2. wait till they start consuming messages
+   * 3. bring up a third processor
+   * 4. wait till third processor starts consuming messages
+   * 5. check if third processor run.id matches that of first two
+   */
+  @Test
+  public void testNewProcessorGetsSameRunIdForBatch() throws InterruptedException {
+    publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    Map<String, String> configMap =
+        buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+    applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+    applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+    applicationConfig3 = new ApplicationConfig(new MapConfig(configMap));
+
+    // Create StreamApplication from configuration.
+    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+    ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null,
+        applicationConfig1), applicationConfig1);
+    ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null,
+        applicationConfig2), applicationConfig2);
+
+    executeRun(appRunner1, applicationConfig1);
+    executeRun(appRunner2, applicationConfig2);
+
+    processedMessagesLatch1.await();
+    processedMessagesLatch2.await();
+
+    // At this stage, both the processors are running.
+    // check if their runId matches
+
+    LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1;
+    LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2;
+
+    assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId());
+
+    //Bring up a new processor
+    CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
+    ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch3, null, null,
+        applicationConfig3), applicationConfig3);
+    executeRun(appRunner3, applicationConfig3);
+    processedMessagesLatch3.await();
+
+    // At this stage, the new processor is running.
+    // check if new processor's runId matches that of the older processors
+    LocalApplicationRunner localApplicationRunner3 = (LocalApplicationRunner) appRunner3;
+    assertEquals("RunId of the new processor does not match that of old processor", localApplicationRunner3.getRunId(), localApplicationRunner1.getRunId());
+
+
+    appRunner1.kill();
+    appRunner1.waitForFinish();
+    appRunner2.kill();
+    appRunner2.waitForFinish();
+    appRunner3.kill();
+    appRunner3.waitForFinish();
+  }
+
+
+  /**
+   * Test one group of processors dying and a new processor coming up generates new run.id
+   * 1. bring up two processors
+   * 2. wait till they start consuming messages
+   * 3. kill and shutdown neatly both the processors
+   * 4. bring up a new processor
+   * 5. wait till new processor starts consuming messages
+   * 6. check if new processor has new runid different from shutdown processors
+   */
+  @Test
+  public void testAllProcesssorDieNewProcessorGetsNewRunIdForBatch() throws InterruptedException {
+    publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    Map<String, String> configMap =
+        buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+    applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+    applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+    applicationConfig3 = new ApplicationConfig(new MapConfig(configMap));
+
+    // Create StreamApplication from configuration.
+    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+
+    ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null,
+        applicationConfig1), applicationConfig1);
+    ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null,
+        applicationConfig2), applicationConfig2);
+
+    executeRun(appRunner1, applicationConfig1);
+    executeRun(appRunner2, applicationConfig2);
+
+    processedMessagesLatch1.await();
+    processedMessagesLatch2.await();
+
+    // At this stage, both the processors are running.
+    // check if their runId matches
+
+    LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1;
+    LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2;
+
+    assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId());
+
+    String oldRunId = localApplicationRunner1.getRunId().get();
+
+    // shut down both the processors
+    appRunner1.kill();
+    appRunner1.waitForFinish();
+    appRunner2.kill();
+    appRunner2.waitForFinish();
+
+    //Bring up a new processor
+    CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
+    ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch3, null, null,
+        applicationConfig3), applicationConfig3);
+    executeRun(appRunner3, applicationConfig3);
+    processedMessagesLatch3.await();
+
+    // At this stage, the new processor is running.
+    // check that the new processor's runId differs from that of the stopped processors
+    LocalApplicationRunner localApplicationRunner3 = (LocalApplicationRunner) appRunner3;
+    // Unwrap with get() as done for oldRunId: a String never equals the wrapper, so the check would always pass.
+    assertNotEquals("RunId of the new processor same as that of old stopped processors", oldRunId, localApplicationRunner3.getRunId().get());
+
+    appRunner3.kill();
+    appRunner3.waitForFinish();
+  }
+
+
+  /**
+   * Test that the first processor dying does not change the run.id handed to a new processor that joins later.
+   * 1. bring up two processors, one after the other
+   * 2. wait till each of them starts consuming messages and check that their run.ids match
+   * 3. kill the first processor and wait for it to finish
+   * 4. bring up a new processor
+   * 5. wait till the new processor starts consuming messages
+   * 6. check that the new processor's run.id equals that of the still-running second processor
+   */
+  @Test
+  public void testFirstProcessorDiesButSameRunIdForBatch() throws InterruptedException {
+    publishKafkaEvents(inputKafkaTopic, 0, NUM_KAFKA_EVENTS, PROCESSOR_IDS[0]);
+
+    Map<String, String> configMap =
+        buildStreamApplicationConfigMap(ImmutableList.of(inputKafkaTopic), testStreamAppName, testStreamAppId, true);
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[0]);
+    applicationConfig1 = new ApplicationConfig(new MapConfig(configMap));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[1]);
+    applicationConfig2 = new ApplicationConfig(new MapConfig(configMap));
+    configMap.put(JobConfig.PROCESSOR_ID(), PROCESSOR_IDS[2]);
+    applicationConfig3 = new ApplicationConfig(new MapConfig(configMap));
+
+    CountDownLatch processedMessagesLatch1 = new CountDownLatch(1);
+    ApplicationRunner appRunner1 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch1, null, null,
+        applicationConfig1), applicationConfig1);
+    executeRun(appRunner1, applicationConfig1);
+
+    // wait for the first processor to be up and running
+    processedMessagesLatch1.await();
+
+    LocalApplicationRunner localApplicationRunner1 = (LocalApplicationRunner) appRunner1;
+
+    // bring up the second processor while the first one is still running
+    CountDownLatch processedMessagesLatch2 = new CountDownLatch(1);
+    ApplicationRunner appRunner2 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch2, null, null,
+        applicationConfig2), applicationConfig2);
+    executeRun(appRunner2, applicationConfig2);
+
+    // wait for the second processor to be up and running
+    processedMessagesLatch2.await();
+    LocalApplicationRunner localApplicationRunner2 = (LocalApplicationRunner) appRunner2;
+
+    assertEquals("RunId of the two processors does not match", localApplicationRunner2.getRunId(), localApplicationRunner1.getRunId());
+
+    // shut down the first processor only; the second one keeps running
+    appRunner1.kill();
+    appRunner1.waitForFinish();
+
+    // bring up a new processor
+    CountDownLatch processedMessagesLatch3 = new CountDownLatch(1);
+    ApplicationRunner appRunner3 = ApplicationRunners.getApplicationRunner(TestStreamApplication.getInstance(
+        TEST_SYSTEM, ImmutableList.of(inputKafkaTopic), outputKafkaTopic, processedMessagesLatch3, null, null,
+        applicationConfig3), applicationConfig3);
+    executeRun(appRunner3, applicationConfig3);
+    processedMessagesLatch3.await();
+
+    // At this stage, the new processor is running.
+    // check that the new processor's run.id matches that of the second (still running) processor
+    LocalApplicationRunner localApplicationRunner3 = (LocalApplicationRunner) appRunner3;
+    assertEquals("RunId of the new processor is not the same as that of earlier processors", localApplicationRunner2.getRunId(), localApplicationRunner3.getRunId());
+
+
+    appRunner2.kill();
+    appRunner2.waitForFinish();
+    appRunner3.kill();
+    appRunner3.waitForFinish();
+  }
+
+
   private static Set<SystemStreamPartition> getSystemStreamPartitions(JobModel jobModel) {
     System.out.println(jobModel);
     Set<SystemStreamPartition> ssps = new HashSet<>();