You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2013/03/01 21:17:40 UTC
svn commit: r1451708 - in /accumulo/branches/1.4: ./ src/ src/core/
src/server/ src/server/src/
src/server/src/main/java/org/apache/accumulo/server/gc/
src/server/src/main/java/org/apache/accumulo/server/master/
src/server/src/main/java/org/apache/accu...
Author: kturner
Date: Fri Mar 1 20:17:39 2013
New Revision: 1451708
URL: http://svn.apache.org/r1451708
Log:
merged some bug fixes from 1.5:
ACCUMULO-1125 delete distributed work queue task lock when task fails
ACCUMULO-1049 modified master to stop locking tserver nodes and just monitor tserver nodes in zookeeper
ACCUMULO-954 made zoolock report when it's no longer able to monitor the lock node and therefore does not know the status of the lock
ACCUMULO-954 Made zoolock rewatch its parent node and added some unit tests for zoolock
Modified:
accumulo/branches/1.4/ (props changed)
accumulo/branches/1.4/src/ (props changed)
accumulo/branches/1.4/src/core/ (props changed)
accumulo/branches/1.4/src/server/ (props changed)
accumulo/branches/1.4/src/server/src/ (props changed)
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ (props changed)
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java (contents, props changed)
Propchange: accumulo/branches/1.4/
------------------------------------------------------------------------------
Merged /accumulo/trunk:r1442429,1443790,1444259
Merged /accumulo/branches/1.5:r1451015
Propchange: accumulo/branches/1.4/src/
------------------------------------------------------------------------------
Merged /accumulo/trunk/src:r1442429,1443790,1444259
Merged /accumulo/trunk:r1443790,1444259
Merged /accumulo/branches/1.5:r1451015
Merged /accumulo/branches/1.5/src:r1451015
Propchange: accumulo/branches/1.4/src/core/
------------------------------------------------------------------------------
Merged /accumulo/trunk/src/core:r1442429,1443790,1444259
Merged /accumulo/trunk/core:r1443790,1444259
Merged /accumulo/branches/1.5/core:r1451015
Propchange: accumulo/branches/1.4/src/server/
------------------------------------------------------------------------------
Merged /accumulo/trunk/src/server:r1442429,1443790,1444259
Merged /accumulo/branches/1.5/server:r1451015
Merged /accumulo/trunk/server:r1443790,1444259
Propchange: accumulo/branches/1.4/src/server/src/
------------------------------------------------------------------------------
Merged /accumulo/branches/1.5/server/src:r1451015
Merged /accumulo/trunk/server/src:r1443790,1444259
Merged /accumulo/trunk/src/server/src:r1442429,1443790,1444259
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/gc/SimpleGarbageCollector.java Fri Mar 1 20:17:39 2013
@@ -369,6 +369,18 @@ public class SimpleGarbageCollector impl
public void lostLock(LockLossReason reason) {
Halt.halt("GC lock in zookeeper lost (reason = " + reason + "), exiting!");
}
+
+ @Override
+ public void unableToMonitorLockNode(final Throwable e) {
+ Halt.halt(-1, new Runnable() {
+
+ @Override
+ public void run() {
+ log.fatal("No longer able to monitor lock node ", e);
+ }
+ });
+
+ }
};
while (true) {
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java Fri Mar 1 20:17:39 2013
@@ -50,6 +50,8 @@ import org.apache.hadoop.io.Text;
import org.apache.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.KeeperException.NotEmptyException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.Stat;
@@ -191,20 +193,12 @@ public class LiveTServerSet implements W
}
static class TServerInfo {
- ZooLock lock;
TServerConnection connection;
TServerInstance instance;
- TServerLockWatcher watcher;
- TServerInfo(ZooLock lock, TServerInstance instance, TServerConnection connection, TServerLockWatcher watcher) {
- this.lock = lock;
+ TServerInfo(TServerInstance instance, TServerConnection connection) {
this.connection = connection;
this.instance = instance;
- this.watcher = watcher;
- }
-
- void cleanup() throws InterruptedException, KeeperException {
- lock.tryToCancelAsyncLockOrUnlock();
}
};
@@ -230,7 +224,7 @@ public class LiveTServerSet implements W
public void run() {
scanServers();
}
- }, 0, 1000);
+ }, 0, 5000);
}
public synchronized void scanServers() {
@@ -239,55 +233,14 @@ public class LiveTServerSet implements W
final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
- for (String server : getZooCache().getChildren(path)) {
- // See if we have an async lock in place?
- TServerInfo info = current.get(server);
- TServerLockWatcher watcher;
- ZooLock lock;
- final String lockPath = path + "/" + server;
- if (info != null) {
- // yep: get out the lock/watcher so we can check on it
- watcher = info.watcher;
- lock = info.lock;
- } else {
- // nope: create a new lock and watcher
- lock = new ZooLock(lockPath);
- watcher = new TServerLockWatcher();
- lock.lockAsync(watcher, "master".getBytes());
- }
- TServerInstance instance = null;
- // Did we win the lock yet?
- if (!lock.isLocked() && !watcher.gotLock && watcher.failureException == null) {
- // Nope... there's a server out there: is this is a new server?
- if (info == null) {
- // Yep: hold onto the information about this server
- Stat stat = new Stat();
- byte[] lockData = ZooLock.getLockData(lockPath, stat);
- String lockString = new String(lockData == null ? new byte[] {} : lockData);
- if (lockString.length() > 0 && !lockString.equals("master")) {
- ServerServices services = new ServerServices(new String(lockData));
- InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
- InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
- TServerConnection conn = new TServerConnection(addr);
- instance = new TServerInstance(client, stat.getEphemeralOwner());
- info = new TServerInfo(lock, instance, conn, watcher);
- current.put(server, info);
- updates.add(instance);
- } else {
- lock.tryToCancelAsyncLockOrUnlock();
- }
- }
- } else {
- // Yes... there is no server here any more
- lock.tryToCancelAsyncLockOrUnlock();
- if (info != null) {
- doomed.add(info.instance);
- current.remove(server);
- info.cleanup();
- }
- ZooReaderWriter.getInstance().delete(lockPath, -1);
- }
+
+ HashSet<String> all = new HashSet<String>(current.keySet());
+ all.addAll(getZooCache().getChildren(path));
+
+ for (String server : all) {
+ checkServer(updates, doomed, path, server);
}
+
// log.debug("Current: " + current.keySet());
if (!doomed.isEmpty() || !updates.isEmpty())
this.cback.update(this, doomed, updates);
@@ -295,10 +248,82 @@ public class LiveTServerSet implements W
log.error(ex, ex);
}
}
+
+ private void deleteServerNode(String serverNode) throws InterruptedException, KeeperException {
+ try {
+ ZooReaderWriter.getInstance().delete(serverNode, -1);
+ } catch (NotEmptyException ex) {
+ // race condition: tserver created the lock after our last check; we'll see it at the next check
+ } catch (NoNodeException nne) {
+ // someone else deleted it
+ }
+ }
+ private synchronized void checkServer(final Set<TServerInstance> updates, final Set<TServerInstance> doomed, final String path, final String server)
+ throws TException, InterruptedException, KeeperException {
+
+ TServerInfo info = current.get(server);
+
+ final String lockPath = path + "/" + server;
+ Stat stat = new Stat();
+ byte[] lockData = ZooLock.getLockData(getZooCache(), lockPath, stat);
+
+ if (lockData == null) {
+ if (info != null) {
+ doomed.add(info.instance);
+ current.remove(server);
+ }
+
+ deleteServerNode(path + "/" + server);
+ } else {
+ ServerServices services = new ServerServices(new String(lockData));
+ InetSocketAddress client = services.getAddress(ServerServices.Service.TSERV_CLIENT);
+ InetSocketAddress addr = AddressUtil.parseAddress(server, Property.TSERV_CLIENTPORT);
+ TServerInstance instance = new TServerInstance(client, stat.getEphemeralOwner());
+
+ if (info == null) {
+ updates.add(instance);
+ current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+ } else if (!info.instance.equals(instance)) {
+ doomed.add(info.instance);
+ updates.add(instance);
+ current.put(server, new TServerInfo(instance, new TServerConnection(addr)));
+ }
+ }
+ }
+
@Override
public void process(WatchedEvent event) {
- scanServers();
+
+ // it's important that these events are propagated by ZooCache, because this ensures that when reading zoocache, it has already processed the event and cleared
+ // relevant nodes before the code below reads from zoocache
+
+ if (event.getPath() != null) {
+ if (event.getPath().endsWith(Constants.ZTSERVERS)) {
+ scanServers();
+ } else if (event.getPath().contains(Constants.ZTSERVERS)) {
+ int pos = event.getPath().lastIndexOf('/');
+
+ // do only if ZTSERVER is parent
+ if (pos >= 0 && event.getPath().substring(0, pos).endsWith(Constants.ZTSERVERS)) {
+
+ String server = event.getPath().substring(pos + 1);
+
+ final Set<TServerInstance> updates = new HashSet<TServerInstance>();
+ final Set<TServerInstance> doomed = new HashSet<TServerInstance>();
+
+ final String path = ZooUtil.getRoot(instance) + Constants.ZTSERVERS;
+
+ try {
+ checkServer(updates, doomed, path, server);
+ if (!doomed.isEmpty() || !updates.isEmpty())
+ this.cback.update(this, doomed, updates);
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ }
+ }
+ }
+ }
}
public synchronized TServerConnection getConnection(TServerInstance server) throws TException {
@@ -343,14 +368,8 @@ public class LiveTServerSet implements W
}
public synchronized void remove(TServerInstance server) {
- TServerInfo remove = current.remove(server.hostPort());
- if (remove != null) {
- try {
- remove.cleanup();
- } catch (Exception e) {
- log.info("error cleaning up connection to server", e);
- }
- }
+ current.remove(server.hostPort());
+
log.info("Removing zookeeper lock for " + server);
String zpath = ZooUtil.getRoot(instance) + Constants.ZTSERVERS + "/" + server.hostPort();
try {
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java Fri Mar 1 20:17:39 2013
@@ -2144,6 +2144,17 @@ public class Master implements LiveTServ
public void lostLock(LockLossReason reason) {
Halt.halt("Master lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
}
+
+ @Override
+ public void unableToMonitorLockNode(final Throwable e) {
+ Halt.halt(-1, new Runnable() {
+ @Override
+ public void run() {
+ log.fatal("No longer able to monitor master lock node", e);
+ }
+ });
+
+ }
};
long current = System.currentTimeMillis();
final long waitTime = ServerConfiguration.getSystemConfiguration().getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT);
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/TServerLockWatcher.java Fri Mar 1 20:17:39 2013
@@ -40,4 +40,10 @@ class TServerLockWatcher implements Asyn
@Override
public void lostLock(LockLossReason reason) {}
-}
\ No newline at end of file
+ @Override
+ public void unableToMonitorLockNode(Throwable e) {
+ // TODO Auto-generated method stub
+
+ }
+
+}
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java Fri Mar 1 20:17:39 2013
@@ -2652,6 +2652,17 @@ public class TabletServer extends Abstra
}
});
}
+
+ @Override
+ public void unableToMonitorLockNode(final Throwable e) {
+ Halt.halt(0, new Runnable() {
+ @Override
+ public void run() {
+ log.fatal("Lost ability to monitor tablet server lock, exiting.", e);
+ }
+ });
+
+ }
};
byte[] lockContent = new ServerServices(getClientAddressString(), Service.TSERV_CLIENT).toString().getBytes();
Propchange: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Mar 1 20:17:39 2013
@@ -0,0 +1,9 @@
+/accumulo/branches/1.3/src/server/src/main/java/org/apache/accumulo/server/test/functional:1309369,1328076,1330246,1330264,1349971,1354669
+/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional:1356400
+/accumulo/branches/1.4.2/src/server/src/main/java/org/apache/accumulo/server/test/functional:1399210,1402681
+/accumulo/branches/ACCUMULO-672/server/src/main/java/org/apache/accumulo/server/test/functional:1359163
+/accumulo/trunk/fate/src/main/java/org/apache/accumulo/server/test/functional:1433477
+/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional:1329425,1332224,1332278,1332347,1333047,1333070,1337210,1341000,1342452,1344302,1344358,1350779,1351691,1356400,1359721,1397746,1397928,1397975,1397990,1398090,1423994,1424036,1424115,1426302,1437054,1443790,1444075,1444259,1444826,1445876,1446290,1446314
+/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/test/functional:1329425,1332224,1332278,1332347,1333047,1333070,1341000,1342373,1350779,1351691,1356400,1359721,1397746,1397928,1397975,1397990,1398290,1415914,1423994,1424036,1424050,1424060,1424099,1424115,1426302,1437054,1442429,1443790,1444075,1444259,1444826,1445876,1446314
+/accumulo/trunk/test/src/main/java/org/apache/accumulo/test/functional:1443790
+/incubator/accumulo/branches/1.4.0rc/src/server/src/main/java/org/apache/accumulo/server/test/functional:1304025,1305326
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/SplitRecoveryTest.java Fri Mar 1 20:17:39 2013
@@ -88,6 +88,11 @@ public class SplitRecoveryTest extends F
System.exit(-1);
}
+
+ @Override
+ public void unableToMonitorLockNode(Throwable e) {
+ System.exit(-1);
+ }
}, "foo".getBytes());
if (!gotLock) {
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/test/functional/ZombieTServer.java Fri Mar 1 20:17:39 2013
@@ -21,6 +21,7 @@ import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Random;
+import org.apache.accumulo.cloudtrace.instrument.Tracer;
import org.apache.accumulo.cloudtrace.thrift.TInfo;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.master.thrift.TableInfo;
@@ -115,6 +116,16 @@ public class ZombieTServer {
System.exit(1);
}
}
+
+ @Override
+ public void unableToMonitorLockNode(Throwable e) {
+ try {
+ tch.halt(Tracer.traceInfo(), null, null);
+ } catch (Exception ex) {
+ log.error(ex, ex);
+ System.exit(1);
+ }
+ }
};
byte[] lockContent = new ServerServices(addressString, Service.TSERV_CLIENT).toString().getBytes();
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java Fri Mar 1 20:17:39 2013
@@ -107,16 +107,16 @@ public class DistributedWorkQueue {
log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
}
- try {
- zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
- } catch (Exception e) {
- log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
- }
-
} catch (Exception e) {
log.warn("Failed to process work " + child, e);
}
+ try {
+ zoo.recursiveDelete(lockPath, NodeMissingPolicy.SKIP);
+ } catch (Exception e) {
+ log.error("Error received when trying to delete entry in zookeeper " + childPath, e);
+ }
+
} finally {
numTask.decrementAndGet();
}
Modified: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
URL: http://svn.apache.org/viewvc/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java?rev=1451708&r1=1451707&r2=1451708&view=diff
==============================================================================
--- accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java (original)
+++ accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java Fri Mar 1 20:17:39 2013
@@ -43,6 +43,13 @@ public class ZooLock implements Watcher
public interface LockWatcher {
void lostLock(LockLossReason reason);
+
+ /**
+ * lost the ability to monitor the lock node, and its status is unknown
+ *
+ * @param e
+ */
+ void unableToMonitorLockNode(Throwable e);
}
public interface AsyncLockWatcher extends LockWatcher {
@@ -56,7 +63,7 @@ public class ZooLock implements Watcher
final private IZooReaderWriter zooKeeper;
private String lock;
private LockWatcher lockWatcher;
-
+ private boolean watchingParent = false;
private String asyncLock;
public ZooLock(String path) {
@@ -64,8 +71,10 @@ public class ZooLock implements Watcher
zooKeeper = ZooReaderWriter.getInstance();
try {
zooKeeper.getStatus(path, this);
+ watchingParent = true;
} catch (Exception ex) {
log.warn("Error getting setting initial watch on ZooLock", ex);
+ throw new RuntimeException(ex);
}
}
@@ -91,6 +100,11 @@ public class ZooLock implements Watcher
lw.lostLock(reason);
}
+ @Override
+ public void unableToMonitorLockNode(Throwable e) {
+ lw.unableToMonitorLockNode(e);
+ }
+
}
public synchronized boolean tryLock(LockWatcher lw, byte data[]) throws KeeperException, InterruptedException {
@@ -126,6 +140,9 @@ public class ZooLock implements Watcher
Collections.sort(children);
if (children.get(0).equals(myLock)) {
+ if (!watchingParent) {
+ throw new IllegalStateException("Can not acquire lock, no longer watching parent : " + path);
+ }
this.lockWatcher = lw;
this.lock = myLock;
asyncLock = null;
@@ -165,7 +182,7 @@ public class ZooLock implements Watcher
}
}
}
-
+
if (event.getState() == KeeperState.Expired) {
synchronized (ZooLock.this) {
if (lock == null) {
@@ -181,6 +198,14 @@ public class ZooLock implements Watcher
lockAsync(myLock, lw);
}
+ private void lostLock(LockLossReason reason) {
+ LockWatcher localLw = lockWatcher;
+ lock = null;
+ lockWatcher = null;
+
+ localLw.lostLock(reason);
+ }
+
public synchronized void lockAsync(final AsyncLockWatcher lw, byte data[]) {
if (lockWatcher != null || lock != null || asyncLock != null) {
@@ -190,22 +215,37 @@ public class ZooLock implements Watcher
lockWasAcquired = false;
try {
- String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
+ final String asyncLockPath = zooKeeper.putEphemeralSequential(path + "/" + LOCK_PREFIX, data);
Stat stat = zooKeeper.getStatus(asyncLockPath, new Watcher() {
+
+ private void failedToAcquireLock(){
+ lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
+ asyncLock = null;
+ }
+
public void process(WatchedEvent event) {
synchronized (ZooLock.this) {
if (lock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + lock)) {
- LockWatcher localLw = lockWatcher;
- lock = null;
- lockWatcher = null;
-
- localLw.lostLock(LockLossReason.LOCK_DELETED);
-
+ lostLock(LockLossReason.LOCK_DELETED);
} else if (asyncLock != null && event.getType() == EventType.NodeDeleted && event.getPath().equals(path + "/" + asyncLock)) {
- lw.failedToAcquireLock(new Exception("Lock deleted before acquired"));
- asyncLock = null;
+ failedToAcquireLock();
+ } else if (event.getState() != KeeperState.Expired && (lock != null || asyncLock != null)) {
+ log.debug("Unexpected event watching lock node "+event+" "+asyncLockPath);
+ try {
+ Stat stat2 = zooKeeper.getStatus(asyncLockPath, this);
+ if(stat2 == null){
+ if(lock != null)
+ lostLock(LockLossReason.LOCK_DELETED);
+ else if(asyncLock != null)
+ failedToAcquireLock();
+ }
+ } catch (Throwable e) {
+ lockWatcher.unableToMonitorLockNode(e);
+ log.error("Failed to stat lock node " + asyncLockPath, e);
+ }
}
+
}
}
});
@@ -293,12 +333,26 @@ public class ZooLock implements Watcher
public synchronized void process(WatchedEvent event) {
log.debug("event " + event.getPath() + " " + event.getType() + " " + event.getState());
+ watchingParent = false;
+
if (event.getState() == KeeperState.Expired && lock != null) {
- LockWatcher localLw = lockWatcher;
- lock = null;
- lockWatcher = null;
- localLw.lostLock(LockLossReason.SESSION_EXPIRED);
+ if (lock != null) {
+ lostLock(LockLossReason.SESSION_EXPIRED);
+ }
+ } else {
+
+ try { // set the watch on the parent node again
+ zooKeeper.getStatus(path, this);
+ watchingParent = true;
+ } catch (Exception ex) {
+ if (lock != null || asyncLock != null) {
+ lockWatcher.unableToMonitorLockNode(ex);
+ log.error("Error resetting watch on ZooLock " + lock == null ? asyncLock : lock + " " + event, ex);
+ }
+ }
+
}
+
}
public static boolean isLockHeld(ZooKeeper zk, LockID lid) throws KeeperException, InterruptedException {
@@ -455,34 +509,4 @@ public class ZooLock implements Watcher
return false;
}
- public static void main(String[] args) throws Exception {
- String node = "/test/lock1";
- ZooLock zl = new ZooLock(node);
-
- zl.lockAsync(new AsyncLockWatcher() {
-
- @Override
- public void acquiredLock() {
- System.out.println("I got the lock");
- }
-
- @Override
- public void lostLock(LockLossReason reason) {
- System.out.println("OMG I lost my lock, reason = " + reason);
-
- }
-
- @Override
- public void failedToAcquireLock(Exception e) {
- System.out.println("Failed to acquire lock ");
- e.printStackTrace();
- }
-
- }, new byte[0]);
-
- while (true) {
- Thread.sleep(1000);
- }
- }
-
}
Propchange: accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Mar 1 20:17:39 2013
@@ -0,0 +1,10 @@
+/accumulo/branches/1.3/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1309369,1328076,1330246,1330264,1349971,1354669
+/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1356400
+/accumulo/branches/1.4.2/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1399210,1402681
+/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1451015
+/accumulo/branches/ACCUMULO-672/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1359163
+/accumulo/trunk/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooLock.java:1442429,1443790
+/accumulo/trunk/fate/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1433477
+/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1329425,1332224,1332278,1332347,1333047,1333070,1337210,1341000,1342452,1344302,1344358,1350779,1351691,1356400,1359721,1397746,1397928,1397975,1397990,1398090,1423994,1424036,1424115,1426302,1437054,1443790,1444075,1444259,1444826,1445876,1446290,1446314
+/accumulo/trunk/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1329425,1332224,1332278,1332347,1333047,1333070,1341000,1342373,1350779,1351691,1356400,1359721,1397746,1397928,1397975,1397990,1398290,1415914,1423994,1424036,1424050,1424060,1424099,1424115,1426302,1437054,1442429,1443790,1444075,1444259,1444826,1445876,1446314
+/incubator/accumulo/branches/1.4.0rc/src/server/src/main/java/org/apache/accumulo/server/zookeeper/ZooLock.java:1304025,1305326