You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/02/05 00:13:10 UTC

svn commit: r1442429 - in /accumulo/trunk: fate/src/main/java/org/apache/accumulo/fate/zookeeper/ test/src/test/java/org/apache/accumulo/fate/ test/src/test/java/org/apache/accumulo/fate/zookeeper/

Author: kturner
Date: Mon Feb  4 23:13:10 2013
New Revision: 1442429

URL: http://svn.apache.org/viewvc?rev=1442429&view=rev
Log:
ACCUMULO-954 Made zoolock rewatch its parent node and added some unit test for zoolock

Added:
    accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/
    accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/
    accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
Modified:
    accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java

Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java?rev=1442429&r1=1442428&r2=1442429&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java Mon Feb  4 23:13:10 2013
@@ -38,7 +38,7 @@ public class ZooLock implements Watcher 
   public static final String LOCK_PREFIX = "zlock-";
   
   public enum LockLossReason {
-    LOCK_DELETED, SESSION_EXPIRED
+    LOCK_DELETED, SESSION_EXPIRED, UNKNOWN
   }
   
   public interface LockWatcher {
@@ -56,7 +56,7 @@ public class ZooLock implements Watcher 
   protected final IZooReaderWriter zooKeeper;
   private String lock;
   private LockWatcher lockWatcher;
-  
+  private boolean watchingParent = false;
   private String asyncLock;
   
   public ZooLock(String zookeepers, int timeInMillis, String scheme, byte[] auth, String path) {
@@ -69,8 +69,10 @@ public class ZooLock implements Watcher 
     zooKeeper = zrw;
     try {
       zooKeeper.getStatus(path, this);
+      watchingParent = true;
     } catch (Exception ex) {
       log.warn("Error getting setting initial watch on ZooLock", ex);
+      throw new RuntimeException(ex);
     }
   }
   
@@ -131,6 +133,8 @@ public class ZooLock implements Watcher 
     Collections.sort(children);
     
     if (children.get(0).equals(myLock)) {
+      if (!watchingParent)
+        throw new RuntimeException("Can not acquire lock, no longer watching parent");
       this.lockWatcher = lw;
       this.lock = myLock;
       asyncLock = null;
@@ -170,7 +174,7 @@ public class ZooLock implements Watcher 
             }
           }
         }
