You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/04/14 02:30:21 UTC
[helix] 08/19: Fixed logic of release and isOwner
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch distributed-lock
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 32f3b86a4da664b637bd19b13e624789c5dd3a25
Author: Molly Gao <mg...@mgao-mn1.linkedin.biz>
AuthorDate: Tue Feb 4 10:41:13 2020 -0800
Fixed logic of release and isOwner
---
.../main/java/org/apache/helix/lock/HelixLock.java | 2 +-
.../main/java/org/apache/helix/lock/LockInfo.java | 9 +-
.../apache/helix/lock/ZKHelixNonblockingLock.java | 136 ++++++++++++++-------
.../helix/lock/ZKHelixNonblockingLockInfo.java | 45 +++----
.../helix/lock/TestZKHelixNonblockingLock.java | 106 ++++++----------
5 files changed, 150 insertions(+), 148 deletions(-)
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java b/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java
index 71b1ca6..2d7e318 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java
@@ -42,7 +42,7 @@ public interface HelixLock {
* Retrieve the lock information, e.g. lock timeout, lock message, etc.
* @return lock metadata information
*/
- <T> LockInfo<T> getLockInfo();
+ <K, V> LockInfo<K, V> getLockInfo();
/**
* If the user is current lock owner
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java b/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
index 30322bb..afd4c00 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
@@ -24,9 +24,10 @@ import java.util.Map;
/**
* Generic interface for a map contains the Helix lock information
- * @param <T> The type of the LockInfo value
+ * @param <K> The type of the LockInfo key
+ * @param <V> the type of the LockInfo value
*/
-public interface LockInfo<T> {
+public interface LockInfo<K, V> {
//TODO: add specific setter and getter for any field that is determined to be universal for all implementations of HelixLock
@@ -35,12 +36,12 @@ public interface LockInfo<T> {
* @param infoKey the key of the LockInfo field
* @param infoValue the value of the LockInfo field
*/
- void setInfoValue(String infoKey, T infoValue);
+ void setInfoValue(K infoKey, V infoValue);
/**
* Get the value of a field in LockInfo
* @param infoKey the key of the LockInfo field
* @return the value of the field or null if this key does not exist
*/
- T getInfoValue(String infoKey);
+ V getInfoValue(K infoKey);
}
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
index 5be84bb..44b5aa9 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
@@ -19,20 +19,18 @@
package org.apache.helix.lock;
-import java.util.HashMap;
-import java.util.Map;
import java.util.UUID;
+import com.google.common.annotations.VisibleForTesting;
+import org.I0Itec.zkclient.DataUpdater;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordUpdater;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.manager.zk.client.HelixZkClient;
import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.util.ZNRecordUtil;
import org.apache.log4j.Logger;
import org.apache.zookeeper.data.Stat;
@@ -45,8 +43,11 @@ public class ZKHelixNonblockingLock implements HelixLock {
private static final Logger LOG = Logger.getLogger(ZKHelixNonblockingLock.class);
private static final String LOCK_ROOT = "LOCKS";
+ private static final String PATH_DELIMITER = "/";
+ private static final String UUID_FORMAT_REGEX =
+ "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}";
private final String _lockPath;
- private final String _userID;
+ private final String _userId;
private final long _timeout;
private final String _lockMsg;
private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
@@ -58,10 +59,12 @@ public class ZKHelixNonblockingLock implements HelixLock {
* @param zkAddress the zk address the cluster connects to
* @param timeout the timeout period of the lcok
* @param lockMsg the reason for having this lock
+ * @param userId a universal unique userId for lock owner identity
*/
public ZKHelixNonblockingLock(String clusterName, HelixConfigScope scope, String zkAddress,
- Long timeout, String lockMsg, String userID) {
- this("/" + clusterName + '/' + LOCK_ROOT + '/' + scope, zkAddress, timeout, lockMsg, userID);
+ Long timeout, String lockMsg, String userId) {
+ this(PATH_DELIMITER + clusterName + PATH_DELIMITER + LOCK_ROOT + PATH_DELIMITER + scope,
+ zkAddress, timeout, lockMsg, userId);
}
/**
@@ -70,29 +73,36 @@ public class ZKHelixNonblockingLock implements HelixLock {
* @param zkAddress the zk address of the cluster
* @param timeout the timeout period of the lcok
* @param lockMsg the reason for having this lock
+ * @param userId a universal unique userId for lock owner identity
*/
- public ZKHelixNonblockingLock(String lockPath, String zkAddress, Long timeout, String lockMsg,
- String userID) {
+ private ZKHelixNonblockingLock(String lockPath, String zkAddress, Long timeout, String lockMsg,
+ String userId) {
HelixZkClient zkClient = new ZkClient(zkAddress);
_lockPath = lockPath;
_timeout = timeout;
_lockMsg = lockMsg;
- _userID = userID;
+ if (!userId.matches(UUID_FORMAT_REGEX)) {
+ throw new IllegalArgumentException("The input user id is not a valid UUID.");
+ }
+ _userId = userId;
_baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient.getServers());
}
@Override
- /**
- * Blocking call to acquire a lock
- * @return true if the lock was successfully acquired,
- * false if the lock could not be acquired
- */ public boolean acquireLock() {
+ public boolean acquireLock() {
// Set lock information fields
- ZNRecord lockInfo = new ZNRecord(_userID);
- lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(), _userID);
+ ZNRecord lockInfo = new ZNRecord(_userId);
+ lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(), _userId);
lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name(), _lockMsg);
- long timeout = System.currentTimeMillis() + _timeout;
+ long timeout;
+
+ // If the input timeout value is the max value, set the expire time to max value
+ if (_timeout == Long.MAX_VALUE) {
+ timeout = _timeout;
+ } else {
+ timeout = System.currentTimeMillis() + _timeout;
+ }
lockInfo
.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(), String.valueOf(timeout));
@@ -106,6 +116,7 @@ public class ZKHelixNonblockingLock implements HelixLock {
long curTimeout =
Long.parseLong(curLock.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name()));
if (System.currentTimeMillis() >= curTimeout) {
+ // set may fail when the znode version changes in between the get and set, meaning there are some changes in the lock
success =
_baseDataAccessor.set(_lockPath, lockInfo, stat.getVersion(), AccessOption.PERSISTENT);
}
@@ -114,46 +125,79 @@ public class ZKHelixNonblockingLock implements HelixLock {
}
@Override
- /**
- * Blocking call to release a lock
- * @return true if the lock was successfully released,
- * false if the locked is not locked or is not locked by the user,
- * or the lock could not be released
- */ public boolean releaseLock() {
- if (isOwner()) {
- return _baseDataAccessor.remove(_lockPath, AccessOption.PERSISTENT);
- }
- return false;
+ public boolean releaseLock() {
+ ZNRecord newLockInfo = new ZNRecord(_userId);
+ newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(),
+ ZKHelixNonblockingLockInfo.DEFAULT_OWNER_TEXT);
+ newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name(),
+ ZKHelixNonblockingLockInfo.DEFAULT_MESSAGE_TEXT);
+ newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(),
+ ZKHelixNonblockingLockInfo.DEFAULT_TIMEOUT_TEXT);
+ LockUpdater updater = new LockUpdater(newLockInfo);
+ return _baseDataAccessor.update(_lockPath, updater, AccessOption.PERSISTENT);
}
@Override
- /**
- * Retrieve the lock information, e.g. lock timeout, lock message, etc.
- * @return lock metadata information, return null if there is no lock node for the path provided
- */ public LockInfo<String> getLockInfo() {
+ public ZKHelixNonblockingLockInfo getLockInfo() {
if (!_baseDataAccessor.exists(_lockPath, AccessOption.PERSISTENT)) {
- return null;
+ return new ZKHelixNonblockingLockInfo();
}
- ZKHelixNonblockingLockInfo<String> lockInfo = new ZKHelixNonblockingLockInfo<>();
ZNRecord curLockInfo = _baseDataAccessor.get(_lockPath, null, AccessOption.PERSISTENT);
- lockInfo.setLockInfoFields(curLockInfo);
- return lockInfo;
+ return new ZKHelixNonblockingLockInfo(curLockInfo);
}
@Override
- /**
- * Check if the user is current lock owner
- * @return true if the user is the lock owner,
- * false if the user is not the lock owner or the lock doesn't have a owner
- */ public boolean isOwner() {
+ public boolean isOwner() {
ZNRecord curLockInfo = _baseDataAccessor.get(_lockPath, null, AccessOption.PERSISTENT);
if (curLockInfo == null) {
return false;
}
- String ownerID = curLockInfo.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name());
- if (ownerID == null) {
- return false;
+ if (userIdMatches(curLockInfo)) {
+ return !hasTimedOut(curLockInfo);
+ }
+ return false;
+ }
+
+ /**
+ * Check if a lock has timed out
+ * @param record the current lock information in ZNRecord format
+ * @return return true if the lock has timed out, otherwise return false.
+ */
+ private boolean hasTimedOut(ZNRecord record) {
+ String timeoutStr = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name());
+ return System.currentTimeMillis() >= Long.parseLong(timeoutStr);
+ }
+
+ /**
+ * Check if the current user Id matches with the owner Id in a lock info
+ * @param record the lock information in ZNRecord format
+ * @return return true if the two ids match, otherwise return false.
+ */
+ private boolean userIdMatches(ZNRecord record) {
+ String ownerId = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name());
+ return ownerId.equals(_userId);
+ }
+
+ /**
+ * Class that specifies how a lock node should be updated with another lock node for a lock owner only
+ */
+ private class LockUpdater implements DataUpdater<ZNRecord> {
+ final ZNRecord _record;
+
+ /**
+ * Initialize a structure for lock owner to update a lock node value
+ * @param record the lock node value will be updated in ZNRecord format
+ */
+ public LockUpdater(ZNRecord record) {
+ _record = record;
+ }
+
+ @Override
+ public ZNRecord update(ZNRecord current) {
+ if (current != null && userIdMatches(current) && !hasTimedOut(current)) {
+ return _record;
+ }
+ throw new HelixException("User is not authorized to perform this operation.");
}
- return ownerID.equals(_userID);
}
}
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
index fb86187..50daa03 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
@@ -25,45 +25,38 @@ import java.util.Map;
import org.apache.helix.ZNRecord;
-public class ZKHelixNonblockingLockInfo<T extends String> implements LockInfo<T> {
+public class ZKHelixNonblockingLockInfo<K extends ZKHelixNonblockingLockInfo.InfoKey, V extends String> implements LockInfo<K, V> {
- private Map<String, String> lockInfo;
+ public static final String DEFAULT_OWNER_TEXT = "";
+ public static final String DEFAULT_MESSAGE_TEXT = "";
+ public static final String DEFAULT_TIMEOUT_TEXT = String.valueOf(-1);
+ private Map<InfoKey, String> lockInfo;
- enum InfoKey {
+ public enum InfoKey {
OWNER, MESSAGE, TIMEOUT
}
public ZKHelixNonblockingLockInfo() {
lockInfo = new HashMap<>();
+ lockInfo.put(InfoKey.OWNER, DEFAULT_OWNER_TEXT);
+ lockInfo.put(InfoKey.MESSAGE, DEFAULT_MESSAGE_TEXT);
+ lockInfo.put(InfoKey.TIMEOUT, DEFAULT_TIMEOUT_TEXT);
}
- @Override
- /**
- * Create a single filed of LockInfo, or update the value of the field if it already exists
- * @param infoKey the key of the LockInfo field
- * @param infoValue the value of the LockInfo field
- */ public void setInfoValue(String infoKey, String infoValue) {
- lockInfo.put(infoKey, infoValue);
+ public ZKHelixNonblockingLockInfo(ZNRecord znRecord) {
+ this();
+ lockInfo.put(InfoKey.OWNER, znRecord.getSimpleField(InfoKey.OWNER.name()));
+ lockInfo.put(InfoKey.MESSAGE, znRecord.getSimpleField(InfoKey.MESSAGE.name()));
+ lockInfo.put(InfoKey.TIMEOUT, znRecord.getSimpleField(InfoKey.TIMEOUT.name()));
}
@Override
- /**
- * Get the value of a field in LockInfo
- * @param infoKey the key of the LockInfo field
- * @return the value of the field or null if this key does not exist
- */ public T getInfoValue(String infoKey) {
- return (T) lockInfo.get(infoKey);
+ public void setInfoValue(InfoKey infoKey, String infoValue) {
+ lockInfo.put(infoKey, infoValue);
}
- /**
- * Update the lock info with information in a ZNRecord
- * @param record Information about the lock that stored as ZNRecord format
- */
- public void setLockInfoFields(ZNRecord record) {
- if (record == null) {
- return;
- }
- Map<String, String> recordSimpleFields = record.getSimpleFields();
- lockInfo.putAll(recordSimpleFields);
+ @Override
+ public String getInfoValue(InfoKey infoKey) {
+ return lockInfo.get(infoKey);
}
}
diff --git a/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java b/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
index 95c41ac..37daf7b 100644
--- a/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
+++ b/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
@@ -31,16 +31,19 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.zookeeper.CreateMode;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
- private final String _lockPath = "/testLockPath";
private final String _className = TestHelper.getTestClassName();
private final String _methodName = TestHelper.getTestMethodName();
private final String _clusterName = _className + "_" + _methodName;
private final String _lockMessage = "Test";
+ private String _lockPath;
+ private ZKHelixNonblockingLock _lock;
+ private String _userId;
@BeforeClass
public void beforeClass() throws Exception {
@@ -49,79 +52,47 @@ public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 10, 5, 3,
"MasterSlave", true);
- }
-
- @Test
- public void testAcquireLockWithParticipantScope() {
-
- // Initialize lock with participant config scope
+ _userId = UUID.randomUUID().toString();
HelixConfigScope participantScope =
new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(_clusterName)
.forParticipant("localhost_12918").build();
- String userID = UUID.randomUUID().toString();
-
- ZKHelixNonblockingLock lock =
- new ZKHelixNonblockingLock(_clusterName, participantScope, ZK_ADDR, Long.MAX_VALUE,
- _lockMessage, userID);
+ _lockPath = "/" + _clusterName + '/' + "LOCKS" + '/' + participantScope;
+ _lock = new ZKHelixNonblockingLock(_clusterName, participantScope, ZK_ADDR, Long.MAX_VALUE,
+ _lockMessage, _userId);
- // Acquire lock
- lock.acquireLock();
- String lockPath = "/" + _clusterName + '/' + "LOCKS" + '/' + participantScope;
- Assert.assertTrue(_gZkClient.exists(lockPath));
-
- // Get lock information
- LockInfo<String> record = lock.getLockInfo();
- Assert
- .assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name()), userID);
- Assert.assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name()),
- _lockMessage);
-
- // Check if the user is lock owner
- Assert.assertTrue(lock.isOwner());
+ }
- // Release lock
- lock.releaseLock();
- Assert.assertFalse(_gZkClient.exists(lockPath));
+ @BeforeMethod
+ public void beforeMethod() {
+ _gZkClient.delete(_lockPath);
+ Assert.assertFalse(_gZkClient.exists(_lockPath));
}
@Test
- public void testAcquireLockWithUserProvidedPath() {
-
- // Initialize lock with user provided path
- String userID = UUID.randomUUID().toString();
-
- ZKHelixNonblockingLock lock =
- new ZKHelixNonblockingLock(_lockPath, ZK_ADDR, Long.MAX_VALUE, _lockMessage, userID);
+ public void testAcquireLock() {
- //Acquire lock
- lock.acquireLock();
+ // Acquire lock
+ _lock.acquireLock();
Assert.assertTrue(_gZkClient.exists(_lockPath));
// Get lock information
- LockInfo<String> record = lock.getLockInfo();
- Assert
- .assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name()), userID);
- Assert.assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name()),
+ LockInfo<ZKHelixNonblockingLockInfo.InfoKey, String> record = _lock.getLockInfo();
+ Assert.assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.OWNER), _userId);
+ Assert.assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE),
_lockMessage);
// Check if the user is lock owner
- Assert.assertTrue(lock.isOwner());
+ Assert.assertTrue(_lock.isOwner());
// Release lock
- lock.releaseLock();
- Assert.assertFalse(_gZkClient.exists(_lockPath));
+ _lock.releaseLock();
+ Assert.assertFalse(_lock.isOwner());
}
@Test
public void testAcquireLockWhenExistingLockNotExpired() {
- // Initialize lock with user provided path
- String ownerID = UUID.randomUUID().toString();
-
- ZKHelixNonblockingLock lock =
- new ZKHelixNonblockingLock(_lockPath, ZK_ADDR, 0L, _lockMessage, ownerID);
-
// Fake condition when the lock owner is not current user
String fakeUserID = UUID.randomUUID().toString();
ZNRecord fakeRecord = new ZNRecord(fakeUserID);
@@ -131,28 +102,19 @@ public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
_gZkClient.create(_lockPath, fakeRecord, CreateMode.PERSISTENT);
// Check if the user is lock owner
- Assert.assertFalse(lock.isOwner());
+ Assert.assertFalse(_lock.isOwner());
// Acquire lock
- Assert.assertFalse(lock.acquireLock());
- Assert.assertFalse(lock.isOwner());
+ Assert.assertFalse(_lock.acquireLock());
+ Assert.assertFalse(_lock.isOwner());
// Release lock
- Assert.assertFalse(lock.releaseLock());
- Assert.assertTrue(_gZkClient.exists(_lockPath));
-
- _gZkClient.delete(_lockPath);
+ Assert.assertFalse(_lock.releaseLock());
}
@Test
public void testAcquireLockWhenExistingLockExpired() {
- // Initialize lock with user provided path
- String ownerID = UUID.randomUUID().toString();
-
- ZKHelixNonblockingLock lock =
- new ZKHelixNonblockingLock(_lockPath, ZK_ADDR, Long.MAX_VALUE, _lockMessage, ownerID);
-
// Fake condition when the current lock already expired
String fakeUserID = UUID.randomUUID().toString();
ZNRecord fakeRecord = new ZNRecord(fakeUserID);
@@ -162,15 +124,17 @@ public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
_gZkClient.create(_lockPath, fakeRecord, CreateMode.PERSISTENT);
// Acquire lock
- Assert.assertTrue(lock.acquireLock());
- Assert.assertTrue(_gZkClient.exists(_lockPath));
-
- // Check if the current user is the lock owner
- Assert.assertTrue(lock.isOwner());
+ Assert.assertTrue(_lock.acquireLock());
+ Assert.assertTrue(_lock.isOwner());
// Release lock
- Assert.assertTrue(lock.releaseLock());
- Assert.assertFalse(_gZkClient.exists(_lockPath));
+ Assert.assertTrue(_lock.releaseLock());
+ Assert.assertFalse(_lock.isOwner());
+ }
+
+ @Test
+ public void testSimultaneousAcquire() {
+
}
}