You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2020/04/14 02:30:21 UTC

[helix] 08/19: Fixed logic of release and isOwner

This is an automated email from the ASF dual-hosted git repository.

jxue pushed a commit to branch distributed-lock
in repository https://gitbox.apache.org/repos/asf/helix.git

commit 32f3b86a4da664b637bd19b13e624789c5dd3a25
Author: Molly Gao <mg...@mgao-mn1.linkedin.biz>
AuthorDate: Tue Feb 4 10:41:13 2020 -0800

    Fixed logic of release and isOwner
---
 .../main/java/org/apache/helix/lock/HelixLock.java |   2 +-
 .../main/java/org/apache/helix/lock/LockInfo.java  |   9 +-
 .../apache/helix/lock/ZKHelixNonblockingLock.java  | 136 ++++++++++++++-------
 .../helix/lock/ZKHelixNonblockingLockInfo.java     |  45 +++----
 .../helix/lock/TestZKHelixNonblockingLock.java     | 106 ++++++----------
 5 files changed, 150 insertions(+), 148 deletions(-)

diff --git a/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java b/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java
index 71b1ca6..2d7e318 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/HelixLock.java
@@ -42,7 +42,7 @@ public interface HelixLock {
    * Retrieve the lock information, e.g. lock timeout, lock message, etc.
    * @return lock metadata information
    */
-  <T> LockInfo<T> getLockInfo();
+  <K, V> LockInfo<K, V> getLockInfo();
 
   /**
    * If the user is current lock owner
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java b/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
index 30322bb..afd4c00 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
@@ -24,9 +24,10 @@ import java.util.Map;
 
 /**
  * Generic interface for a map contains the Helix lock information
- * @param <T> The type of the LockInfo value
+ * @param <K> The type of the LockInfo key
+ * @param <V> The type of the LockInfo value
  */
-public interface LockInfo<T> {
+public interface LockInfo<K, V> {
 
   //TODO: add specific setter and getter for any field that is determined to be universal for all implementations of HelixLock
 
@@ -35,12 +36,12 @@ public interface LockInfo<T> {
    * @param infoKey the key of the LockInfo field
    * @param infoValue the value of the LockInfo field
    */
-  void setInfoValue(String infoKey, T infoValue);
+  void setInfoValue(K infoKey, V infoValue);
 
   /**
    * Get the value of a field in LockInfo
    * @param infoKey the key of the LockInfo field
    * @return the value of the field or null if this key does not exist
    */
-  T getInfoValue(String infoKey);
+  V getInfoValue(K infoKey);
 }
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
index 5be84bb..44b5aa9 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLock.java
@@ -19,20 +19,18 @@
 
 package org.apache.helix.lock;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.I0Itec.zkclient.DataUpdater;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.ZNRecord;
-import org.apache.helix.ZNRecordUpdater;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.manager.zk.client.HelixZkClient;
 import org.apache.helix.model.HelixConfigScope;
-import org.apache.helix.util.ZNRecordUtil;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.data.Stat;
 
@@ -45,8 +43,11 @@ public class ZKHelixNonblockingLock implements HelixLock {
   private static final Logger LOG = Logger.getLogger(ZKHelixNonblockingLock.class);
 
   private static final String LOCK_ROOT = "LOCKS";
+  private static final String PATH_DELIMITER = "/";
+  private static final String UUID_FORMAT_REGEX =
+      "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}";
   private final String _lockPath;
-  private final String _userID;
+  private final String _userId;
   private final long _timeout;
   private final String _lockMsg;
   private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
@@ -58,10 +59,12 @@ public class ZKHelixNonblockingLock implements HelixLock {
    * @param zkAddress the zk address the cluster connects to
    * @param timeout the timeout period of the lcok
    * @param lockMsg the reason for having this lock
+   * @param userId a universally unique user ID that identifies the lock owner
    */
   public ZKHelixNonblockingLock(String clusterName, HelixConfigScope scope, String zkAddress,
-      Long timeout, String lockMsg, String userID) {
-    this("/" + clusterName + '/' + LOCK_ROOT + '/' + scope, zkAddress, timeout, lockMsg, userID);
+      Long timeout, String lockMsg, String userId) {
+    this(PATH_DELIMITER + clusterName + PATH_DELIMITER + LOCK_ROOT + PATH_DELIMITER + scope,
+        zkAddress, timeout, lockMsg, userId);
   }
 
   /**
@@ -70,29 +73,36 @@ public class ZKHelixNonblockingLock implements HelixLock {
    * @param zkAddress the zk address of the cluster
    * @param timeout the timeout period of the lcok
    * @param lockMsg the reason for having this lock
+   * @param userId a universally unique user ID that identifies the lock owner
    */
-  public ZKHelixNonblockingLock(String lockPath, String zkAddress, Long timeout, String lockMsg,
-      String userID) {
+  private ZKHelixNonblockingLock(String lockPath, String zkAddress, Long timeout, String lockMsg,
+      String userId) {
     HelixZkClient zkClient = new ZkClient(zkAddress);
     _lockPath = lockPath;
     _timeout = timeout;
     _lockMsg = lockMsg;
-    _userID = userID;
+    if (!userId.matches(UUID_FORMAT_REGEX)) {
+      throw new IllegalArgumentException("The input user id is not a valid UUID.");
+    }
+    _userId = userId;
     _baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient.getServers());
   }
 
   @Override
-  /**
-   * Blocking call to acquire a lock
-   * @return true if the lock was successfully acquired,
-   * false if the lock could not be acquired
-   */ public boolean acquireLock() {
+  public boolean acquireLock() {
 
     // Set lock information fields
-    ZNRecord lockInfo = new ZNRecord(_userID);
-    lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(), _userID);
+    ZNRecord lockInfo = new ZNRecord(_userId);
+    lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(), _userId);
     lockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name(), _lockMsg);
-    long timeout = System.currentTimeMillis() + _timeout;
+    long timeout;
+
+    // If the input timeout value is the max value, set the expire time to max value
+    if (_timeout == Long.MAX_VALUE) {
+      timeout = _timeout;
+    } else {
+      timeout = System.currentTimeMillis() + _timeout;
+    }
     lockInfo
         .setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(), String.valueOf(timeout));
 
@@ -106,6 +116,7 @@ public class ZKHelixNonblockingLock implements HelixLock {
       long curTimeout =
           Long.parseLong(curLock.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name()));
       if (System.currentTimeMillis() >= curTimeout) {
+        // set may fail if the znode version changed between the get and the set, meaning the lock was modified in the meantime
         success =
             _baseDataAccessor.set(_lockPath, lockInfo, stat.getVersion(), AccessOption.PERSISTENT);
       }
@@ -114,46 +125,79 @@ public class ZKHelixNonblockingLock implements HelixLock {
   }
 
   @Override
-  /**
-   * Blocking call to release a lock
-   * @return true if the lock was successfully released,
-   * false if the locked is not locked or is not locked by the user,
-   * or the lock could not be released
-   */ public boolean releaseLock() {
-    if (isOwner()) {
-      return _baseDataAccessor.remove(_lockPath, AccessOption.PERSISTENT);
-    }
-    return false;
+  public boolean releaseLock() {
+    ZNRecord newLockInfo = new ZNRecord(_userId);
+    newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name(),
+        ZKHelixNonblockingLockInfo.DEFAULT_OWNER_TEXT);
+    newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name(),
+        ZKHelixNonblockingLockInfo.DEFAULT_MESSAGE_TEXT);
+    newLockInfo.setSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name(),
+        ZKHelixNonblockingLockInfo.DEFAULT_TIMEOUT_TEXT);
+    LockUpdater updater = new LockUpdater(newLockInfo);
+    return _baseDataAccessor.update(_lockPath, updater, AccessOption.PERSISTENT);
   }
 
   @Override
-  /**
-   * Retrieve the lock information, e.g. lock timeout, lock message, etc.
-   * @return lock metadata information, return null if there is no lock node for the path provided
-   */ public LockInfo<String> getLockInfo() {
+  public ZKHelixNonblockingLockInfo getLockInfo() {
     if (!_baseDataAccessor.exists(_lockPath, AccessOption.PERSISTENT)) {
-      return null;
+      return new ZKHelixNonblockingLockInfo();
     }
-    ZKHelixNonblockingLockInfo<String> lockInfo = new ZKHelixNonblockingLockInfo<>();
     ZNRecord curLockInfo = _baseDataAccessor.get(_lockPath, null, AccessOption.PERSISTENT);
-    lockInfo.setLockInfoFields(curLockInfo);
-    return lockInfo;
+    return new ZKHelixNonblockingLockInfo(curLockInfo);
   }
 
   @Override
-  /**
-   * Check if the user is current lock owner
-   * @return true if the user is the lock owner,
-   * false if the user is not the lock owner or the lock doesn't have a owner
-   */ public boolean isOwner() {
+  public boolean isOwner() {
     ZNRecord curLockInfo = _baseDataAccessor.get(_lockPath, null, AccessOption.PERSISTENT);
     if (curLockInfo == null) {
       return false;
     }
-    String ownerID = curLockInfo.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name());
-    if (ownerID == null) {
-      return false;
+    if (userIdMatches(curLockInfo)) {
+      return !hasTimedOut(curLockInfo);
+    }
+    return false;
+  }
+
+  /**
+   * Check if a lock has timed out
+   * @param record the current lock information in ZNRecord format
+   * @return true if the lock has timed out, otherwise false
+   */
+  private boolean hasTimedOut(ZNRecord record) {
+    String timeoutStr = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.TIMEOUT.name());
+    return System.currentTimeMillis() >= Long.parseLong(timeoutStr);
+  }
+
+  /**
+   * Check if the current user Id matches with the owner Id in a lock info
+   * @param record the lock information in ZNRecord format
+   * @return true if the two ids match, otherwise false
+   */
+  private boolean userIdMatches(ZNRecord record) {
+    String ownerId = record.getSimpleField(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name());
+    return ownerId.equals(_userId);
+  }
+
+  /**
+   * Updater that replaces the lock node with a new value, but only if the current user owns the lock and it has not timed out
+   */
+  private class LockUpdater implements DataUpdater<ZNRecord> {
+    final ZNRecord _record;
+
+    /**
+     * Initialize a structure for lock owner to update a lock node value
+     * @param record the new lock node value, in ZNRecord format
+     */
+    public LockUpdater(ZNRecord record) {
+      _record = record;
+    }
+
+    @Override
+    public ZNRecord update(ZNRecord current) {
+      if (current != null && userIdMatches(current) && !hasTimedOut(current)) {
+        return _record;
+      }
+      throw new HelixException("User is not authorized to perform this operation.");
     }
-    return ownerID.equals(_userID);
   }
 }
diff --git a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
index fb86187..50daa03 100644
--- a/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
+++ b/helix-lock/src/main/java/org/apache/helix/lock/ZKHelixNonblockingLockInfo.java
@@ -25,45 +25,38 @@ import java.util.Map;
 import org.apache.helix.ZNRecord;
 
 
-public class ZKHelixNonblockingLockInfo<T extends String> implements LockInfo<T> {
+public class ZKHelixNonblockingLockInfo<K extends ZKHelixNonblockingLockInfo.InfoKey, V extends String> implements LockInfo<K, V> {
 
-  private Map<String, String> lockInfo;
+  public static final String DEFAULT_OWNER_TEXT = "";
+  public static final String DEFAULT_MESSAGE_TEXT = "";
+  public static final String DEFAULT_TIMEOUT_TEXT = String.valueOf(-1);
+  private Map<InfoKey, String> lockInfo;
 
-  enum InfoKey {
+  public enum InfoKey {
     OWNER, MESSAGE, TIMEOUT
   }
 
   public ZKHelixNonblockingLockInfo() {
     lockInfo = new HashMap<>();
+    lockInfo.put(InfoKey.OWNER, DEFAULT_OWNER_TEXT);
+    lockInfo.put(InfoKey.MESSAGE, DEFAULT_MESSAGE_TEXT);
+    lockInfo.put(InfoKey.TIMEOUT, DEFAULT_TIMEOUT_TEXT);
   }
 
-  @Override
-  /**
-   * Create a single filed of LockInfo, or update the value of the field if it already exists
-   * @param infoKey the key of the LockInfo field
-   * @param infoValue the value of the LockInfo field
-   */ public void setInfoValue(String infoKey, String infoValue) {
-    lockInfo.put(infoKey, infoValue);
+  public ZKHelixNonblockingLockInfo(ZNRecord znRecord) {
+    this();
+    lockInfo.put(InfoKey.OWNER, znRecord.getSimpleField(InfoKey.OWNER.name()));
+    lockInfo.put(InfoKey.MESSAGE, znRecord.getSimpleField(InfoKey.MESSAGE.name()));
+    lockInfo.put(InfoKey.TIMEOUT, znRecord.getSimpleField(InfoKey.TIMEOUT.name()));
   }
 
   @Override
-  /**
-   * Get the value of a field in LockInfo
-   * @param infoKey the key of the LockInfo field
-   * @return the value of the field or null if this key does not exist
-   */ public T getInfoValue(String infoKey) {
-    return (T) lockInfo.get(infoKey);
+  public void setInfoValue(InfoKey infoKey, String infoValue) {
+    lockInfo.put(infoKey, infoValue);
   }
 
-  /**
-   * Update the lock info with information in a ZNRecord
-   * @param record Information about the lock that stored as ZNRecord format
-   */
-  public void setLockInfoFields(ZNRecord record) {
-    if (record == null) {
-      return;
-    }
-    Map<String, String> recordSimpleFields = record.getSimpleFields();
-    lockInfo.putAll(recordSimpleFields);
+  @Override
+  public String getInfoValue(InfoKey infoKey) {
+    return lockInfo.get(infoKey);
   }
 }
diff --git a/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java b/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
index 95c41ac..37daf7b 100644
--- a/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
+++ b/helix-lock/src/test/java/org/apache/helix/lock/TestZKHelixNonblockingLock.java
@@ -31,16 +31,19 @@ import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.zookeeper.CreateMode;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 
 public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
 
-  private final String _lockPath = "/testLockPath";
   private final String _className = TestHelper.getTestClassName();
   private final String _methodName = TestHelper.getTestMethodName();
   private final String _clusterName = _className + "_" + _methodName;
   private final String _lockMessage = "Test";
+  private String _lockPath;
+  private ZKHelixNonblockingLock _lock;
+  private String _userId;
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -49,79 +52,47 @@ public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
 
     TestHelper.setupCluster(_clusterName, ZK_ADDR, 12918, "localhost", "TestDB", 1, 10, 5, 3,
         "MasterSlave", true);
-  }
-
-  @Test
-  public void testAcquireLockWithParticipantScope() {
-
-    // Initialize lock with participant config scope
+    _userId = UUID.randomUUID().toString();
     HelixConfigScope participantScope =
         new HelixConfigScopeBuilder(ConfigScopeProperty.PARTICIPANT).forCluster(_clusterName)
             .forParticipant("localhost_12918").build();
 
-    String userID = UUID.randomUUID().toString();
-
-    ZKHelixNonblockingLock lock =
-        new ZKHelixNonblockingLock(_clusterName, participantScope, ZK_ADDR, Long.MAX_VALUE,
-            _lockMessage, userID);
+    _lockPath = "/" + _clusterName + '/' + "LOCKS" + '/' + participantScope;
+    _lock = new ZKHelixNonblockingLock(_clusterName, participantScope, ZK_ADDR, Long.MAX_VALUE,
+        _lockMessage, _userId);
 
-    // Acquire lock
-    lock.acquireLock();
-    String lockPath = "/" + _clusterName + '/' + "LOCKS" + '/' + participantScope;
-    Assert.assertTrue(_gZkClient.exists(lockPath));
-
-    // Get lock information
-    LockInfo<String> record = lock.getLockInfo();
-    Assert
-        .assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name()), userID);
-    Assert.assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name()),
-        _lockMessage);
-
-    // Check if the user is lock owner
-    Assert.assertTrue(lock.isOwner());
+  }
 
-    // Release lock
-    lock.releaseLock();
-    Assert.assertFalse(_gZkClient.exists(lockPath));
+  @BeforeMethod
+  public void beforeMethod() {
+    _gZkClient.delete(_lockPath);
+    Assert.assertFalse(_gZkClient.exists(_lockPath));
   }
 
   @Test
-  public void testAcquireLockWithUserProvidedPath() {
-
-    // Initialize lock with user provided path
-    String userID = UUID.randomUUID().toString();
-
-    ZKHelixNonblockingLock lock =
-        new ZKHelixNonblockingLock(_lockPath, ZK_ADDR, Long.MAX_VALUE, _lockMessage, userID);
+  public void testAcquireLock() {
 
-    //Acquire lock
-    lock.acquireLock();
+    // Acquire lock
+    _lock.acquireLock();
     Assert.assertTrue(_gZkClient.exists(_lockPath));
 
     // Get lock information
-    LockInfo<String> record = lock.getLockInfo();
-    Assert
-        .assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.OWNER.name()), userID);
-    Assert.assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE.name()),
+    LockInfo<ZKHelixNonblockingLockInfo.InfoKey, String> record = _lock.getLockInfo();
+    Assert.assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.OWNER), _userId);
+    Assert.assertEquals(record.getInfoValue(ZKHelixNonblockingLockInfo.InfoKey.MESSAGE),
         _lockMessage);
 
     // Check if the user is lock owner
