You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/12/01 19:38:30 UTC

[GitHub] [helix] zhangmeng916 commented on a change in pull request #1564: Implement Helix lock priority and notification

zhangmeng916 commented on a change in pull request #1564:
URL: https://github.com/apache/helix/pull/1564#discussion_r533672387



##########
File path: helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
##########
@@ -31,69 +31,162 @@
 import org.apache.helix.manager.zk.GenericZkHelixApiBuilder;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.ZNRecordUpdater;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.lock.LockInfo.DEFAULT_PRIORITY_INT;
+import static org.apache.helix.lock.LockInfo.DEFAULT_REQUESTOR_ID;
+import static org.apache.helix.lock.LockInfo.DEFAULT_REQUESTOR_PRIORITY_INT;
+import static org.apache.helix.lock.LockInfo.DEFAULT_REQUESTOR_REQUESTING_TIMESTAMP_LONG;
+import static org.apache.helix.lock.LockInfo.DEFAULT_REQUESTOR_WAITING_TIMEOUT_LONG;
+import static org.apache.helix.lock.LockInfo.DEFAULT_WAITING_TIMEOUT_LONG;
+import static org.apache.helix.lock.LockInfo.ZNODE_ID;
 
 
 /**
  * Helix nonblocking lock implementation based on Zookeeper.
  * NOTE: do NOT use ephemeral nodes in this implementation because ephemeral mode is not supported
  * in ZooScalability mode.
  */
-public class ZKDistributedNonblockingLock implements DistributedLock {
-  private static final Logger LOG = Logger.getLogger(ZKDistributedNonblockingLock.class);
+public class ZKDistributedNonblockingLock implements DistributedLock, IZkDataListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKDistributedNonblockingLock.class);
 
   private final String _lockPath;
   private final String _userId;
-  private final long _timeout;
   private final String _lockMsg;
+  private final long _leaseTimeout;
+  private final long _waitingTimeout;
+  private final long _cleanupTimeout;
+  private final int _priority;
+  private final boolean _isForceful;
+  private final LockListener _lockListener;
   private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
+  private boolean _isLocked;
+  private boolean _isPending;
+  private boolean _isPreempted;
+  private long _pendingTimeout;
 
   /**
    * Initialize the lock with user provided information, e.g.,cluster, scope, etc.
    * @param scope the scope to lock
    * @param zkAddress the zk address the cluster connects to
-   * @param timeout the timeout period of the lock
+   * @param leaseTimeout the leasing timeout period of the lock
    * @param lockMsg the reason for having this lock
    * @param userId a universal unique userId for lock owner identity
    */
-  public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long timeout,
+  public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long leaseTimeout,
       String lockMsg, String userId) {
-    this(scope.getPath(), timeout, lockMsg, userId, new ZkBaseDataAccessor<ZNRecord>(zkAddress));
+    this(scope.getPath(), leaseTimeout, lockMsg, userId, 0, Integer.MAX_VALUE, 0, false,
+        new LockListener() {
+          @Override
+          public void onCleanupNotification() {
+          }
+        }, new ZkBaseDataAccessor<ZNRecord>(zkAddress));
+  }
+
+  /**
+   * Initialize the lock with user provided information, e.g.,cluster, scope, etc.
+   * @param scope the scope to lock
+   * @param zkAddress the zk address the cluster connects to
+   * @param leaseTimeout the leasing timeout period of the lock
+   * @param lockMsg the reason for having this lock
+   * @param userId a universal unique userId for lock owner identity
+   * @param priority the priority of the lock
+   * @param waitingTimeout the waiting timeout period of the lock when the tryLock request is issued
+   * @param cleanupTimeout the time period needed to finish the cleanup work by the lock when it
+   *                      is preempted
+   * @param isForceful whether the lock is a forceful one. This determines the behavior when the
+   *                   lock encountered an exception during preempting lower priority lock
+   * @param lockListener the listener associated to the lock
+   */
+  public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long leaseTimeout,
+      String lockMsg, String userId, int priority, long waitingTimeout, long cleanupTimeout,
+      boolean isForceful, LockListener lockListener) {
+    this(scope.getPath(), leaseTimeout, lockMsg, userId, priority, waitingTimeout, cleanupTimeout,
+        isForceful, lockListener, new ZkBaseDataAccessor<ZNRecord>(zkAddress));
   }
 
   /**
    * Initialize the lock with user provided information, e.g., lock path under zookeeper, etc.
    * @param lockPath the path of the lock under Zookeeper
-   * @param timeout the timeout period of the lock
+   * @param leaseTimeout the leasing timeout period of the lock
    * @param lockMsg the reason for having this lock
    * @param userId a universal unique userId for lock owner identity
+   * @param priority the priority of the lock
+   * @param waitingTimeout the waiting timeout period of the lock when the tryLock request is issued
+   * @param cleanupTimeout the time period needed to finish the cleanup work by the lock when it
+   *                      is preempted
+   * @param isForceful whether the lock is a forceful one. This determines the behavior when the
+   *                   lock encountered an exception during preempting lower priority lock
+   * @param lockListener the listener associated to the lock
    * @param baseDataAccessor baseDataAccessor instance to do I/O against ZK with
    */