-        
+
         if (event.getState() == KeeperState.Expired) {
           synchronized (ZooLock.this) {
             if (lock == null) {
@@ -186,6 +190,14 @@ public class ZooLock implements Watcher 
       lockAsync(myLock, lw);
   }
   
+  private void lostLock(LockLossReason reason) {
+    LockWatcher localLw = lockWatcher;
+    lock = null;
+    lockWatcher = null;
+    
+    localLw.lostLock(reason);
+  }
+
   public synchronized void lockAsync(final AsyncLockWatcher lw, byte data[]) {
     
     if (lockWatcher != null || lock != null || asyncLock != null) {
@@ -195,22 +207,48 @@ public class ZooLock implements Watcher 
     lockWasAcquired = false;
     
     try {
-      String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
+      final String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
       
       Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() {
+        
+        private void failedToAcquireLock(){
+          lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
+          asyncLock = null;
+        }
+        
         public void process(WatchedEvent event) {
           synchronized (ZooLock.this) {
             if (lock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + lock)) {
-              LockWatcher localLw = lockWatcher;
-              lock = null;
-              lockWatcher = null;
-              
-              localLw.lostLock(LockLossReason.LOCK_DELETED);
-              
+              lostLock(LockLossReason.LOCK_DELETED);
             } else if (asyncLock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + asyncLock)) {
-              lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
-              asyncLock = null;
+              failedToAcquireLock();
+            } else if(event.getState() != KeeperState.Expired) {
+              log.warn("Unexpected event wathcing lock node "+event+" "+asyncLockPath);
+              try {
+                Stat stat2 = zooKeeper.getStatus(asyncLockPath, this);
+                if(stat2 == null){
+                  if(lock != null)
+                    lostLock(LockLossReason.LOCK_DELETED);
+                  else if(asyncLock != null)
+                    failedToAcquireLock();
+                }
+              } catch (Exception e) {
+                log.error("Failed to stat lock node " + asyncLockPath, e);
+
+                try {
+                  // not sure what happened; make a best-effort attempt to clean up the lock node
+                  zooKeeper.delete(asyncLockPath, -1);
+                } catch (Throwable e2) {
+                  log.debug("Failed to clean up lock node " + asyncLockPath, e2);
+                }
+                
+                if(lock != null)
+                  lostLock(LockLossReason.UNKNOWN);
+                else if(asyncLock != null)
+                  failedToAcquireLock();
+              }
             }
+           
           }
         }
       });
@@ -298,12 +336,27 @@ public class ZooLock implements Watcher 
   public synchronized void process(WatchedEvent event) {
     log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
     
+    watchingParent = false;
+
     if (event.getState() == KeeperState.Expired && lock != null) {
-      LockWatcher localLw = lockWatcher;
-      lock = null;
-      lockWatcher = null;
-      localLw.lostLock(LockLossReason.SESSION_EXPIRED);
+      if (lock != null) {
+        lostLock(LockLossReason.SESSION_EXPIRED);
+      }
+    } else {
+      
+      try { // set the watch on the parent node again
+        zooKeeper.getStatus(path, this);
+        watchingParent = true;
+      } catch (Exception ex) {
+        log.warn("Error resetting watch on ZooLock", ex);
+        
+        if (lock != null) {
+          lostLock(LockLossReason.UNKNOWN);
+        }
+      }
+       
     }
+
   }
   
   public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {

Added: accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java?rev=1442429&view=auto
==============================================================================
--- accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java (added)
+++ accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java Mon Feb  4 23:13:10 2013
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.fate.zookeeper.ZooLock.AsyncLockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.test.MiniAccumuloCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests {@link ZooLock} against the ZooKeeper servers reported by a {@link MiniAccumuloCluster}.
+ */
+public class ZooLockTest {
+
+  public static TemporaryFolder folder = new TemporaryFolder();
+  
+  private static MiniAccumuloCluster accumulo;
+  
+  static class TestALW implements AsyncLockWatcher {
+    
+    LockLossReason reason = null;
+    boolean locked = false;
+    Exception exception = null;
+    int changes = 0;
+    
+    @Override
+    public synchronized void lostLock(LockLossReason reason) {
+      this.reason = reason;
+      changes++;
+      this.notifyAll();
+    }
+    
+    @Override
+    public synchronized void acquiredLock() {
+      this.locked = true;
+      changes++;
+      this.notifyAll();
+    }
+    
+    @Override
+    public synchronized void failedToAcquireLock(Exception e) {
+      this.exception = e;
+      changes++;
+      this.notifyAll();
+    }
+    
+    public synchronized void waitForChanges(int numExpected) throws InterruptedException {
+      while (changes < numExpected) {
+        this.wait();
+      }
+    }
+  }
+
+  @BeforeClass
+  public static void setupMiniCluster() throws Exception {
+    
+    folder.create();
+    
+    Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN);
+    
+    accumulo = new MiniAccumuloCluster(folder.getRoot(), "superSecret");
+    
+    accumulo.start();
+    
+  }
+  
+  private static int pdCount = 0;
+
+  @Test(timeout = 10000)
+  public void testDeleteParent() throws Exception {
+    accumulo.getZookeepers();
+    
+    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+
+    
+    ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+    Assert.assertFalse(zl.isLocked());
+
+    ZooReaderWriter zk = ZooReaderWriter.getInstance(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes());
+    
+    // intentionally create the parent node after constructing the lock
+    zk.mkdirs(parent);
+    
+    zk.delete(parent, -1);
+    
+    zk.mkdirs(parent);
+
+    TestALW lw = new TestALW();
+    
+    zl.lockAsync(lw, "test1".getBytes());
+    
+    lw.waitForChanges(1);
+
+    Assert.assertTrue(lw.locked);
+    Assert.assertTrue(zl.isLocked());
+    Assert.assertNull(lw.exception);
+    Assert.assertNull(lw.reason);
+  }
+  
+  @Test(timeout = 10000)
+  public void testNoParent() throws Exception {
+    accumulo.getZookeepers();
+    
+    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+    
+    ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+    Assert.assertFalse(zl.isLocked());
+    
+    TestALW lw = new TestALW();
+    
+    zl.lockAsync(lw, "test1".getBytes());
+    
+    lw.waitForChanges(1);
+
+    Assert.assertFalse(lw.locked);
+    Assert.assertFalse(zl.isLocked());
+    Assert.assertNotNull(lw.exception);
+    Assert.assertNull(lw.reason);
+  }
+
+  @Test(timeout = 10000)
+  public void testDeleteLock() throws Exception {
+    accumulo.getZookeepers();
+    
+    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+    
+    ZooReaderWriter zk = ZooReaderWriter.getInstance(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes());
+    zk.mkdirs(parent);
+    
+    ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+    Assert.assertFalse(zl.isLocked());
+    
+    TestALW lw = new TestALW();
+    
+    zl.lockAsync(lw, "test1".getBytes());
+    
+    lw.waitForChanges(1);
+
+    Assert.assertTrue(lw.locked);
+    Assert.assertTrue(zl.isLocked());
+    Assert.assertNull(lw.exception);
+    Assert.assertNull(lw.reason);
+    
+    zk.delete(zl.getLockPath(), -1);
+    
+    lw.waitForChanges(2);
+
+    Assert.assertEquals(LockLossReason.LOCK_DELETED, lw.reason);
+    Assert.assertNull(lw.exception);
+
+  }
+  
+  @Test(timeout = 10000)
+  public void testDeleteWaiting() throws Exception {
+    accumulo.getZookeepers();
+    
+    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+    
+    ZooReaderWriter zk = ZooReaderWriter.getInstance(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes());
+    zk.mkdirs(parent);
+    
+    ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+    Assert.assertFalse(zl.isLocked());
+    
+    TestALW lw = new TestALW();
+    
+    zl.lockAsync(lw, "test1".getBytes());
+    
+    lw.waitForChanges(1);
+
+    Assert.assertTrue(lw.locked);
+    Assert.assertTrue(zl.isLocked());
+    Assert.assertNull(lw.exception);
+    Assert.assertNull(lw.reason);
+    
+    
+    ZooLock zl2 = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+    
+    TestALW lw2 = new TestALW();
+    
+    zl2.lockAsync(lw2, "test2".getBytes());
+    
+    Assert.assertFalse(lw2.locked);
+    Assert.assertFalse(zl2.isLocked());
+    
+    ZooLock zl3 = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+    
+    TestALW lw3 = new TestALW();
+    
+    zl3.lockAsync(lw3, "test3".getBytes());
+
+    List<String> children = zk.getChildren(parent);
+    Collections.sort(children);
+    
+    zk.delete(parent + "/" + children.get(1), -1);
+    
+    lw2.waitForChanges(1);
+    
+    Assert.assertFalse(lw2.locked);
+    Assert.assertNotNull(lw2.exception);
+    Assert.assertNull(lw2.reason);
+    
+    zk.delete(parent + "/" + children.get(0), -1);
+    
+    lw.waitForChanges(2);
+    
+    Assert.assertEquals(LockLossReason.LOCK_DELETED, lw.reason);
+    Assert.assertNull(lw.exception);
+    
+    lw3.waitForChanges(1);
+    
+    Assert.assertTrue(lw3.locked);
+    Assert.assertTrue(zl3.isLocked());
+    Assert.assertNull(lw3.exception);
+    Assert.assertNull(lw3.reason);
+
+  }
+  
+  @Test(timeout = 10000)
+  public void testUnexpectedEvent() throws Exception {
+    accumulo.getZookeepers();
+    
+    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+    
+    ZooKeeper zk = new ZooKeeper(accumulo.getZookeepers(), 30000, null);
+    zk.addAuthInfo("digest", "secret".getBytes());
+    
+    zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+    
+    ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+    
+    Assert.assertFalse(zl.isLocked());
+    
+    // data is not normally set on the parent node, but doing so should not cause problems
+    zk.setData(parent, "foo".getBytes(), -1);
+    
+    TestALW lw = new TestALW();
+    
+    zl.lockAsync(lw, "test1".getBytes());
+    
+    lw.waitForChanges(1);
+
+    Assert.assertTrue(lw.locked);
+    Assert.assertTrue(zl.isLocked());
+    Assert.assertNull(lw.exception);
+    Assert.assertNull(lw.reason);
+    
+    // data is not normally set on the lock node either
+    zk.setData(zl.getLockPath(), "bar".getBytes(), -1);
+    
+    zk.delete(zl.getLockPath(), -1);
+
+    lw.waitForChanges(2);
+
+    Assert.assertEquals(LockLossReason.LOCK_DELETED, lw.reason);
+    Assert.assertNull(lw.exception);
+  }
+
+  @Test(timeout = 10000)
+  public void testTryLock() throws Exception {
+    
+    String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+    
+    ZooLock zl = new ZooLock(accumulo.getZookeepers(), 1000, "digest", "secret".getBytes(), parent);
+
+    ZooKeeper zk = new ZooKeeper(accumulo.getZookeepers(), 1000, null);
+    zk.addAuthInfo("digest", "secret".getBytes());
+    
+    for (int i = 0; i < 10; i++) {
+      zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+      zk.delete(parent, -1);
+    }
+    
+    zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+    TestALW lw = new TestALW();
+    
+    boolean ret = zl.tryLock(lw, "test1".getBytes());
+    
+    Assert.assertTrue(ret);
+    
+    // make sure the lock is still watching its parent even though many events occurred for the parent
+    Field field = zl.getClass().getDeclaredField("watchingParent");
+    field.setAccessible(true);
+    Assert.assertTrue((Boolean) field.get(zl));
+  }
+
+  @AfterClass
+  public static void tearDownMiniCluster() throws Exception {
+    accumulo.stop();
+    folder.delete();
+  }
+
+}