You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@accumulo.apache.org by GitBox <gi...@apache.org> on 2021/02/01 18:29:24 UTC

[GitHub] [accumulo] dlmarion commented on a change in pull request #1866: Add a UUID to the server's ZooLock

dlmarion commented on a change in pull request #1866:
URL: https://github.com/apache/accumulo/pull/1866#discussion_r568046371



##########
File path: core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
##########
@@ -123,189 +132,383 @@ public void unableToMonitorLockNode(Exception e) {
   public synchronized boolean tryLock(LockWatcher lw, byte[] data)
       throws KeeperException, InterruptedException {
 
-    TryLockAsyncLockWatcher tlalw = new TryLockAsyncLockWatcher(lw);
+    LockWatcherWrapper lww = new LockWatcherWrapper(lw);
 
-    lockAsync(tlalw, data);
+    lock(lww, data);
 
-    if (tlalw.acquiredLock) {
+    if (lww.acquiredLock) {
       return true;
     }
 
-    if (asyncLock != null) {
-      zooKeeper.recursiveDelete(path + "/" + asyncLock, NodeMissingPolicy.SKIP);
-      asyncLock = null;
+    // If we didn't acquire the lock, then delete the path we just created
+    if (createdNodeName != null) {
+      String pathToDelete = path + "/" + createdNodeName;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("[{}] Failed to acquire lock in tryLock(), deleting all at path: {}",
+            this.getZLockPrefix(), pathToDelete);
+      }
+      zooKeeper.recursiveDelete(pathToDelete, NodeMissingPolicy.SKIP);
+      createdNodeName = null;
     }
 
     return false;
   }
 
-  private synchronized void lockAsync(final String myLock, final AsyncLockWatcher lw)
-      throws KeeperException, InterruptedException {
+  /**
+   * Sort list of ephemeral nodes by their sequence number. Any ephemeral nodes that are not of the
+   * correct form will sort last.
+   *
+   * @param children
+   *          list of ephemeral nodes
+   */
+  public static void sortChildrenByLockPrefix(List<String> children) {
+    Collections.sort(children, new Comparator<String>() {
+      @Override
+      public int compare(String o1, String o2) {
+
+        // Lock should be of the form:
+        // zlock#UUID#sequenceNumber
+        // Example:
+        // zlock#44755fbe-1c9e-40b3-8458-03abaf950d7e#0000000000
+
+        boolean lValid = true;
+        if (o1.length() != 53 && o1.lastIndexOf('#') != 42) {
+          lValid = false;
+        }
+        boolean rValid = true;
+        if (o2.length() != 53 && o2.lastIndexOf('#') != 42) {
+          rValid = false;
+        }
+        // Make any invalid's sort last (higher)
+        if (lValid && rValid) {
+          int leftIdx = 43, rightIdx = 43;
+          return Integer.valueOf(o1.substring(leftIdx))
+              .compareTo(Integer.valueOf(o2.substring(rightIdx)));
+        } else if (!lValid && rValid) {
+          return 1;
+        } else if (lValid && !rValid) {
+          return -1;
+        } else {
+          return 0;
+        }
+      }
+    });
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Children nodes: {}", children.size());
+      for (String child : children) {
+        LOG.debug("- {}", child);
+      }
+    }
+  }
 
-    if (asyncLock == null) {
-      throw new IllegalStateException("Called lockAsync() when asyncLock == null");
+  /**
+   * Given a pre-sorted set of children ephemeral nodes where the node name is of the form
+   * "zlock#UUID#sequenceNumber", find the ephemeral node that sorts before the ephemeralNode
+   * parameter with the lowest sequence number
+   *
+   * @param children
+   *          list of sequential ephemera nodes, already sorted
+   * @param ephemeralNode
+   *          starting node for the search
+   * @return next lowest prefix with the lowest sequence number
+   */
+  public static String findLowestPrevPrefix(final List<String> children,
+      final String ephemeralNode) {
+    int idx = children.indexOf(ephemeralNode);
+    // Get the prefix from the prior ephemeral node
+    String prev = children.get(idx - 1);
+    int prefixIdx = prev.lastIndexOf('#');
+    String prevPrefix = prev.substring(0, prefixIdx);
+
+    // Find the lowest sequential ephemeral node with prevPrefix
+    int i = 2;
+    String lowestPrevNode = prev;
+    while ((idx - i) >= 0) {
+      prev = children.get(idx - i);
+      i++;
+      if (prev.startsWith(prevPrefix)) {
+        lowestPrevNode = prev;
+      } else {
+        break;
+      }
     }
+    return lowestPrevNode;
+  }
 
-    List<String> children = zooKeeper.getChildren(path);
+  private synchronized void determineLockOwnership(final String createdEphemeralNode,
+      final AccumuloLockWatcher lw) throws KeeperException, InterruptedException {
 
-    if (!children.contains(myLock)) {
-      throw new RuntimeException("Lock attempt ephemeral node no longer exist " + myLock);
+    if (createdNodeName == null) {
+      throw new IllegalStateException(
+          "Called determineLockOwnership() when ephemeralNodeName == null");
     }
 
-    Collections.sort(children);
-    if (log.isTraceEnabled()) {
-      log.trace("Candidate lock nodes");
-      for (String child : children) {
-        log.trace("- {}", child);
+    List<String> children = new ArrayList<>();
+    zooKeeper.getChildren(path).forEach(s -> {
+      if (s.startsWith(ZLOCK_PREFIX)) {
+        children.add(s);
       }
+    });
+
+    if (!children.contains(createdEphemeralNode)) {
+      throw new RuntimeException(
+          "Lock attempt ephemeral node no longer exist " + createdEphemeralNode);
     }
+    sortChildrenByLockPrefix(children);
 
-    if (children.get(0).equals(myLock)) {
-      log.trace("First candidate is my lock, acquiring");
+    if (children.get(0).equals(createdEphemeralNode)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("[{}] First candidate is my lock, acquiring", this.getZLockPrefix());
+      }
       if (!watchingParent) {
         throw new IllegalStateException(
             "Can not acquire lock, no longer watching parent : " + path);
       }
       this.lockWatcher = lw;
-      this.lock = myLock;
-      asyncLock = null;
+      this.lockNodeName = createdEphemeralNode;
+      createdNodeName = null;
       lockWasAcquired = true;
       lw.acquiredLock();
-      return;
-    }
-    String prev = null;
-    for (String child : children) {
-      if (child.equals(myLock)) {
-        break;
+    } else {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("[{}] Lock held by another process with ephemeral node: {}",
+            this.getZLockPrefix(), children.get(0));
       }
 
-      prev = child;
-    }
-
-    final String lockToWatch = path + "/" + prev;
-    log.trace("Establishing watch on {}", lockToWatch);
-    Stat stat = zooKeeper.getStatus(lockToWatch, new Watcher() {
+      String lowestPrevNode = findLowestPrevPrefix(children, createdEphemeralNode);
 
-      @Override
-      public void process(WatchedEvent event) {
-        if (log.isTraceEnabled()) {
-          log.trace("Processing event:");
-          log.trace("- type  {}", event.getType());
-          log.trace("- path  {}", event.getPath());
-          log.trace("- state {}", event.getState());
-        }
-        boolean renew = true;
-        if (event.getType() == EventType.NodeDeleted && event.getPath().equals(lockToWatch)) {
-          log.trace("Detected deletion of {}, attempting to acquire lock", lockToWatch);
-          synchronized (ZooLock.this) {
-            try {
-              if (asyncLock != null) {
-                lockAsync(myLock, lw);
-              } else if (log.isTraceEnabled()) {
-                log.trace("While waiting for another lock {} {} was deleted", lockToWatch, myLock);
-              }
-            } catch (Exception e) {
-              if (lock == null) {
-                // have not acquired lock yet
-                lw.failedToAcquireLock(e);
+      watchingNodeName = path + "/" + lowestPrevNode;
+      final String nodeToWatch = watchingNodeName;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("[{}] Establishing watch on prior node {}", this.getZLockPrefix(), nodeToWatch);
+      }
+      Watcher priorNodeWatcher = new Watcher() {
+        @Override
+        public void process(WatchedEvent event) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("[{}] Processing event:", getZLockPrefix());
+            LOG.trace("- type  {}", event.getType());
+            LOG.trace("- path  {}", event.getPath());
+            LOG.trace("- state {}", event.getState());
+          }
+          boolean renew = true;
+          if (event.getType() == EventType.NodeDeleted && event.getPath().equals(nodeToWatch)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("[{}] Detected deletion of prior node {}, attempting to acquire lock",
+                  getZLockPrefix(), nodeToWatch);
+            }
+            synchronized (ZooLock.this) {
+              try {
+                if (createdNodeName != null) {
+                  determineLockOwnership(createdEphemeralNode, lw);
+                } else if (LOG.isDebugEnabled()) {
+                  LOG.debug("[{}] While waiting for another lock {}, {} was deleted",
+                      getZLockPrefix(), nodeToWatch, createdEphemeralNode);
+                }
+              } catch (Exception e) {
+                if (lockNodeName == null) {
+                  // have not acquired lock yet
+                  lw.failedToAcquireLock(e);
+                }
               }
             }
+            renew = false;
           }
-          renew = false;
-        }
 
-        if (event.getState() == KeeperState.Expired
-            || event.getState() == KeeperState.Disconnected) {
-          synchronized (ZooLock.this) {
-            if (lock == null) {
-              lw.failedToAcquireLock(new Exception("Zookeeper Session expired / disconnected"));
+          if (event.getState() == KeeperState.Expired
+              || event.getState() == KeeperState.Disconnected) {
+            synchronized (ZooLock.this) {
+              if (lockNodeName == null) {
+                LOG.info("Zookeeper Session expired / disconnected");
+                lw.failedToAcquireLock(new Exception("Zookeeper Session expired / disconnected"));
+              }
             }
+            renew = false;
           }
-          renew = false;
-        }
-        if (renew) {
-          log.trace("Renewing watch on {}", lockToWatch);
-          try {
-            Stat restat = zooKeeper.getStatus(lockToWatch, this);
-            if (restat == null) {
-              lockAsync(myLock, lw);
+          if (renew) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("[{}] Renewing watch on prior node  {}", getZLockPrefix(), nodeToWatch);
+            }
+            try {
+              Stat restat = zooKeeper.getStatus(nodeToWatch, this);
+              if (restat == null) {
+                // if stat is null from the zookeeper.exists(path, Watcher) call, then we just
+                // created a Watcher on a node that does not exist. Delete the watcher we just
+                // created.
+                zooKeeper.getZooKeeper().removeWatches(nodeToWatch, this, WatcherType.Any, true);
+                determineLockOwnership(createdEphemeralNode, lw);
+              }
+            } catch (KeeperException | InterruptedException e) {
+              lw.failedToAcquireLock(new Exception("Failed to renew watch on other master node"));

Review comment:
      @EdColeman opened https://github.com/apache/accumulo/issues/1893 for this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org