-  private ZKDistributedNonblockingLock(String lockPath, Long timeout, String lockMsg, String userId,
-      BaseDataAccessor<ZNRecord> baseDataAccessor) {
+  private ZKDistributedNonblockingLock(String lockPath, Long leaseTimeout, String lockMsg,
+      String userId, int priority, long waitingTimeout, long cleanupTimeout, boolean isForceful,
+      LockListener lockListener, BaseDataAccessor<ZNRecord> baseDataAccessor) {
     _lockPath = lockPath;
-    if (timeout < 0) {
-      throw new IllegalArgumentException("The expiration time cannot be negative.");
+    if (leaseTimeout < 0 || waitingTimeout < 0 || cleanupTimeout < 0) {
+      throw new IllegalArgumentException("Timeout cannot be negative.");
+    }
+    if (priority < 0) {
+      throw new IllegalArgumentException("Priority cannot be negative.");
     }
-    _timeout = timeout;
+    _leaseTimeout = leaseTimeout;
     _lockMsg = lockMsg;
     _userId = userId;
     _baseDataAccessor = baseDataAccessor;
+    _priority = priority;
+    _waitingTimeout = waitingTimeout;
+    _cleanupTimeout = cleanupTimeout;
+    _lockListener = lockListener;
+    _isForceful = isForceful;
   }
 
   @Override
-  public boolean tryLock() {
+  public synchronized boolean tryLock() {
     // Set lock information fields
-    long deadline;
-    // Prevent value overflow
-    if (_timeout > Long.MAX_VALUE - System.currentTimeMillis()) {
-      deadline = Long.MAX_VALUE;
-    } else {
-      deadline = System.currentTimeMillis() + _timeout;
+    _baseDataAccessor.subscribeDataChanges(_lockPath, this);
+    LockUpdater updater = new LockUpdater(
+        new LockInfo(_userId, _lockMsg, getNonOverflowTimestamp(_leaseTimeout), _priority,
+            _waitingTimeout, _cleanupTimeout, null, 0, 0, 0));
+    boolean updateResult = _baseDataAccessor.update(_lockPath, updater, AccessOption.PERSISTENT);
+
+    // Check whether the lock request is still pending. If yes, we will wait for the period
+    // recorded in _pendingTimeout.
+    if (_isPending) {
+      try {
+        wait(_pendingTimeout);

Review comment:
       This wait() is woken up by the notify() in the snippet below; if it never receives that notify(), it waits until the timeout expires.
   
     public void onAcquiredLockNotification() {
       synchronized (ZKDistributedNonblockingLock.this) {
         _isLocked = true;
         ZKDistributedNonblockingLock.this.notify();
       }
     }

##########
File path: helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
##########
@@ -31,69 +31,162 @@
 import org.apache.helix.manager.zk.GenericZkHelixApiBuilder;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.ZNRecordUpdater;
 import org.apache.helix.zookeeper.zkclient.DataUpdater;
-import org.apache.log4j.Logger;
+import org.apache.helix.zookeeper.zkclient.IZkDataListener;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.lock.LockInfo.DEFAULT_PRIORITY_INT;
+import static org.apache.helix.lock.LockInfo.DEFAULT_REQUESTOR_ID;
+import static org.apache.helix.lock.LockInfo.DEFAULT_REQUESTOR_PRIORITY_INT;
+import static org.apache.helix.lock.LockInfo.DEFAULT_REQUESTOR_REQUESTING_TIMESTAMP_LONG;
+import static org.apache.helix.lock.LockInfo.DEFAULT_REQUESTOR_WAITING_TIMEOUT_LONG;
+import static org.apache.helix.lock.LockInfo.DEFAULT_WAITING_TIMEOUT_LONG;
+import static org.apache.helix.lock.LockInfo.ZNODE_ID;
 
 
 /**
  * Helix nonblocking lock implementation based on Zookeeper.
  * NOTE: do NOT use ephemeral nodes in this implementation because ephemeral mode is not supported
  * in ZooScalability mode.
  */
-public class ZKDistributedNonblockingLock implements DistributedLock {
-  private static final Logger LOG = Logger.getLogger(ZKDistributedNonblockingLock.class);
+public class ZKDistributedNonblockingLock implements DistributedLock, IZkDataListener {
+  private static final Logger LOG = LoggerFactory.getLogger(ZKDistributedNonblockingLock.class);
 
   private final String _lockPath;
   private final String _userId;
-  private final long _timeout;
   private final String _lockMsg;
+  private final long _leaseTimeout;
+  private final long _waitingTimeout;
+  private final long _cleanupTimeout;
+  private final int _priority;
+  private final boolean _isForceful;
+  private final LockListener _lockListener;
   private final BaseDataAccessor<ZNRecord> _baseDataAccessor;
+  private boolean _isLocked;
+  private boolean _isPending;
+  private boolean _isPreempted;
+  private long _pendingTimeout;
 
   /**
    * Initialize the lock with user provided information, e.g.,cluster, scope, etc.
    * @param scope the scope to lock
    * @param zkAddress the zk address the cluster connects to
-   * @param timeout the timeout period of the lock
+   * @param leaseTimeout the leasing timeout period of the lock
    * @param lockMsg the reason for having this lock
    * @param userId a universal unique userId for lock owner identity
    */
-  public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long timeout,
+  public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long leaseTimeout,
       String lockMsg, String userId) {
-    this(scope.getPath(), timeout, lockMsg, userId, new ZkBaseDataAccessor<ZNRecord>(zkAddress));
+    this(scope.getPath(), leaseTimeout, lockMsg, userId, 0, Integer.MAX_VALUE, 0, false,
+        new LockListener() {
+          @Override
+          public void onCleanupNotification() {
+          }
+        }, new ZkBaseDataAccessor<ZNRecord>(zkAddress));
+  }
+
+  /**
+   * Initialize the lock with user provided information, e.g.,cluster, scope, etc.
+   * @param scope the scope to lock
+   * @param zkAddress the zk address the cluster connects to
+   * @param leaseTimeout the leasing timeout period of the lock
+   * @param lockMsg the reason for having this lock
+   * @param userId a universal unique userId for lock owner identity
+   * @param priority the priority of the lock
+   * @param waitingTimeout the waiting timeout period of the lock when the tryLock request is issued
+   * @param cleanupTimeout the time period needed to finish the cleanup work by the lock when it
+   *                      is preempted
+   * @param isForceful whether the lock is a forceful one. This determines the behavior when the
+   *                   lock encountered an exception during preempting lower priority lock
+   * @param lockListener the listener associated to the lock
+   */
+  public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long leaseTimeout,
+      String lockMsg, String userId, int priority, long waitingTimeout, long cleanupTimeout,
+      boolean isForceful, LockListener lockListener) {
+    this(scope.getPath(), leaseTimeout, lockMsg, userId, priority, waitingTimeout, cleanupTimeout,
+        isForceful, lockListener, new ZkBaseDataAccessor<ZNRecord>(zkAddress));
   }
 
   /**
    * Initialize the lock with user provided information, e.g., lock path under zookeeper, etc.
    * @param lockPath the path of the lock under Zookeeper
-   * @param timeout the timeout period of the lock
+   * @param leaseTimeout the leasing timeout period of the lock
    * @param lockMsg the reason for having this lock
    * @param userId a universal unique userId for lock owner identity
+   * @param priority the priority of the lock
+   * @param waitingTimeout the waiting timeout period of the lock when the tryLock request is issued
+   * @param cleanupTimeout the time period needed to finish the cleanup work by the lock when it
+   *                      is preempted
+   * @param isForceful whether the lock is a forceful one. This determines the behavior when the
+   *                   lock encountered an exception during preempting lower priority lock
+   * @param lockListener the listener associated to the lock
    * @param baseDataAccessor baseDataAccessor instance to do I/O against ZK with
    */
-  private ZKDistributedNonblockingLock(String lockPath, Long timeout, String lockMsg, String userId,
-      BaseDataAccessor<ZNRecord> baseDataAccessor) {
+  private ZKDistributedNonblockingLock(String lockPath, Long leaseTimeout, String lockMsg,
+      String userId, int priority, long waitingTimeout, long cleanupTimeout, boolean isForceful,
+      LockListener lockListener, BaseDataAccessor<ZNRecord> baseDataAccessor) {
     _lockPath = lockPath;
-    if (timeout < 0) {
-      throw new IllegalArgumentException("The expiration time cannot be negative.");
+    if (leaseTimeout < 0 || waitingTimeout < 0 || cleanupTimeout < 0) {
+      throw new IllegalArgumentException("Timeout cannot be negative.");
+    }
+    if (priority < 0) {
+      throw new IllegalArgumentException("Priority cannot be negative.");
     }
-    _timeout = timeout;
+    _leaseTimeout = leaseTimeout;
     _lockMsg = lockMsg;
     _userId = userId;
     _baseDataAccessor = baseDataAccessor;
+    _priority = priority;
+    _waitingTimeout = waitingTimeout;
+    _cleanupTimeout = cleanupTimeout;
+    _lockListener = lockListener;
+    _isForceful = isForceful;
   }
 
   @Override
-  public boolean tryLock() {
+  public synchronized boolean tryLock() {
     // Set lock information fields
-    long deadline;
-    // Prevent value overflow
-    if (_timeout > Long.MAX_VALUE - System.currentTimeMillis()) {
-      deadline = Long.MAX_VALUE;
-    } else {
-      deadline = System.currentTimeMillis() + _timeout;
+    _baseDataAccessor.subscribeDataChanges(_lockPath, this);
+    LockUpdater updater = new LockUpdater(
+        new LockInfo(_userId, _lockMsg, getNonOverflowTimestamp(_leaseTimeout), _priority,
+            _waitingTimeout, _cleanupTimeout, null, 0, 0, 0));
+    boolean updateResult = _baseDataAccessor.update(_lockPath, updater, AccessOption.PERSISTENT);
+
+    // Check whether the lock request is still pending. If yes, we will wait for the period
+    // recorded in _pendingTimeout.
+    if (_isPending) {

Review comment:
       This boolean variable is only a flag used to distinguish between scenarios so that we can handle each one differently. The part that actually does the work is the wait() call that follows.

##########
File path: helix-lock/src/main/java/org/apache/helix/lock/LockInfo.java
##########
@@ -30,13 +30,23 @@
   // Default values for each attribute if there are no current values set by user
   public static final String DEFAULT_OWNER_TEXT = "";
   public static final String DEFAULT_MESSAGE_TEXT = "";
-  public static final long DEFAULT_TIMEOUT_LONG = -1L;
+  public static final long DEFAULT_TIMEOUT_LONG = -1;
+  public static final int DEFAULT_PRIORITY_INT = -1;
+  public static final long DEFAULT_WAITING_TIMEOUT_LONG = -1;
+  public static final long DEFAULT_CLEANUP_TIMEOUT_LONG = -1;
+  public static final String DEFAULT_REQUESTOR_ID = "";
+  public static final int DEFAULT_REQUESTOR_PRIORITY_INT = -1;
+  public static final long DEFAULT_REQUESTOR_WAITING_TIMEOUT_LONG = -1;
+  public static final long DEFAULT_REQUESTOR_REQUESTING_TIMESTAMP_LONG = -1;

Review comment:
       Good suggestion! Will do.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@helix.apache.org
For additional commands, e-mail: reviews-help@helix.apache.org