-    Assert.assertTrue(lock.isOwner());
+    Assert.assertTrue(_lock.isOwner());
 
     // Release lock
-    lock.releaseLock();
-    Assert.assertFalse(_gZkClient.exists(_lockPath));
+    _lock.releaseLock();
+    Assert.assertFalse(_lock.isOwner());
   }
 
   @Test
   public void testAcquireLockWhenExistingLockNotExpired() {
 
-    // Initialize lock with user provided path
-    String ownerID = UUID.randomUUID().toString();
-
-    ZKHelixNonblockingLock lock =
-        new ZKHelixNonblockingLock(_lockPath, ZK_ADDR, 0L, _lockMessage, ownerID);
-
     // Fake condition when the lock owner is not current user
     String fakeUserID = UUID.randomUUID().toString();
     ZNRecord fakeRecord = new ZNRecord(fakeUserID);
@@ -131,28 +102,19 @@ public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
     _gZkClient.create(_lockPath, fakeRecord, CreateMode.PERSISTENT);
 
     // Check if the user is lock owner
-    Assert.assertFalse(lock.isOwner());
+    Assert.assertFalse(_lock.isOwner());
 
     // Acquire lock
-    Assert.assertFalse(lock.acquireLock());
-    Assert.assertFalse(lock.isOwner());
+    Assert.assertFalse(_lock.acquireLock());
+    Assert.assertFalse(_lock.isOwner());
 
     // Release lock
-    Assert.assertFalse(lock.releaseLock());
-    Assert.assertTrue(_gZkClient.exists(_lockPath));
-
-    _gZkClient.delete(_lockPath);
+    Assert.assertFalse(_lock.releaseLock());
   }
 
   @Test
   public void testAcquireLockWhenExistingLockExpired() {
 
-    // Initialize lock with user provided path
-    String ownerID = UUID.randomUUID().toString();
-
-    ZKHelixNonblockingLock lock =
-        new ZKHelixNonblockingLock(_lockPath, ZK_ADDR, Long.MAX_VALUE, _lockMessage, ownerID);
-
     // Fake condition when the current lock already expired
     String fakeUserID = UUID.randomUUID().toString();
     ZNRecord fakeRecord = new ZNRecord(fakeUserID);
@@ -162,15 +124,17 @@ public class TestZKHelixNonblockingLock extends ZkUnitTestBase {
     _gZkClient.create(_lockPath, fakeRecord, CreateMode.PERSISTENT);
 
     // Acquire lock
-    Assert.assertTrue(lock.acquireLock());
-    Assert.assertTrue(_gZkClient.exists(_lockPath));
-
-    // Check if the current user is the lock owner
-    Assert.assertTrue(lock.isOwner());
+    Assert.assertTrue(_lock.acquireLock());
+    Assert.assertTrue(_lock.isOwner());
 
     // Release lock
-    Assert.assertTrue(lock.releaseLock());
-    Assert.assertFalse(_gZkClient.exists(_lockPath));
+    Assert.assertTrue(_lock.releaseLock());
+    Assert.assertFalse(_lock.isOwner());
+  }
+
+  @Test
+  public void testSimultaneousAcquire() {
+
   }
 }