You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by xi...@apache.org on 2017/08/29 22:37:27 UTC
samza git commit: SAMZA-1385: Coordination utils factory with
distributed lock
Repository: samza
Updated Branches:
refs/heads/master 81b4e6361 -> dd07e0742
SAMZA-1385: Coordination utils factory with distributed lock
this PR includes some changes from another PR. I will re-merge it again, after the other PR is in.
Author: Boris Shkolnik <bo...@apache.org>
Reviewers: Xinyu Liu <xi...@apache.org>
Closes #284 from sborya/CoordinationUtilsFactory_withDistributedLock
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/dd07e074
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/dd07e074
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/dd07e074
Branch: refs/heads/master
Commit: dd07e07421bd57e0f927b8974131b949f5ac2e71
Parents: 81b4e63
Author: Boris Shkolnik <bo...@apache.org>
Authored: Tue Aug 29 15:37:17 2017 -0700
Committer: Xinyu Liu <xi...@xiliu-ld1.linkedin.biz>
Committed: Tue Aug 29 15:37:17 2017 -0700
----------------------------------------------------------------------
.../coordinator/AzureCoordinationUtils.java | 13 ++-
.../org/apache/samza/coordinator/AzureLock.java | 6 +-
.../samza/coordinator/CoordinationUtils.java | 5 +-
.../coordinator/DistributedLockWithState.java | 42 +++++++
.../samza/runtime/LocalApplicationRunner.java | 94 ++++++---------
.../apache/samza/zk/ZkCoordinationUtils.java | 5 +
.../samza/zk/ZkCoordinationUtilsFactory.java | 2 +-
.../org/apache/samza/zk/ZkDistributedLock.java | 117 +++++++++++++++++++
.../runtime/TestApplicationRunnerMain.java | 2 +-
.../runtime/TestLocalApplicationRunner.java | 57 ++-------
10 files changed, 220 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
index dbd945f..2a42514 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureCoordinationUtils.java
@@ -46,13 +46,14 @@ public class AzureCoordinationUtils implements CoordinationUtils {
}
@Override
- public void close() {
-
- }
-
- public DistributedLock getLock(String initLockName) {
+ public DistributedLockWithState getLockWithState(String lockId) {
BlobUtils blob = new BlobUtils(client, azureConfig.getAzureContainerName(),
- azureConfig.getAzureBlobName() + initLockName, azureConfig.getAzureBlobLength());
+ azureConfig.getAzureBlobName() + lockId, azureConfig.getAzureBlobLength());
return new AzureLock(blob);
}
+
+ @Override
+ public void close() {
+
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
----------------------------------------------------------------------
diff --git a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
index 0ef1b83..c0d3ff2 100644
--- a/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
+++ b/samza-azure/src/main/java/org/apache/samza/coordinator/AzureLock.java
@@ -33,7 +33,7 @@ import org.slf4j.LoggerFactory;
/**
* Distributed lock primitive for Azure.
*/
-public class AzureLock implements DistributedLock {
+public class AzureLock implements DistributedLockWithState {
private static final Logger LOG = LoggerFactory.getLogger(AzureLock.class);
private static final int LEASE_TIME_IN_SEC = 60;
@@ -55,7 +55,7 @@ public class AzureLock implements DistributedLock {
* @return true if the lock was acquired successfully, false if lock acquire operation is unsuccessful even after subsequent tries within the timeout range.
*/
@Override
- public boolean lock(long timeout, TimeUnit unit) {
+ public boolean lockIfNotSet(long timeout, TimeUnit unit) {
//Start timer for timeout
long startTime = System.currentTimeMillis();
long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
@@ -87,7 +87,7 @@ public class AzureLock implements DistributedLock {
* Unlocks, by releasing the lease on the blob.
*/
@Override
- public void unlock() {
+ public void unlockAndSet() {
boolean status = leaseBlobManager.releaseLease(leaseId.get());
if (status) {
LOG.info("Unlocked successfully.");
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
index 4ba44b5..9ebd2e2 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/CoordinationUtils.java
@@ -27,6 +27,7 @@ import org.apache.samza.annotation.InterfaceStability;
* This service provide three primitives:
* - LeaderElection
* - Latch
+ * - LockWithState (does not lock if state is set)
*/
@InterfaceStability.Evolving
public interface CoordinationUtils {
@@ -36,8 +37,10 @@ public interface CoordinationUtils {
Latch getLatch(int size, String latchId);
+ DistributedLockWithState getLockWithState(String lockId);
+
/**
- * performs necessary cleanup and closes ALL the utils.
+ * utilites cleanup
*/
void close();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
new file mode 100644
index 0000000..0de7813
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/DistributedLockWithState.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.coordinator;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+
+public interface DistributedLockWithState {
+
+ /**
+ * Trie to acquire the lock, but first check if the state flag is set. If it is set, return false.
+ * If the flag is not set, and lock is acquired - return true.
+ * Throw TimeOutException if could not acquire the lock.
+ * @param timeout Duration of lock acquiring timeout.
+ * @param unit Time Unit of the timeout defined above.
+ * @return true if lock is acquired successfully, false if state is already set.
+ */
+ boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException;
+
+ /**
+ * Release the lock and set the state
+ */
+ void unlockAndSet();
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
index fc11cf5..077c124 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalApplicationRunner.java
@@ -37,8 +37,7 @@ import org.apache.samza.config.JobConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
-import org.apache.samza.coordinator.Latch;
-import org.apache.samza.coordinator.LeaderElector;
+import org.apache.samza.coordinator.DistributedLockWithState;
import org.apache.samza.execution.ExecutionPlan;
import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.processor.StreamProcessor;
@@ -56,10 +55,7 @@ import org.slf4j.LoggerFactory;
public class LocalApplicationRunner extends AbstractApplicationRunner {
private static final Logger LOG = LoggerFactory.getLogger(LocalApplicationRunner.class);
- private static final String APPLICATION_RUNNER_ZK_PATH_SUFFIX = "/ApplicationRunnerData";
- // Latch timeout is set to 10 min
- private static final long LATCH_TIMEOUT_MINUTES = 10;
- private static final long LEADER_ELECTION_WAIT_TIME_MS = 1000;
+ private static final String APPLICATION_RUNNER_PATH_SUFFIX = "/ApplicationRunnerData";
private final String uid;
private final Set<StreamProcessor> processors = ConcurrentHashMap.newKeySet();
@@ -214,36 +210,39 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
* @throws TimeoutException exception for latch timeout
*/
/* package private */ void createStreams(String planId, List<StreamSpec> intStreams) throws TimeoutException {
- if (!intStreams.isEmpty()) {
- // Move the scope of coordination utils within stream creation to address long idle connection problem.
- // Refer SAMZA-1385 for more details
-
- String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_ZK_PATH_SUFFIX;
- CoordinationUtils coordinationUtils =
- CoordinationUtilsFactory.getCoordinationUtilsFactory(config).getCoordinationUtils(coordinationId, uid, config);
- if (coordinationUtils != null) {
- Latch initLatch = coordinationUtils.getLatch(1, planId);
-
- try {
- // check if the processor needs to go through leader election and stream creation
- if (shouldContestInElectionForStreamCreation(initLatch)) {
- LeaderElector leaderElector = coordinationUtils.getLeaderElector();
- leaderElector.setLeaderElectorListener(() -> {
- getStreamManager().createStreams(intStreams);
- initLatch.countDown();
- });
- leaderElector.tryBecomeLeader();
- initLatch.await(LATCH_TIMEOUT_MINUTES, TimeUnit.MINUTES);
- }
- } finally {
- if (initLatch != null)
- coordinationUtils.close();
- }
- } else {
- // each application process will try creating the streams, which
- // requires stream creation to be idempotent
+ if (intStreams.isEmpty()) {
+ LOG.info("Set of intermediate streams is empty. Nothing to create.");
+ return;
+ }
+ LOG.info("A single processor must create the intermediate streams. Processor {} will attempt to acquire the lock.", uid);
+ // Move the scope of coordination utils within stream creation to address long idle connection problem.
+ // Refer SAMZA-1385 for more details
+ String coordinationId = new ApplicationConfig(config).getGlobalAppId() + APPLICATION_RUNNER_PATH_SUFFIX;
+ CoordinationUtils coordinationUtils =
+ CoordinationUtilsFactory.getCoordinationUtilsFactory(config).getCoordinationUtils(coordinationId, uid, config);
+ if (coordinationUtils == null) {
+ LOG.warn("Processor {} failed to create utils. Each processor will attempt to create streams.", uid);
+ // each application process will try creating the streams, which
+ // requires stream creation to be idempotent
+ getStreamManager().createStreams(intStreams);
+ return;
+ }
+
+ DistributedLockWithState lockWithState = coordinationUtils.getLockWithState(planId);
+ try {
+ // check if the processor needs to go through leader election and stream creation
+ if (lockWithState.lockIfNotSet(1000, TimeUnit.MILLISECONDS)) {
+ LOG.info("lock acquired for streams creation by " + uid);
getStreamManager().createStreams(intStreams);
+ lockWithState.unlockAndSet();
+ } else {
+ LOG.info("Processor {} did not obtain the lock for streams creation. They must've been created by another processor.", uid);
}
+ } catch (TimeoutException e) {
+ String msg = String.format("Processor {} failed to get the lock for stream initialization", uid);
+ throw new SamzaException(msg, e);
+ } finally {
+ coordinationUtils.close();
}
}
@@ -270,31 +269,4 @@ public class LocalApplicationRunner extends AbstractApplicationRunner {
taskFactory.getClass().getCanonicalName()));
}
}
-
- /**
- * In order to fix SAMZA-1385, we are limiting the scope of coordination util within stream creation phase and destroying
- * the coordination util right after. By closing the zk connection, we clean up the ephemeral node used for leader election.
- * It creates the following issues whenever a new process joins after the ephemeral node is gone.
- * 1. It is unnecessary to re-conduct leader election for stream creation in the same application lifecycle
- * 2. Underlying systems may not support check for stream existence prior to creation which could have potential problems.
- * As in interim solution, we reuse the same latch as a marker to determine if create streams phase is done for the
- * application lifecycle using {@link Latch#await(long, TimeUnit)}
- *
- * @param streamCreationLatch latch used for stream creation
- * @return true if processor needs to be part of election
- * false otherwise
- */
- private boolean shouldContestInElectionForStreamCreation(Latch streamCreationLatch) {
- boolean eligibleForElection = true;
-
- try {
- streamCreationLatch.await(LEADER_ELECTION_WAIT_TIME_MS, TimeUnit.MILLISECONDS);
- // case we didn't time out suggesting that latch already exists
- eligibleForElection = false;
- } catch (TimeoutException e) {
- LOG.info("Timed out waiting for the latch! Going to enter leader election section to create streams");
- }
-
- return eligibleForElection;
- }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
index 05886db..3d4a2d1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtils.java
@@ -21,6 +21,7 @@ package org.apache.samza.zk;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.DistributedLockWithState;
import org.apache.samza.coordinator.Latch;
import org.apache.samza.coordinator.LeaderElector;
import org.slf4j.Logger;
@@ -51,6 +52,10 @@ public class ZkCoordinationUtils implements CoordinationUtils {
}
@Override
+ public DistributedLockWithState getLockWithState(String lockId) {
+ return new ZkDistributedLock(processorIdStr, zkUtils, lockId);
+ }
+
public void close() {
try {
if (zkUtils != null)
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
index ded6a38..8dd42c1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkCoordinationUtilsFactory.java
@@ -23,8 +23,8 @@ import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.ZkConfig;
-import org.apache.samza.coordinator.CoordinationUtilsFactory;
import org.apache.samza.coordinator.CoordinationUtils;
+import org.apache.samza.coordinator.CoordinationUtilsFactory;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.apache.zookeeper.client.ConnectStringParser;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
new file mode 100644
index 0000000..cfb4641
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkDistributedLock.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.samza.SamzaException;
+import org.apache.samza.coordinator.DistributedLockWithState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Distributed lock primitive for Zookeeper.
+ */
+public class ZkDistributedLock implements DistributedLockWithState {
+
+ public static final Logger LOG = LoggerFactory.getLogger(ZkDistributedLock.class);
+ private static final String STATE_INITED = "sate_initialized";
+ private final ZkUtils zkUtils;
+ private final String lockPath;
+ private final String participantId;
+ private final ZkKeyBuilder keyBuilder;
+ private final Random random = new Random();
+ private String nodePath = null;
+ private final String statePath;
+
+ public ZkDistributedLock(String participantId, ZkUtils zkUtils, String lockId) {
+ this.zkUtils = zkUtils;
+ this.participantId = participantId;
+ this.keyBuilder = zkUtils.getKeyBuilder();
+ lockPath = String.format("%s/stateLock_%s", keyBuilder.getRootPath(), lockId);
+ statePath = String.format("%s/%s_%s", lockPath, STATE_INITED, lockId);
+ zkUtils.validatePaths(new String[] {lockPath});
+ }
+
+ /**
+ * Tries to acquire a lock in order to create intermediate streams. On failure to acquire lock, it keeps trying until the lock times out.
+ * Creates a sequential ephemeral node to acquire the lock. If the path of this node has the lowest sequence number, the processor has acquired the lock.
+ * @param timeout Duration of lock acquiring timeout.
+ * @param unit Unit of the timeout defined above.
+ * @return true if lock is acquired successfully, false if it times out.
+ */
+ @Override
+ public boolean lockIfNotSet(long timeout, TimeUnit unit)
+ throws TimeoutException {
+
+ nodePath = zkUtils.getZkClient().createEphemeralSequential(lockPath + "/", participantId);
+
+ //Start timer for timeout
+ long startTime = System.currentTimeMillis();
+ long lockTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
+
+ while ((System.currentTimeMillis() - startTime) < lockTimeout) {
+
+ if (zkUtils.getZkClient().exists(statePath)) {
+ // state already set, no point locking
+ return false;
+ }
+
+ List<String> children = zkUtils.getZkClient().getChildren(lockPath);
+ int index = children.indexOf(ZkKeyBuilder.parseIdFromPath(nodePath));
+
+ if (children.size() == 0 || index == -1) {
+ throw new SamzaException("Looks like we are no longer connected to Zk. Need to reconnect!");
+ }
+ // Acquires lock when the node has the lowest sequence number and returns.
+ if (index == 0) {
+ LOG.info("Acquired lock for participant id: {}", participantId);
+ return true;
+ } else {
+ try {
+ Thread.sleep(random.nextInt(1000));
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ }
+ LOG.info("Trying to acquire lock again...");
+ }
+ }
+ throw new TimeoutException("could not acquire lock for " + timeout + " " + unit.toString());
+ }
+
+ /**
+ * Unlocks, by deleting the ephemeral sequential node created to acquire the lock.
+ */
+ @Override
+ public void unlockAndSet() {
+ // set state
+ zkUtils.getZkClient().createPersistent(statePath, true);
+
+ if (nodePath != null) {
+ zkUtils.getZkClient().delete(nodePath);
+ nodePath = null;
+ LOG.info("Ephemeral lock node deleted. Unlocked!");
+ } else {
+ LOG.warn("Ephemeral lock node you want to delete doesn't exist");
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
index d22fbae..eb0ebe9 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestApplicationRunnerMain.java
@@ -25,7 +25,7 @@ import org.apache.samza.job.ApplicationStatus;
import org.apache.samza.operators.StreamGraph;
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
public class TestApplicationRunnerMain {
http://git-wip-us.apache.org/repos/asf/samza/blob/dd07e074/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
index 38f58fd..f9c1252 100644
--- a/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
+++ b/samza-core/src/test/java/org/apache/samza/runtime/TestLocalApplicationRunner.java
@@ -25,8 +25,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.ApplicationConfig;
@@ -35,9 +33,7 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.coordinator.CoordinationUtils;
import org.apache.samza.coordinator.CoordinationUtilsFactory;
-import org.apache.samza.coordinator.Latch;
-import org.apache.samza.coordinator.LeaderElector;
-import org.apache.samza.coordinator.LeaderElectorListener;
+import org.apache.samza.coordinator.DistributedLockWithState;
import org.apache.samza.execution.ExecutionPlan;
import org.apache.samza.execution.ExecutionPlanner;
import org.apache.samza.execution.StreamManager;
@@ -54,7 +50,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.*;
@@ -168,53 +163,13 @@ public class TestLocalApplicationRunner {
LocalApplicationRunner spy = spy(runner);
CoordinationUtils coordinationUtils = mock(CoordinationUtils.class);
- LeaderElector leaderElector = new LeaderElector() {
- private LeaderElectorListener leaderElectorListener;
-
- @Override
- public void setLeaderElectorListener(LeaderElectorListener listener) {
- this.leaderElectorListener = listener;
- }
-
- @Override
- public void tryBecomeLeader() {
- leaderElectorListener.onBecomingLeader();
- }
-
- @Override
- public void resignLeadership() {
- }
-
- @Override
- public boolean amILeader() {
- return false;
- }
- };
-
- Latch latch = new Latch() {
- boolean done = false;
-
- @Override
- public void await(long timeout, TimeUnit tu)
- throws TimeoutException {
- // in this test, latch is released after countDown is invoked
- if (!done) {
- throw new TimeoutException("timed out waiting for the target path");
- }
- }
-
- @Override
- public void countDown() {
- done = true;
- }
- };
-
- mockStatic(CoordinationUtilsFactory.class);
CoordinationUtilsFactory coordinationUtilsFactory = mock(CoordinationUtilsFactory.class);
+ mockStatic(CoordinationUtilsFactory.class);
when(CoordinationUtilsFactory.getCoordinationUtilsFactory(anyObject())).thenReturn(coordinationUtilsFactory);
- when(coordinationUtils.getLeaderElector()).thenReturn(leaderElector);
- when(coordinationUtils.getLatch(anyInt(), anyString())).thenReturn(latch);
+ DistributedLockWithState lock = mock(DistributedLockWithState.class);
+ when(lock.lockIfNotSet(anyLong(), anyObject())).thenReturn(true);
+ when(coordinationUtils.getLockWithState(anyString())).thenReturn(lock);
when(coordinationUtilsFactory.getCoordinationUtils(anyString(), anyString(), anyObject()))
.thenReturn(coordinationUtils);
@@ -384,6 +339,7 @@ public class TestLocalApplicationRunner {
new StreamSpec("test-stream-1", "stream-1", "testStream"),
new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
assertFalse("Expected both of the latch ids to be different",
planIdBeforeShuffle.equals(getExecutionPlanId(shuffledStreamSpecs)));
}
@@ -419,6 +375,7 @@ public class TestLocalApplicationRunner {
new StreamSpec("test-stream-4", "stream-4", "testStream"),
new StreamSpec("test-stream-3", "stream-3", "testStream"));
+
assertFalse("Expected both of the latch ids to be different",
planIdBeforeShuffle.equals(getExecutionPlanId(updatedStreamSpecs)));
}