You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/04/23 20:02:45 UTC
[helix] 10/20: A few fixes on syntax
This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0fc74e7bf225f7b6983d930a919f5a69ef5dafad
Author: Molly Gao <mg...@mgao-mn1.linkedin.biz>
AuthorDate: Wed Feb 5 18:07:00 2020 -0800
A few fixes on syntax
---
.../apache/helix/lock/ZKHelixNonblockingLock.java | 94 +++++++++++++---------
.../helix/lock/ZKHelixNonblockingLockInfo.java | 38 ++++++++-
.../helix/lock/TestZKHelixNonblockingLock.java | 3 +-
3 files changed, 92 insertions(+), 43 deletions(-)
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
index 57ff9aa..0424177 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
@@ -19,14 +19,17 @@
package org.apache.helix.lock;
+import java.util.Date;
import java.util.UUID;
import com.google.common.annotations.VisibleForTesting;
import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.manager.zk.client.HelixZkClient;
@@ -34,6 +37,9 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.log4j.Logger;
import org.apache.zookeeper.data.Stat;
+import static org.apache.helix.lock.ZKHelixNonblockingLockInfo.DEFAULT_OWNER_TEXT;
+import static org.apache.helix.lock.ZKHelixNonblockingLockInfo.DEFAULT_TIMEOUT_LONG;
+
/**
* Helix nonblocking lock implementation based on Zookeeper
@@ -44,8 +50,6 @@ public class ZKHelixNonblockingLock implements HelixLock {
private static final String LOCK_ROOT = "LOCKS";
private static final String PATH_DELIMITER = "/";
- private static final String UUID_FORMAT_REGEX =
- "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}";
private final String _lockPath;
private final String _userId;
private final long _timeout;
@@ -63,8 +67,8 @@ public class ZKHelixNonblockingLock implements HelixLock {
*/
public ZKHelixNonblockingLock(String clusterName, HelixConfigScope scope, String zkAddress,
Long timeout, String lockMsg, String userId) {
- this(PATH_DELIMITER + clusterName + PATH_DELIMITER + LOCK_ROOT + PATH_DELIMITER + scope,
- zkAddress, timeout, lockMsg, userId);
+ this(PATH_DELIMITER + String.join(PATH_DELIMITER, clusterName, LOCK_ROOT, scope.getZkPath()), zkAddress, timeout, lockMsg,
+ userId);
}
/**
@@ -77,15 +81,11 @@ public class ZKHelixNonblockingLock implements HelixLock {
*/
private ZKHelixNonblockingLock(String lockPath, String zkAddress, Long timeout, String lockMsg,
String userId) {
- HelixZkClient zkClient = new ZkClient(zkAddress);
_lockPath = lockPath;
_timeout = timeout;
_lockMsg = lockMsg;
- if (!userId.matches(UUID_FORMAT_REGEX)) {
- throw new IllegalArgumentException("The input user id is not a valid UUID.");
- }
_userId = userId;
- _baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient.getServers());
+ _baseDataAccessor = new ZkBaseDataAccessor<>(zkAddress);
}
@Override
@@ -95,16 +95,15 @@ public class ZKHelixNonblockingLock implements HelixLock {
ZNRecord lockInfo = new ZNRecord(_userId);
lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(), _userId);
lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name(), _lockMsg);
- long timeout;
- // If the input timeout value is the max value, set the expire time to max value
- if (_timeout == Long.MAX_VALUE) {
- timeout = _timeout;
+ long deadline;
+ // Prevent value overflow
+ if (_timeout > Long.MAX_VALUE - System.currentTimeMillis()) {
+ deadline = Long.MAX_VALUE;
} else {
- timeout = System.currentTimeMillis() + _timeout;
+ deadline = System.currentTimeMillis() + _timeout;
}
- lockInfo
- .setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(), String.valueOf(timeout));
+ lockInfo.setLongField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(), deadline);
// Try to create the lock node
boolean success = _baseDataAccessor.create(_lockPath, lockInfo, AccessOption.PERSISTENT);
@@ -113,9 +112,7 @@ public class ZKHelixNonblockingLock implements HelixLock {
if (!success) {
Stat stat = new Stat();
ZNRecord curLock = _baseDataAccessor.get(_lockPath, stat, AccessOption.PERSISTENT);
- long curTimeout =
- Long.parseLong(curLock.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name()));
- if (System.currentTimeMillis() >= curTimeout) {
+ if (hasTimedOut(curLock)) {
// set may fail when the znode version changes in between the get and set, meaning there are some changes in the lock
success =
_baseDataAccessor.set(_lockPath, lockInfo, stat.getVersion(), AccessOption.PERSISTENT);
@@ -126,22 +123,13 @@ public class ZKHelixNonblockingLock implements HelixLock {
@Override
public boolean releaseLock() {
- ZNRecord newLockInfo = new ZNRecord(_userId);
- newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(),
- ZKHelixNonblockingLockInfo.DEFAULT_OWNER_TEXT);
- newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name(),
- ZKHelixNonblockingLockInfo.DEFAULT_MESSAGE_TEXT);
- newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(),
- ZKHelixNonblockingLockInfo.DEFAULT_TIMEOUT_TEXT);
+ ZNRecord newLockInfo = new ZKHelixNonblockingLockInfo<>().toZNRecord();
LockUpdater updater = new LockUpdater(newLockInfo);
return _baseDataAccessor.update(_lockPath, updater, AccessOption.PERSISTENT);
}
@Override
public ZKHelixNonblockingLockInfo getLockInfo() {
- if (!_baseDataAccessor.exists(_lockPath, AccessOption.PERSISTENT)) {
- return new ZKHelixNonblockingLockInfo();
- }
ZNRecord curLockInfo = _baseDataAccessor.get(_lockPath, null, AccessOption.PERSISTENT);
return new ZKHelixNonblockingLockInfo(curLockInfo);
}
@@ -149,9 +137,6 @@ public class ZKHelixNonblockingLock implements HelixLock {
@Override
public boolean isOwner() {
ZNRecord curLockInfo = _baseDataAccessor.get(_lockPath, null, AccessOption.PERSISTENT);
- if (curLockInfo == null) {
- return false;
- }
return userIdMatches(curLockInfo) && !hasTimedOut(curLockInfo);
}
@@ -161,8 +146,12 @@ public class ZKHelixNonblockingLock implements HelixLock {
* @return return true if the lock has timed out, otherwise return false.
*/
private boolean hasTimedOut(ZNRecord record) {
- String timeoutStr = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name());
- return System.currentTimeMillis() >= Long.parseLong(timeoutStr);
+ if (record == null) {
+ return false;
+ }
+ long timeout = record
+ .getLongField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(), DEFAULT_TIMEOUT_LONG);
+ return System.currentTimeMillis() >= timeout;
}
/**
@@ -171,18 +160,43 @@ public class ZKHelixNonblockingLock implements HelixLock {
* @return return true if the two ids match, otherwise return false.
*/
private boolean userIdMatches(ZNRecord record) {
+ if (record == null) {
+ return false;
+ }
String ownerId = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name());
return ownerId.equals(_userId);
}
/**
- * Class that specifies how a lock node should be updated with another lock node for a lock owner only
+ * Check if the lock node has an owner id field in the lock information
+ * @param record Lock information in format of ZNRecord
+ * @return true if the lock has an owner id field, whether or not that ownership has timed out; otherwise false
+ */
+ private boolean hasOwnerField(ZNRecord record) {
+ if (record == null) {
+ return false;
+ }
+ String ownerId = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name());
+ return ownerId != null && !ownerId.equals(DEFAULT_OWNER_TEXT);
+ }
+
+ /**
+ * Check if the lock node has a current owner
+ * @param record Lock information in format of ZNRecord
+ * @return true if the lock has a current owner whose ownership has not timed out, otherwise false
+ */
+ private boolean hasNonExpiredOwner(ZNRecord record) {
+ return hasOwnerField(record) && !hasTimedOut(record);
+ }
+
+ /**
+ * Class that specifies how a lock node should be updated with another lock node
*/
private class LockUpdater implements DataUpdater<ZNRecord> {
final ZNRecord _record;
/**
- * Initialize a structure for lock owner to update a lock node value
+ * Initialize a structure for lock user to update a lock node value
* @param record the lock node value will be updated in ZNRecord format
*/
public LockUpdater(ZNRecord record) {
@@ -191,9 +205,15 @@ public class ZKHelixNonblockingLock implements HelixLock {
@Override
public ZNRecord update(ZNRecord current) {
- if (current != null && userIdMatches(current) && !hasTimedOut(current)) {
+ // If no one owns the lock, allow the update
+ // If the lock owner id matches user id, allow the update
+ if (!hasNonExpiredOwner(current) || userIdMatches(current)) {
return _record;
}
+ // A user who is not the lock owner tried to update a lock that is held by someone else: the exception thrown below is to be caught by the data accessor, which then returns false for the update
+ LOG.error(
+ "User " + _userId + "tried to update the lock at " + new Date(System.currentTimeMillis())
+ + ". Lock path: " + _lockPath);
throw new HelixException("User is not authorized to perform this operation.");
}
}
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
index 50daa03..6e62717 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
@@ -29,13 +29,17 @@ public class ZKHelixNonblockingLockInfo<K extends ZKHelixNonblockingLockInfo.Inf
public static final String DEFAULT_OWNER_TEXT = "";
public static final String DEFAULT_MESSAGE_TEXT = "";
- public static final String DEFAULT_TIMEOUT_TEXT = String.valueOf(-1);
+ public static final long DEFAULT_TIMEOUT_LONG = -1L;
+ public static final String DEFAULT_TIMEOUT_TEXT = String.valueOf(DEFAULT_TIMEOUT_LONG);
private Map<InfoKey, String> lockInfo;
public enum InfoKey {
OWNER, MESSAGE, TIMEOUT
}
+ /**
+ * Constructor of ZKHelixNonblockingLockInfo that sets each field to its default value
+ */
public ZKHelixNonblockingLockInfo() {
lockInfo = new HashMap<>();
lockInfo.put(InfoKey.OWNER, DEFAULT_OWNER_TEXT);
@@ -43,11 +47,22 @@ public class ZKHelixNonblockingLockInfo<K extends ZKHelixNonblockingLockInfo.Inf
lockInfo.put(InfoKey.TIMEOUT, DEFAULT_TIMEOUT_TEXT);
}
+ /**
+ * Construct a ZKHelixNonblockingLockInfo using a ZNRecord format of data
+ * @param znRecord A ZNRecord that contains lock information in its simple fields
+ */
public ZKHelixNonblockingLockInfo(ZNRecord znRecord) {
this();
- lockInfo.put(InfoKey.OWNER, znRecord.getSimpleField(InfoKey.OWNER.name()));
- lockInfo.put(InfoKey.MESSAGE, znRecord.getSimpleField(InfoKey.MESSAGE.name()));
- lockInfo.put(InfoKey.TIMEOUT, znRecord.getSimpleField(InfoKey.TIMEOUT.name()));
+ if (znRecord == null) {
+ return;
+ }
+ Map<String, String> simpleFields = znRecord.getSimpleFields();
+ lockInfo
+ .put(InfoKey.OWNER, simpleFields.getOrDefault(InfoKey.OWNER.name(), DEFAULT_OWNER_TEXT));
+ lockInfo.put(InfoKey.MESSAGE,
+ simpleFields.getOrDefault(InfoKey.MESSAGE.name(), DEFAULT_MESSAGE_TEXT));
+ lockInfo.put(InfoKey.TIMEOUT,
+ simpleFields.getOrDefault(InfoKey.TIMEOUT.name(), DEFAULT_TIMEOUT_TEXT));
}
@Override
@@ -59,4 +74,19 @@ public class ZKHelixNonblockingLockInfo<K extends ZKHelixNonblockingLockInfo.Inf
public String getInfoValue(InfoKey infoKey) {
return lockInfo.get(infoKey);
}
+
+ /**
+ * Method to convert a ZKHelixNonblockingLockInfo to ZNRecord
+ * @return a ZNRecord that contains the lock information in its simple fields
+ */
+ public ZNRecord toZNRecord() {
+ ZNRecord znRecord = new ZNRecord("");
+ znRecord.setSimpleField(InfoKey.OWNER.name(),
+ lockInfo.getOrDefault(InfoKey.OWNER, DEFAULT_OWNER_TEXT));
+ znRecord.setSimpleField(InfoKey.MESSAGE.name(),
+ lockInfo.getOrDefault(InfoKey.MESSAGE, DEFAULT_MESSAGE_TEXT));
+ znRecord.setSimpleField(InfoKey.TIMEOUT.name(),
+ lockInfo.getOrDefault(InfoKey.TIMEOUT, DEFAULT_TIMEOUT_TEXT));
+ return znRecord;
+ }
}
diff --git a/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java b/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
index f2308c1..29f84ae 100644
--- a/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
+++ b/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
@@ -66,8 +66,7 @@ public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
_participantScope =
new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(_clusterName)
.forParticipant("localhost_12918").build();
-
- _lockPath = "/" + _clusterName + '/' + "LOCKS" + '/' + _participantScope;
+ _lockPath = "/" + _clusterName + '/' + "LOCKS" + '/' + _participantScope.getZkPath();
_lock = new ZKHelixNonblockingLock(_clusterName, _participantScope, ZK_ADDR, Long.MAX_VALUE,
_lockMessage, _userId);
}