You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/04/14 02:30:23 UTC

[helix] 10/19: A few fixes on syntax

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch distributed-lock
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 6868eb61d5aadf02b3c934f5f903ad5e3f4779fa
Author: Molly Gao <mg...@mgao-mn1.linkedin.biz>
AuthorDate: Wed Feb 5 18:07:00 2020 -0800

    A few fixes on syntax
---
 .../apache/helix/lock/ZKHelixNonblockingLock.java  | 94 +++++++++++++---------
 .../helix/lock/ZKHelixNonblockingLockInfo.java     | 38 ++++++++-
 .../helix/lock/TestZKHelixNonblockingLock.java     |  3 +-
 3 files changed, 92 insertions(+), 43 deletions(-)

diff --git a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
index 57ff9aa..0424177 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
@@ -19,14 +19,17 @@
 
 package org.apache.helix.lock;
 
+import java.util.Date;
 import java.util.UUID;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.I0Itec.zkclient.DataUpdater;
+import org.I0Itec.zkclient.exception.ZkNoNodeException;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.HelixZkClient;
@@ -34,6 +37,9 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
 
+import static org.apache.helix.lock.ZKHelixNonblockingLockInfo.DEFAULT_OWNER_TEXT;
+import static org.apache.helix.lock.ZKHelixNonblockingLockInfo.DEFAULT_TIMEOUT_LONG;
+
 
 /**
  * Helix nonblocking lock implementation based on Zookeeper
@@ -44,8 +50,6 @@ public class ZKHelixNonblockingLock implements HelixLock {
 
   private static final String LOCK_ROOT = "LOCKS";
   private static final String PATH_DELIMITER = "/";
-  private static final String UUID_FORMAT_REGEX =
-      "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}";
   private final String _lockPath;
   private final String _userId;
   private final long _timeout;
@@ -63,8 +67,8 @@ public class ZKHelixNonblockingLock implements HelixLock {
    */
   public ZKHelixNonblockingLock(String clusterName, HelixConfigScope scope, String zkAddress,
       Long timeout, String lockMsg, String userId) {
-    this(PATH_DELIMITER + clusterName + PATH_DELIMITER + LOCK_ROOT + PATH_DELIMITER + scope,
-        zkAddress, timeout, lockMsg, userId);
+    this(PATH_DELIMITER +  String.join(PATH_DELIMITER, clusterName, LOCK_ROOT, scope.getZkPath()), zkAddress, timeout, lockMsg,
+        userId);
   }
 
   /**
@@ -77,15 +81,11 @@ public class ZKHelixNonblockingLock implements HelixLock {
    */
   private ZKHelixNonblockingLock(String lockPath, String zkAddress, Long timeout, String lockMsg,
       String userId) {
-    HelixZkClient zkClient = new ZkClient(zkAddress);
     _lockPath = lockPath;
     _timeout = timeout;
     _lockMsg = lockMsg;
-    if (!userId.matches(UUID_FORMAT_REGEX)) {
-      throw new IllegalArgumentException("The input user id is not a valid UUID.");
-    }
     _userId = userId;
-    _baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient.getServers());
+    _baseDataAccessor = new ZkBaseDataAccessor<>(zkAddress);
   }
 
   @Override
@@ -95,16 +95,15 @@ public class ZKHelixNonblockingLock implements HelixLock {
     ZNRecord lockInfo = new ZNRecord(_userId);
     lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(), _userId);
     lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name(), _lockMsg);
-    long timeout;
 
-    // If the input timeout value is the max value, set the expire time to max value
-    if (_timeout == Long.MAX_VALUE) {
-      timeout = _timeout;
+    long deadline;
+    // Prevent value overflow
+    if (_timeout > Long.MAX_VALUE - System.currentTimeMillis()) {
+      deadline = Long.MAX_VALUE;
     } else {
-      timeout = System.currentTimeMillis() + _timeout;
+      deadline = System.currentTimeMillis() + _timeout;
     }
-    lockInfo
-        .setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(), String.valueOf(timeout));
+    lockInfo.setLongField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(), deadline);
 
     // Try to create the lock node
     boolean success = _baseDataAccessor.create(_lockPath, lockInfo, AccessOption.PERSISTENT);
