You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/02/05 00:13:10 UTC
svn commit: r1442429 - in /accumulo/trunk:
fate/src/main/java/org/apache/accumulo/fate/zookeeper/
test/src/test/java/org/apache/accumulo/fate/
test/src/test/java/org/apache/accumulo/fate/zookeeper/
Author: kturner
Date: Mon Feb 4 23:13:10 2013
New Revision: 1442429
URL: http://svn.apache.org/viewvc?rev=1442429&view=rev
Log:
ACCUMULO-954 Made zoolock rewatch its parent node and added some unit test for zoolock
Added:
accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/
accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/
accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
Modified:
accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
Modified: accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java?rev=1442429&r1=1442428&r2=1442429&view=diff
==============================================================================
--- accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java (original)
+++ accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java Mon Feb 4 23:13:10 2013
@@ -38,7 +38,7 @@ public class ZooLock implements Watcher
public static final String LOCK_PREFIX = "zlock-";
public enum LockLossReason {
- LOCK_DELETED, SESSION_EXPIRED
+ LOCK_DELETED, SESSION_EXPIRED, UNKNOWN
}
public interface LockWatcher {
@@ -56,7 +56,7 @@ public class ZooLock implements Watcher
protected final IZooReaderWriter zooKeeper;
private String lock;
private LockWatcher lockWatcher;
-
+ private boolean watchingParent = false;
private String asyncLock;
public ZooLock(String zookeepers, int timeInMillis, String scheme, byte[] auth, String path) {
@@ -69,8 +69,10 @@ public class ZooLock implements Watcher
zooKeeper = zrw;
try {
zooKeeper.getStatus(path, this);
+ watchingParent = true;
} catch (Exception ex) {
log.warn("Error getting setting initial watch on ZooLock", ex);
+ throw new RuntimeException(ex);
}
}
@@ -131,6 +133,8 @@ public class ZooLock implements Watcher
Collections.sort(children);
if (children.get(0).equals(myLock)) {
+ if (!watchingParent)
+ throw new RuntimeException("Can not acquire lock, no longer watching parent");
this.lockWatcher = lw;
this.lock = myLock;
asyncLock = null;
@@ -170,7 +174,7 @@ public class ZooLock implements Watcher
}
}
}
-
+
if (event.getState() == KeeperState.Expired) {
synchronized (ZooLock.this) {
if (lock == null) {
@@ -186,6 +190,14 @@ public class ZooLock implements Watcher
lockAsync(myLock, lw);
}
+ private void lostLock(LockLossReason reason) {
+ LockWatcher localLw = lockWatcher;
+ lock = null;
+ lockWatcher = null;
+
+ localLw.lostLock(reason);
+ }
+
public synchronized void lockAsync(final AsyncLockWatcher lw, byte data[]) {
if (lockWatcher != null || lock != null || asyncLock != null) {
@@ -195,22 +207,48 @@ public class ZooLock implements Watcher
lockWasAcquired = false;
try {
- String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
+ final String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() {
+
+ private void failedToAcquireLock(){
+ lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
+ asyncLock = null;
+ }
+
public void process(WatchedEvent event) {
synchronized (ZooLock.this) {
if (lock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + lock)) {
- LockWatcher localLw = lockWatcher;
- lock = null;
- lockWatcher = null;
-
- localLw.lostLock(LockLossReason.LOCK_DELETED);
-
+ lostLock(LockLossReason.LOCK_DELETED);
} else if (asyncLock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + asyncLock)) {
- lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
- asyncLock = null;
+ failedToAcquireLock();
+ } else if(event.getState() != KeeperState.Expired) {
+ log.warn("Unexpected event wathcing lock node "+event+" "+asyncLockPath);
+ try {
+ Stat stat2 = zooKeeper.getStatus(asyncLockPath, this);
+ if(stat2 == null){
+ if(lock != null)
+ lostLock(LockLossReason.LOCK_DELETED);
+ else if(asyncLock != null)
+ failedToAcquireLock();
+ }
+ } catch (Exception e) {
+ log.error("Failed to stat lock node " + asyncLockPath, e);
+
+ try {
+ // not sure what happened... try to clean lock node up....
+ zooKeeper.delete(asyncLockPath, -1);
+ } catch (Throwable e2) {
+ log.debug("Failed to clean up lock node " + asyncLockPath, e2);
+ }
+
+ if(lock != null)
+ lostLock(LockLossReason.UNKNOWN);
+ else if(asyncLock != null)
+ failedToAcquireLock();
+ }
}
+
}
}
});
@@ -298,12 +336,27 @@ public class ZooLock implements Watcher
public synchronized void process(WatchedEvent event) {
log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+ watchingParent = false;
+
if (event.getState() == KeeperState.Expired && lock != null) {
- LockWatcher localLw = lockWatcher;
- lock = null;
- lockWatcher = null;
- localLw.lostLock(LockLossReason.SESSION_EXPIRED);
+ if (lock != null) {
+ lostLock(LockLossReason.SESSION_EXPIRED);
+ }
+ } else {
+
+ try { // set the watch on the parent node again
+ zooKeeper.getStatus(path, this);
+ watchingParent = true;
+ } catch (Exception ex) {
+ log.warn("Error resetting watch on ZooLock", ex);
+
+ if (lock != null) {
+ lostLock(LockLossReason.UNKNOWN);
+ }
+ }
+
}
+
}
public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
Added: accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java?rev=1442429&view=auto
==============================================================================
--- accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java (added)
+++ accumulo/trunk/test/src/test/java/org/apache/accumulo/fate/zookeeper/ZooLockTest.java Mon Feb 4 23:13:10 2013
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.fate.zookeeper;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.accumulo.fate.zookeeper.ZooLock.AsyncLockWatcher;
+import org.apache.accumulo.fate.zookeeper.ZooLock.LockLossReason;
+import org.apache.accumulo.test.MiniAccumuloCluster;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ *
+ */
+public class ZooLockTest {
+
+ public static TemporaryFolder folder = new TemporaryFolder();
+
+ private static MiniAccumuloCluster accumulo;
+
+ static class TestALW implements AsyncLockWatcher {
+
+ LockLossReason reason = null;
+ boolean locked = false;
+ Exception exception = null;
+ int changes = 0;
+
+ @Override
+ public synchronized void lostLock(LockLossReason reason) {
+ this.reason = reason;
+ changes++;
+ this.notifyAll();
+ }
+
+ @Override
+ public synchronized void acquiredLock() {
+ this.locked = true;
+ changes++;
+ this.notifyAll();
+ }
+
+ @Override
+ public synchronized void failedToAcquireLock(Exception e) {
+ this.exception = e;
+ changes++;
+ this.notifyAll();
+ }
+
+ public synchronized void waitForChanges(int numExpected) throws InterruptedException {
+ while (changes < numExpected) {
+ this.wait();
+ }
+ }
+ }
+
+ @BeforeClass
+ public static void setupMiniCluster() throws Exception {
+
+ folder.create();
+
+ Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARN);
+
+ accumulo = new MiniAccumuloCluster(folder.getRoot(), "superSecret");
+
+ accumulo.start();
+
+ }
+
+ private static int pdCount = 0;
+
+ @Test(timeout = 10000)
+ public void testDeleteParent() throws Exception {
+ accumulo.getZookeepers();
+
+ String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+
+
+ ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+ Assert.assertFalse(zl.isLocked());
+
+ ZooReaderWriter zk = ZooReaderWriter.getInstance(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes());
+
+ // intentionally created parent after lock
+ zk.mkdirs(parent);
+
+ zk.delete(parent, -1);
+
+ zk.mkdirs(parent);
+
+ TestALW lw = new TestALW();
+
+ zl.lockAsync(lw, "test1".getBytes());
+
+ lw.waitForChanges(1);
+
+ Assert.assertTrue(lw.locked);
+ Assert.assertTrue(zl.isLocked());
+ Assert.assertNull(lw.exception);
+ Assert.assertNull(lw.reason);
+ }
+
+ @Test(timeout = 10000)
+ public void testNoParent() throws Exception {
+ accumulo.getZookeepers();
+
+ String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+
+ ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+ Assert.assertFalse(zl.isLocked());
+
+ TestALW lw = new TestALW();
+
+ zl.lockAsync(lw, "test1".getBytes());
+
+ lw.waitForChanges(1);
+
+ Assert.assertFalse(lw.locked);
+ Assert.assertFalse(zl.isLocked());
+ Assert.assertNotNull(lw.exception);
+ Assert.assertNull(lw.reason);
+ }
+
+ @Test(timeout = 10000)
+ public void testDeleteLock() throws Exception {
+ accumulo.getZookeepers();
+
+ String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+
+ ZooReaderWriter zk = ZooReaderWriter.getInstance(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes());
+ zk.mkdirs(parent);
+
+ ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+ Assert.assertFalse(zl.isLocked());
+
+ TestALW lw = new TestALW();
+
+ zl.lockAsync(lw, "test1".getBytes());
+
+ lw.waitForChanges(1);
+
+ Assert.assertTrue(lw.locked);
+ Assert.assertTrue(zl.isLocked());
+ Assert.assertNull(lw.exception);
+ Assert.assertNull(lw.reason);
+
+ zk.delete(zl.getLockPath(), -1);
+
+ lw.waitForChanges(2);
+
+ Assert.assertEquals(LockLossReason.LOCK_DELETED, lw.reason);
+ Assert.assertNull(lw.exception);
+
+ }
+
+ @Test(timeout = 10000)
+ public void testDeleteWaiting() throws Exception {
+ accumulo.getZookeepers();
+
+ String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+
+ ZooReaderWriter zk = ZooReaderWriter.getInstance(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes());
+ zk.mkdirs(parent);
+
+ ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+ Assert.assertFalse(zl.isLocked());
+
+ TestALW lw = new TestALW();
+
+ zl.lockAsync(lw, "test1".getBytes());
+
+ lw.waitForChanges(1);
+
+ Assert.assertTrue(lw.locked);
+ Assert.assertTrue(zl.isLocked());
+ Assert.assertNull(lw.exception);
+ Assert.assertNull(lw.reason);
+
+
+ ZooLock zl2 = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+ TestALW lw2 = new TestALW();
+
+ zl2.lockAsync(lw2, "test2".getBytes());
+
+ Assert.assertFalse(lw2.locked);
+ Assert.assertFalse(zl2.isLocked());
+
+ ZooLock zl3 = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+ TestALW lw3 = new TestALW();
+
+ zl3.lockAsync(lw3, "test3".getBytes());
+
+ List<String> children = zk.getChildren(parent);
+ Collections.sort(children);
+
+ zk.delete(parent + "/" + children.get(1), -1);
+
+ lw2.waitForChanges(1);
+
+ Assert.assertFalse(lw2.locked);
+ Assert.assertNotNull(lw2.exception);
+ Assert.assertNull(lw2.reason);
+
+ zk.delete(parent + "/" + children.get(0), -1);
+
+ lw.waitForChanges(2);
+
+ Assert.assertEquals(LockLossReason.LOCK_DELETED, lw.reason);
+ Assert.assertNull(lw.exception);
+
+ lw3.waitForChanges(1);
+
+ Assert.assertTrue(lw3.locked);
+ Assert.assertTrue(zl3.isLocked());
+ Assert.assertNull(lw3.exception);
+ Assert.assertNull(lw3.reason);
+
+ }
+
+ @Test(timeout = 10000)
+ public void testUnexpectedEvent() throws Exception {
+ accumulo.getZookeepers();
+
+ String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+
+ ZooKeeper zk = new ZooKeeper(accumulo.getZookeepers(), 30000, null);
+ zk.addAuthInfo("digest", "secret".getBytes());
+
+ zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ ZooLock zl = new ZooLock(accumulo.getZookeepers(), 30000, "digest", "secret".getBytes(), parent);
+
+ Assert.assertFalse(zl.isLocked());
+
+ // would not expect data to be set on this node, but it should not cause problems.....
+ zk.setData(parent, "foo".getBytes(), -1);
+
+ TestALW lw = new TestALW();
+
+ zl.lockAsync(lw, "test1".getBytes());
+
+ lw.waitForChanges(1);
+
+ Assert.assertTrue(lw.locked);
+ Assert.assertTrue(zl.isLocked());
+ Assert.assertNull(lw.exception);
+ Assert.assertNull(lw.reason);
+
+ // would not expect data to be set on this node either
+ zk.setData(zl.getLockPath(), "bar".getBytes(), -1);
+
+ zk.delete(zl.getLockPath(), -1);
+
+ lw.waitForChanges(2);
+
+ Assert.assertEquals(LockLossReason.LOCK_DELETED, lw.reason);
+ Assert.assertNull(lw.exception);
+ }
+
+ @Test(timeout = 10000)
+ public void testTryLock() throws Exception {
+
+ String parent = "/zltest-" + this.hashCode() + "-l" + pdCount++;
+
+ ZooLock zl = new ZooLock(accumulo.getZookeepers(), 1000, "digest", "secret".getBytes(), parent);
+
+ ZooKeeper zk = new ZooKeeper(accumulo.getZookeepers(), 1000, null);
+ zk.addAuthInfo("digest", "secret".getBytes());
+
+ for (int i = 0; i < 10; i++) {
+ zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.delete(parent, -1);
+ }
+
+ zk.create(parent, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ TestALW lw = new TestALW();
+
+ boolean ret = zl.tryLock(lw, "test1".getBytes());
+
+ Assert.assertTrue(ret);
+
+ // make sure still watching parent even though a lot of events occurred for the parent
+ Field field = zl.getClass().getDeclaredField("watchingParent");
+ field.setAccessible(true);
+ Assert.assertTrue((Boolean) field.get(zl));
+ }
+
+ @AfterClass
+ public static void tearDownMiniCluster() throws Exception {
+ accumulo.stop();
+ folder.delete();
+ }
+
+}