You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/09/12 21:20:04 UTC

[23/26] samza git commit: SAMZA-1385: Coordination utils factory with distributed lock

SAMZA-1385: Coordination utils factory with distributed lock

This PR includes some changes from another PR. I will re-merge it after the other PR is in.

Author: Boris Shkolnik <bo...@apache.org>

Reviewers: Xinyu Liu <xi...@apache.org>

Closes #284 from sborya/CoordinationUtilsFactory_withDistributedLock


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dd07e074
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dd07e074
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dd07e074

Branch: refs/heads/0.14.0
Commit: dd07e07421bd57e0f927b8974131b949f5ac2e71
Parents: 81b4e63
Author: Boris Shkolnik <bo...@apache.org>
Authored: Tue Aug 29 15:37:17 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Aug 29 15:37:17 2017 -0700

----------------------------------------------------------------------
 .../coordinator/AzureCoordinationUtils.java     |  13 ++-
 .../org/apache/samza/coordinator/AzureLock.java |   6 +-
 .../samza/coordinator/CoordinationUtils.java    |   5 +-
 .../coordinator/DistributedLockWithState.java   |  42 +++++++
 .../samza/runtime/LocalApplicationRunner.java   |  94 ++++++---------
 .../apache/samza/zk/ZkCoordinationUtils.java    |   5 +
 .../samza/zk/ZkCoordinationUtilsFactory.java    |   2 +-
 .../org/apache/samza/zk/ZkDistributedLock.java  | 117 +++++++++++++++++++
 .../runtime/TestApplicationRunnerMain.java      |   2 +-
 .../runtime/TestLocalApplicationRunner.java     |  57 ++-------
 10 files changed, 220 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
index dbd945f..2a42514 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -46,13 +46,14 @@ public class AzureCoordinationUtils implements CoordinationUtils {
   }
 
   @Override
-  public void close() {
-
-  }
-
-  public DistributedLock getLock(String initLockName) {
+  public DistributedLockWithState getLockWithState(String lockId) {
     BlobUtils blob = new BlobUtils(client, azureConfig.getAzureContainerName(),
-        azureConfig.getAzureBlobName() + initLockName, azureConfig.getAzureBlobLength());
+        azureConfig.getAzureBlobName() + lockId, azureConfig.getAzureBlobLength());
     return new AzureLock(blob);
   }