@@ -113,9 +112,7 @@ public class ZKHelixNonblockingLock implements HelixLock {
     if (!success) {
       Stat stat = new Stat();
       ZNRecord curLock = _baseDataAccessor.get(_lockPath, stat, AccessOption.PERSISTENT);
-      long curTimeout =
-          Long.parseLong(curLock.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name()));
-      if (System.currentTimeMillis() >= curTimeout) {
+      if (hasTimedOut(curLock)) {
         // set may fail when the znode version changes in between the get and set, meaning there are some changes in the lock
         success =
             _baseDataAccessor.set(_lockPath, lockInfo, stat.getVersion(), AccessOption.PERSISTENT);
@@ -126,22 +123,13 @@ public class ZKHelixNonblockingLock implements HelixLock {
 
   @Override
   public boolean releaseLock() {
-    ZNRecord newLockInfo = new ZNRecord(_userId);
-    newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(),
-        ZKHelixNonblockingLockInfo.DEFAULT_OWNER_TEXT);
-    newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name(),
-        ZKHelixNonblockingLockInfo.DEFAULT_MESSAGE_TEXT);
-    newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(),
-        ZKHelixNonblockingLockInfo.DEFAULT_TIMEOUT_TEXT);
+    ZNRecord newLockInfo = new ZKHelixNonblockingLockInfo<>().toZNRecord();
     LockUpdater updater = new LockUpdater(newLockInfo);
     return _baseDataAccessor.update(_lockPath, updater, AccessOption.PERSISTENT);
   }
 
   @Override
   public ZKHelixNonblockingLockInfo getLockInfo() {
-    if (!_baseDataAccessor.exists(_lockPath, AccessOption.PERSISTENT)) {
-      return new ZKHelixNonblockingLockInfo();
-    }
     ZNRecord curLockInfo = _baseDataAccessor.get(_lockPath, null, AccessOption.PERSISTENT);
     return new ZKHelixNonblockingLockInfo(curLockInfo);
   }
@@ -149,9 +137,6 @@ public class ZKHelixNonblockingLock implements HelixLock {
   @Override
   public boolean isOwner() {
     ZNRecord curLockInfo = _baseDataAccessor.get(_lockPath, null, AccessOption.PERSISTENT);
-    if (curLockInfo == null) {
-      return false;
-    }
     return userIdMatches(curLockInfo) && !hasTimedOut(curLockInfo);
   }
 
@@ -161,8 +146,12 @@ public class ZKHelixNonblockingLock implements HelixLock {
    * @return return true if the lock has timed out, otherwise return false.
    */
   private boolean hasTimedOut(ZNRecord record) {
-    String timeoutStr = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name());
-    return System.currentTimeMillis() >= Long.parseLong(timeoutStr);
+    if (record == null) {
+      return false;
+    }
+    long timeout = record
+        .getLongField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(), DEFAULT_TIMEOUT_LONG);
+    return System.currentTimeMillis() >= timeout;
   }
 
   /**
@@ -171,18 +160,43 @@ public class ZKHelixNonblockingLock implements HelixLock {
    * @return return true if the two ids match, otherwise return false.
    */
   private boolean userIdMatches(ZNRecord record) {
+    if (record == null) {
+      return false;
+    }
     String ownerId = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name());
     return ownerId.equals(_userId);
   }
 
   /**
-   * Class that specifies how a lock node should be updated with another lock node for a lock owner only
+   * Check if the lock node has a owner id field in the lock information
+   * @param record Lock information in format of ZNRecord
+   * @return true if the lock has a owner id field that the ownership can be or not be timed out, otherwise false
+   */
+  private boolean hasOwnerField(ZNRecord record) {
+    if (record == null) {
+      return false;
+    }
+    String ownerId = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name());
+    return ownerId != null && !ownerId.equals(DEFAULT_OWNER_TEXT);
+  }
+
+  /**
+   * Check if the lock node has a current owner
+   * @param record Lock information in format of ZNRecord
+   * @return true if the lock has a current owner that the ownership has not be timed out, otherwise false
+   */
+  private boolean hasNonExpiredOwner(ZNRecord record) {
+    return hasOwnerField(record) && !hasTimedOut(record);
+  }
+
+  /**
+   * Class that specifies how a lock node should be updated with another lock node
    */
   private class LockUpdater implements DataUpdater<ZNRecord> {
     final ZNRecord _record;
 
     /**
-     * Initialize a structure for lock owner to update a lock node value
+     * Initialize a structure for lock user to update a lock node value
      * @param record the lock node value will be updated in ZNRecord format
      */
     public LockUpdater(ZNRecord record) {
@@ -191,9 +205,15 @@ public class ZKHelixNonblockingLock implements HelixLock {
 
     @Override
     public ZNRecord update(ZNRecord current) {
-      if (current != null && userIdMatches(current) && !hasTimedOut(current)) {
+      // If no one owns the lock, allow the update
+      // If the lock owner id matches user id, allow the update
+      if (!hasNonExpiredOwner(current) || userIdMatches(current)) {
         return _record;
       }
+      // For users who are not the lock owner and try to do an update on a lock that is held by someone else, exception thrown is to be caught by data accessor, and return false for the update
+      LOG.error(
+          "User " + _userId + "tried to update the lock at " + new Date(System.currentTimeMillis())
+              + ". Lock path: " + _lockPath);
       throw new HelixException("User is not authorized to perform this operation.");
     }
   }
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
index 50daa03..6e62717 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
@@ -29,13 +29,17 @@ public class ZKHelixNonblockingLockInfo<K extends ZKHelixNonblockingLockInfo.Inf
 
   public static final String DEFAULT_OWNER_TEXT = "";
   public static final String DEFAULT_MESSAGE_TEXT = "";
-  public static final String DEFAULT_TIMEOUT_TEXT = String.valueOf(-1);
+  public static final long DEFAULT_TIMEOUT_LONG = -1L;
+  public static final String DEFAULT_TIMEOUT_TEXT = String.valueOf(DEFAULT_TIMEOUT_LONG);
   private Map<InfoKey, String> lockInfo;
 
   public enum InfoKey {
     OWNER, MESSAGE, TIMEOUT
   }
 
+  /**
+   * Constructor of ZKHelixNonblockingLockInfo that set each field to default data
+   */
   public ZKHelixNonblockingLockInfo() {
     lockInfo = new HashMap<>();
     lockInfo.put(InfoKey.OWNER, DEFAULT_OWNER_TEXT);
@@ -43,11 +47,22 @@ public class ZKHelixNonblockingLockInfo<K extends ZKHelixNonblockingLockInfo.Inf
     lockInfo.put(InfoKey.TIMEOUT, DEFAULT_TIMEOUT_TEXT);
   }
 
+  /**
+   * Construct a ZKHelixNonblockingLockInfo using a ZNRecord format of data
+   * @param znRecord A ZNRecord that contains lock information in its simple fields
+   */
   public ZKHelixNonblockingLockInfo(ZNRecord znRecord) {
     this();
-    lockInfo.put(InfoKey.OWNER, znRecord.getSimpleField(InfoKey.OWNER.name()));
-    lockInfo.put(InfoKey.MESSAGE, znRecord.getSimpleField(InfoKey.MESSAGE.name()));
-    lockInfo.put(InfoKey.TIMEOUT, znRecord.getSimpleField(InfoKey.TIMEOUT.name()));
+    if (znRecord == null) {
+      return;
+    }
+    Map<String, String> simpleFields = znRecord.getSimpleFields();
+    lockInfo
+        .put(InfoKey.OWNER, simpleFields.getOrDefault(InfoKey.OWNER.name(), DEFAULT_OWNER_TEXT));
+    lockInfo.put(InfoKey.MESSAGE,
+        simpleFields.getOrDefault(InfoKey.MESSAGE.name(), DEFAULT_MESSAGE_TEXT));
+    lockInfo.put(InfoKey.TIMEOUT,
+        simpleFields.getOrDefault(InfoKey.TIMEOUT.name(), DEFAULT_TIMEOUT_TEXT));
   }
 
   @Override
@@ -59,4 +74,19 @@ public class ZKHelixNonblockingLockInfo<K extends ZKHelixNonblockingLockInfo.Inf
   public String getInfoValue(InfoKey infoKey) {
     return lockInfo.get(infoKey);
   }
+
+  /**
+   * Method to convert a ZKHelixNonblockingLockInfo to ZNRecord
+   * @return a ZNRecord format data contains lock information in its simple fields
+   */
+  public ZNRecord toZNRecord() {
+    ZNRecord znRecord = new ZNRecord("");
+    znRecord.setSimpleField(InfoKey.OWNER.name(),
+        lockInfo.getOrDefault(InfoKey.OWNER, DEFAULT_OWNER_TEXT));
+    znRecord.setSimpleField(InfoKey.MESSAGE.name(),
+        lockInfo.getOrDefault(InfoKey.MESSAGE, DEFAULT_MESSAGE_TEXT));
+    znRecord.setSimpleField(InfoKey.TIMEOUT.name(),
+        lockInfo.getOrDefault(InfoKey.TIMEOUT, DEFAULT_TIMEOUT_TEXT));
+    return znRecord;
+  }
 }
diff --git a/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java b/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
index f2308c1..29f84ae 100644
--- a/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
+++ b/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
@@ -66,8 +66,7 @@ public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
     _participantScope =
         new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(_clusterName)
             .forParticipant("localhost_12918").build();
-
-    _lockPath = "/" + _clusterName + '/' + "LOCKS" + '/' + _participantScope;
+    _lockPath = "/" + _clusterName + '/' + "LOCKS" + '/' + _participantScope.getZkPath();
     _lock = new ZKHelixNonblockingLock(_clusterName, _participantScope, ZK_ADDR, Long.MAX_VALUE,
         _lockMessage, _userId);
   }