+
+  @Override
+  public void close() {
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
index 0ef1b83..c0d3ff2 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Distributed lock primitive for Azure.
  */
-public class AzureLock implements DistributedLock {
+public class AzureLock implements DistributedLockWithState {
 
   private static final Logger LOG = LoggerFactory.getLogger(AzureLock.class);
   private static final int LEASE_TIME_IN_SEC = 60;
@@ -55,7 +55,7 @@ public class AzureLock implements DistributedLock {
    * @return true if the lock was acquired successfully, false if lock acquire operation is unsuccessful even after subsequent tries within the timeout range.
    */
   @Override
-  public boolean lock(long timeout, TimeUnit unit) {
+  public boolean lockIfNotSet(long timeout, TimeUnit unit) {
     //Start timer for timeout
     long startTime = System.currentTimeMillis();
     long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
@@ -87,7 +87,7 @@ public class AzureLock implements DistributedLock {
    * Unlocks, by releasing the lease on the blob.
    */
   @Override
-  public void unlock() {
+  public void unlockAndSet() {
     boolean status = leaseBlobManager.releaseLease(leaseId.get());
     if (status) {
       LOG.info("Unlocked successfully.");

http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
index 4ba44b5..9ebd2e2 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
@@ -27,6 +27,7 @@ import org.apache.samza.annotation.InterfaceStability;
  * This service provide three primitives:
  *   - LeaderElection
  *   - Latch
+ *   - LockWithState (does not lock if state is set)
  */
 @InterfaceStability.Evolving
 public interface CoordinationUtils {
@@ -36,8 +37,10 @@ public interface CoordinationUtils {
 
   Latch getLatch(int size, String latchId);
 
+  DistributedLockWithState getLockWithState(String lockId);
+
   /**
-   * performs necessary cleanup and closes ALL the utils.
+   * utilities cleanup
    */
   void close();
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
new file mode 100644
index 0000000..0de7813
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+public interface DistributedLockWithState {
+
+  /**
+   * Tries to acquire the lock, but first checks if the state flag is set. If it is set, returns false.
+   * If the flag is not set and the lock is acquired, returns true.
+   * @param timeout Duration of lock acquiring timeout.
+   * @param unit Time Unit of the timeout defined above.
+   * @return true if lock is acquired successfully, false if state is already set.
+   * @throws TimeoutException if the lock could not be acquired.
+   */
+  boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException;
+
+  /**
+   * Release the lock and set the state
+   */
+  void unlockAndSet();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index fc11cf5..077c124 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -37,8 +37,7 @@ import org.apache.samza.config.JobConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.CoordinationUtilsFactory;
-import org.apache.samza.coordinator.Latch;
-import org.apache.samza.coordinator.LeaderElector;
+import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.processor.StreamProcessor;
@@ -56,10 +55,7 @@ import org.slf4j.LoggerFactory;
 public class LocalApplicationRunner extends AbstractApplicationRunner {
 
   private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
-  private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData";
-  // Latch timeout is set to 10 min
-  private static final long LATCH_TIMEOUT_MINUTES = 10;
-  private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000;
+  private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData";
 
   private final String uid;
   private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
@@ -214,36 +210,39 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
    * @throws TimeoutException exception for latch timeout
    */
   /* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException {
-    if (!intStreams.isEmpty()) {
-      // Move the scope of coordination utils within stream creation to address long idle connection problem.
-      // Refer SAMZA-1385 for more details
-
-      String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX;
-      CoordinationUtils coordinationUtils =
-          CoordinationUtilsFactory.getCoordinationUtilsFactory(config).getCoordinationUtils(coordinationId, uid, config);
-      if (coordinationUtils != null) {
-        Latch initLatch = coordinationUtils.getLatch(1, planId);
-
-        try {
-          // check if the processor needs to go through leader election and stream creation
-          if (shouldContestInElectionForStreamCreation(initLatch)) {
-            LeaderElector leaderElector = coordinationUtils.getLeaderElector();
-            leaderElector.setLeaderElectorListener(() -> {
-                getStreamManager().createStreams(intStreams);
-                initLatch.countDown();
-              });
-            leaderElector.tryBecomeLeader();
-            initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
-          }
-        } finally {
-          if (initLatch != null)
-            coordinationUtils.close();
-        }
-      } else {
-        // each application process will try creating the streams, which
-        // requires stream creation to be idempotent
+    if (intStreams.isEmpty()) {
+      LOG.info("Set of intermediate streams is empty. Nothing to create.");
+      return;
+    }
+    LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
+    // Move the scope of coordination utils within stream creation to address long idle connection problem.
+    // Refer SAMZA-1385 for more details
+    String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
+    CoordinationUtils coordinationUtils =
+        CoordinationUtilsFactory.getCoordinationUtilsFactory(config).getCoordinationUtils(coordinationId, uid, config);
+    if (coordinationUtils == null) {
+      LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
+      // each application process will try creating the streams, which
+      // requires stream creation to be idempotent
+      getStreamManager().createStreams(intStreams);
+      return;
+    }
+
+    DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId);
+    try {
+      // take the lock only if the state is not set yet; the holder creates the streams, then sets the state
+      if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
+        LOG.info("lock acquired for streams creation by " + uid);
         getStreamManager().createStreams(intStreams);
+        lockWithState.unlockAndSet();
+      } else {
+        LOG.info("Processor {} did not obtain the lock for streams creation. They must've been created by another processor.", uid);
       }
+    } catch (TimeoutException e) {
+      String msg = String.format("Processor %s failed to get the lock for stream initialization", uid);
+      throw new SamzaException(msg, e);
+    } finally {
+      coordinationUtils.close();
     }
   }
 
@@ -270,31 +269,4 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
           taskFactory.getClass().getCanonicalName()));
     }
   }
-
-  /**
-   * In order to fix SAMZA-1385, we are limiting the scope of coordination util within stream creation phase and destroying
-   * the coordination util right after. By closing the zk connection, we clean up the ephemeral node used for leader election.
-   * It creates the following issues whenever a new process joins after the ephemeral node is gone.
-   *    1. It is unnecessary to re-conduct leader election for stream creation in the same application lifecycle
-   *    2. Underlying systems may not support check for stream existence prior to creation which could have potential problems.
-   * As in interim solution, we reuse the same latch as a marker to determine if create streams phase is done for the
-   * application lifecycle using {@link Latch#await(long, TimeUnit)}
-   *
-   * @param streamCreationLatch latch used for stream creation
-   * @return true if processor needs to be part of election
-   *         false otherwise
-   */
-  private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) {
-    boolean eligibleForElection = true;
-
-    try {
-      streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
-      // case we didn't time out suggesting that latch already exists
-      eligibleForElection = false;
-    } catch (TimeoutException e) {
-      LOG.info("Timed out waiting for the latch! Going to enter leader election section to create streams");
-    }
-
-    return eligibleForElection;
-  }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 05886db..3d4a2d1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -21,6 +21,7 @@ package org.apache.samza.zk;
 import org.I0Itec.zkclient.exception.ZkInterruptedException;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.coordinator.Latch;
 import org.apache.samza.coordinator.LeaderElector;
 import org.slf4j.Logger;
@@ -51,6 +52,10 @@ public class ZkCoordinationUtils implements CoordinationUtils {
   }
 
   @Override
+  public DistributedLockWithState getLockWithState(String lockId) {
+    return new ZkDistributedLock(processorIdStr, zkUtils, lockId);
+  }
+
   public void close() {
     try {
       if (zkUtils != null)

http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
index ded6a38..8dd42c1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
@@ -23,8 +23,8 @@ import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.SamzaException;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ZkConfig;
-import org.apache.samza.coordinator.CoordinationUtilsFactory;
 import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationUtilsFactory;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.zookeeper.client.ConnectStringParser;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
new file mode 100644
index 0000000..cfb4641
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.DistributedLockWithState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed lock primitive for Zookeeper.
+ */
+public class ZkDistributedLock implements DistributedLockWithState {
+
+  public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class);
+  private static final String STATE_INITED = "sate_initialized"; // spelling is kept: the value is part of the persistent state node path
+  private final ZkUtils zkUtils;
+  private final String lockPath;
+  private final String participantId;
+  private final ZkKeyBuilder keyBuilder;
+  private final Random random = new Random();
+  private String nodePath = null;
+  private final String statePath;
+
+  public ZkDistributedLock(String participantId, ZkUtils zkUtils, String lockId) {
+    this.zkUtils = zkUtils;
+    this.participantId = participantId;
+    this.keyBuilder = zkUtils.getKeyBuilder();
+    lockPath = String.format("%s/stateLock_%s", keyBuilder.getRootPath(), lockId);
+    statePath = String.format("%s/%s_%s", lockPath, STATE_INITED, lockId);
+    zkUtils.validatePaths(new String[] {lockPath});
+  }
+
+  /**
+   * Tries to acquire the lock unless the state is already set. Creates a sequential ephemeral node under the lock path and
+   * polls until the state node exists, this node is first among the sorted children (lowest sequence number), or the timeout elapses.
+   * @param timeout Duration of lock acquiring timeout.
+   * @param unit Unit of the timeout defined above.
+   * @return true if lock is acquired successfully, false if the state is already set.
+   * @throws TimeoutException if the lock could not be acquired within the timeout.
+   */
+  @Override
+  public boolean lockIfNotSet(long timeout, TimeUnit unit)
+      throws TimeoutException {
+    nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId);
+
+    //Start timer for timeout
+    long startTime = System.currentTimeMillis();
+    long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+
+    while ((System.currentTimeMillis() - startTime) < lockTimeout) {
+      if (zkUtils.getZkClient().exists(statePath)) {
+        // state already set, no point locking
+        return false;
+      }
+
+      // getChildren() gives no ordering guarantee; sort so that index 0 really is the lowest sequence number.
+      List<String> children = zkUtils.getZkClient().getChildren(lockPath);
+      java.util.Collections.sort(children);
+      int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath));
+      if (children.size() == 0 || index == -1) {
+        throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
+      }
+      // Acquires lock when the node has the lowest sequence number and returns.
+      if (index == 0) {
+        LOG.info("Acquired lock for participant id: {}", participantId);
+        return true;
+      } else {
+        try {
+          Thread.sleep(random.nextInt(1000));
+        } catch (InterruptedException e) {
+          Thread.interrupted(); // NOTE(review): clears the interrupt status, so the interrupt is swallowed - confirm this is intended
+        }
+        LOG.info("Trying to acquire lock again...");
+      }
+    }
+    throw new TimeoutException("could not acquire lock for " + timeout + " " + unit.toString());
+  }
+
+  /**
+   * Sets the state by creating the persistent state node, then unlocks by deleting the ephemeral sequential node created to acquire the lock.
+   */
+  @Override
+  public void unlockAndSet() {
+    // set state
+    zkUtils.getZkClient().createPersistent(statePath, true);
+
+    if (nodePath != null) {
+      zkUtils.getZkClient().delete(nodePath);
+      nodePath = null;
+      LOG.info("Ephemeral lock node deleted. Unlocked!");
+    } else {
+      LOG.warn("Ephemeral lock node you want to delete doesn't exist");
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index d22fbae..eb0ebe9 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -25,7 +25,7 @@ import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.operators.StreamGraph;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
 
 
 public class TestApplicationRunnerMain {

http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 38f58fd..f9c1252 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -25,8 +25,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.ApplicationConfig;
@@ -35,9 +33,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.TaskConfig;
 import org.apache.samza.coordinator.CoordinationUtils;
 import org.apache.samza.coordinator.CoordinationUtilsFactory;
-import org.apache.samza.coordinator.Latch;
-import org.apache.samza.coordinator.LeaderElector;
-import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.coordinator.DistributedLockWithState;
 import org.apache.samza.execution.ExecutionPlan;
 import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
@@ -54,7 +50,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.*;
@@ -168,53 +163,13 @@ public class TestLocalApplicationRunner {
     LocalApplicationRunner spy = spy(runner);
 
     CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
-    LeaderElector leaderElector = new LeaderElector() {
-      private LeaderElectorListener leaderElectorListener;
-
-      @Override
-      public void setLeaderElectorListener(LeaderElectorListener listener) {
-        this.leaderElectorListener = listener;
-      }
-
-      @Override
-      public void tryBecomeLeader() {
-        leaderElectorListener.onBecomingLeader();
-      }
-
-      @Override
-      public void resignLeadership() {
-      }
-
-      @Override
-      public boolean amILeader() {
-        return false;
-      }
-    };
-
-    Latch latch = new Latch() {
-      boolean done = false;
-
-      @Override
-      public void await(long timeout, TimeUnit tu)
-          throws TimeoutException {
-        // in this test, latch is released after countDown is invoked
-        if (!done) {
-          throw new TimeoutException("timed out waiting for the target path");
-        }
-      }
-
-      @Override
-      public void countDown() {
-        done = true;
-      }
-    };
-
-    mockStatic(CoordinationUtilsFactory.class);
     CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
+    mockStatic(CoordinationUtilsFactory.class);
     when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
 
-    when(coordinationUtils.getLeaderElector()).thenReturn(leaderElector);
-    when(coordinationUtils.getLatch(anyInt(), anyString())).thenReturn(latch);
+    DistributedLockWithState lock = mock(DistributedLockWithState.class);
+    when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true);
+    when(coordinationUtils.getLockWithState(anyString())).thenReturn(lock);
     when(coordinationUtilsFactory.getCoordinationUtils(anyString(), anyString(), anyObject()))
         .thenReturn(coordinationUtils);
 
@@ -384,6 +339,7 @@ public class TestLocalApplicationRunner {
         new StreamSpec("test-stream-1", "stream-1", "testStream"),
         new StreamSpec("test-stream-3", "stream-3", "testStream"));
 
+
     assertFalse("Expected both of the latch ids to be different",
         planIdBeforeShuffle.equals(getExecutionPlanId(shuffledStreamSpecs)));
   }
@@ -419,6 +375,7 @@ public class TestLocalApplicationRunner {
         new StreamSpec("test-stream-4", "stream-4", "testStream"),
         new StreamSpec("test-stream-3", "stream-3", "testStream"));
 
+
     assertFalse("Expected both of the latch ids to be different",
         planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
   }