You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2008/10/22 01:49:45 UTC
svn commit: r706815 - in /hadoop/zookeeper/trunk: ./ src/
src/java/jmx/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/server/
src/java/test/org/apache/zookeeper/test/
Author: mahadev
Date: Tue Oct 21 16:49:45 2008
New Revision: 706815
URL: http://svn.apache.org/viewvc?rev=706815&view=rev
Log:
ZOOKEEPER-43. Server side of auto reset watches.
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java
hadoop/zookeeper/trunk/src/zookeeper.jute
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Tue Oct 21 16:49:45 2008
@@ -30,6 +30,8 @@
ZOOKEEPER-33. Better ACL management
(mahadev)
+
+ ZOOKEEPER-43. Server side of auto reset watches.
Backward compatibile changes:
Modified: hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java (original)
+++ hadoop/zookeeper/trunk/src/java/jmx/org/apache/zookeeper/server/ObservableDataTree.java Tue Oct 21 16:49:45 2008
@@ -87,9 +87,11 @@
return result;
}
- public void deleteNode(String path) throws KeeperException.NoNodeException {
+ public void deleteNode(String path, long zxid)
+ throws KeeperException.NoNodeException
+ {
DataNode deleted=getNode(path);
- super.deleteNode(path);
+ super.deleteNode(path, zxid);
ObserverManager.getInstance().notifyObservers(this,
new TreeEventInfo(Event.DELETE,deleted));
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Tue Oct 21 16:49:45 2008
@@ -60,6 +60,7 @@
import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataResponse;
+import org.apache.zookeeper.proto.SetWatches;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.ZooTrace;
@@ -72,6 +73,19 @@
*/
public class ClientCnxn {
private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
+
+ /** This controls whether automatic watch resetting is enabled.
+ * Clients automatically reset watches during session reconnect; this
+ * option allows the client to turn off this behavior by setting
+ * the Java system property "zookeeper.disableAutoWatchReset" to "true" */
+ public static boolean disableAutoWatchReset;
+ static {
+ // this var should not be public, but otherwise there is no easy way
+ // to test
+ disableAutoWatchReset =
+ Boolean.getBoolean("zookeeper.disableAutoWatchReset");
+ LOG.info("zookeeper.disableAutoWatchReset is " + disableAutoWatchReset);
+ }
private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>();
@@ -446,7 +460,7 @@
finishPacket(p);
}
- long lastZxid;
+ volatile long lastZxid;
/**
* This class services the outgoing request queue and generates the heart
@@ -479,7 +493,8 @@
if (sessionTimeout <= 0) {
zooKeeper.state = States.CLOSED;
- eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
+ eventThread.queueEvent(new WatchedEvent(
+ Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null));
throw new IOException("Session Expired");
}
@@ -489,6 +504,15 @@
sessionPasswd = conRsp.getPasswd();
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected, null));
+ if (!disableAutoWatchReset) {
+ SetWatches sw = new SetWatches(lastZxid,
+ zooKeeper.getDataWatches(),
+ zooKeeper.getExistWatches(),
+ zooKeeper.getChildWatches());
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.setWatches);
+ queuePacket(h, new ReplyHeader(), sw, null, null, null, null, null);
+ }
}
void readResponse() throws IOException {
@@ -540,27 +564,31 @@
* Since requests are processed in order, we better get a response
* to the first request!
*/
- if (packet.header.getXid() != replyHdr.getXid()) {
- throw new IOException("Xid out of order. Got "
- + replyHdr.getXid() + " expected "
- + packet.header.getXid());
- }
-
- packet.replyHeader.setXid(replyHdr.getXid());
- packet.replyHeader.setErr(replyHdr.getErr());
- packet.replyHeader.setZxid(replyHdr.getZxid());
- lastZxid = replyHdr.getZxid();
- if (packet.response != null && replyHdr.getErr() == 0) {
- packet.response.deserialize(bbia, "response");
- }
- packet.finished = true;
+ try {
+ if (packet.header.getXid() != replyHdr.getXid()) {
+ packet.replyHeader.setErr(KeeperException.Code.ConnectionLoss);
+ throw new IOException("Xid out of order. Got "
+ + replyHdr.getXid() + " expected "
+ + packet.header.getXid());
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Reading reply sessionid:0x"
- + Long.toHexString(sessionId) + ", packet:: " + packet);
- }
+ packet.replyHeader.setXid(replyHdr.getXid());
+ packet.replyHeader.setErr(replyHdr.getErr());
+ packet.replyHeader.setZxid(replyHdr.getZxid());
+ if (replyHdr.getZxid() > 0) {
+ lastZxid = replyHdr.getZxid();
+ }
+ if (packet.response != null && replyHdr.getErr() == 0) {
+ packet.response.deserialize(bbia, "response");
+ }
- finishPacket(packet);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reading reply sessionid:0x"
+ + Long.toHexString(sessionId) + ", packet:: " + packet);
+ }
+ } finally {
+ finishPacket(packet);
+ }
}
/**
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooDefs.java Tue Oct 21 16:49:45 2008
@@ -49,6 +49,8 @@
public final int ping = 11;
public final int auth = 100;
+
+ public final int setWatches = 101;
public final int createSession = -10;
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Tue Oct 21 16:49:45 2008
@@ -19,6 +19,7 @@
package org.apache.zookeeper;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -99,7 +100,20 @@
private final ZKWatchManager watchManager = new ZKWatchManager();
- /**
+ List<String> getDataWatches() {
+ List<String> rc = new ArrayList<String>(watchManager.dataWatches.keySet());
+ return rc;
+ }
+ List<String> getExistWatches() {
+ List<String> rc = new ArrayList<String>(watchManager.existWatches.keySet());
+ return rc;
+ }
+ List<String> getChildWatches() {
+ List<String> rc = new ArrayList<String>(watchManager.childWatches.keySet());
+ return rc;
+ }
+
+/**
* Manage watchers & handle events generated by the ClientCnxn object.
*
* We are implementing this as a nested class of ZooKeeper so that
@@ -109,11 +123,19 @@
private class ZKWatchManager implements ClientWatchManager {
private final Map<String, Set<Watcher>> dataWatches =
new HashMap<String, Set<Watcher>>();
+ private final Map<String, Set<Watcher>> existWatches =
+ new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
new HashMap<String, Set<Watcher>>();
-
+
private volatile Watcher defaultWatcher;
+ final private void addTo(Set<Watcher> from, Set<Watcher> to) {
+ if (from != null) {
+ to.addAll(from);
+ }
+ }
+
/* (non-Javadoc)
* @see org.apache.zookeeper.ClientWatchManager#materialize(Event.KeeperState, Event.EventType, java.lang.String)
*/
@@ -121,58 +143,60 @@
Watcher.Event.EventType type, String path) {
Set<Watcher> result = new HashSet<Watcher>();
- // clear the watches if we are not connected
+ switch (type) {
+ case None:
+ result.add(defaultWatcher);
+ for(Set<Watcher> ws: dataWatches.values()) {
+ result.addAll(ws);
+ }
+ for(Set<Watcher> ws: existWatches.values()) {
+ result.addAll(ws);
+ }
+ for(Set<Watcher> ws: childWatches.values()) {
+ result.addAll(ws);
+ }
- if (state != Watcher.Event.KeeperState.SyncConnected) {
- synchronized (dataWatches) {
- for (Set<Watcher> watchers : dataWatches.values()) {
- for (Watcher watcher : watchers) {
- result.add(watcher);
- }
+ // clear the watches if auto watch reset is not enabled
+ if (ClientCnxn.disableAutoWatchReset &&
+ state != Watcher.Event.KeeperState.SyncConnected)
+ {
+ synchronized(dataWatches) {
+ dataWatches.clear();
}
- dataWatches.clear();
- }
- synchronized (childWatches) {
- for (Set<Watcher> watchers : childWatches.values()) {
- for (Watcher watcher : watchers) {
- result.add(watcher);
- }
+ synchronized(existWatches) {
+ existWatches.clear();
+ }
+ synchronized(childWatches) {
+ childWatches.clear();
}
- childWatches.clear();
}
- }
-
- Set<Watcher> watchers = null;
- switch (type) {
- case None:
- result.add(defaultWatcher);
return result;
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
- watchers = dataWatches.remove(path);
+ addTo(dataWatches.remove(path), result);
+ }
+ synchronized (existWatches) {
+ addTo(existWatches.remove(path), result);
}
break;
case NodeChildrenChanged:
synchronized (childWatches) {
- watchers = childWatches.remove(path);
+ addTo(childWatches.remove(path), result);
}
break;
case NodeDeleted:
synchronized (dataWatches) {
- watchers = dataWatches.remove(path);
+ addTo(dataWatches.remove(path), result);
}
- Set<Watcher> cwatches;
- synchronized (childWatches) {
- cwatches = childWatches.remove(path);
+ // XXX This shouldn't be needed, but just in case
+ synchronized (existWatches) {
+ addTo(existWatches.remove(path), result);
+ LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
- if (cwatches != null) {
- if (watchers == null) {
- watchers = cwatches;
- } else {
- watchers.addAll(cwatches);
- }
+ synchronized (childWatches) {
+ addTo(childWatches.remove(path), result);
}
break;
default:
@@ -182,26 +206,24 @@
throw new RuntimeException(msg);
}
- result.addAll(watchers);
return result;
}
}
-
+
/**
* Register a watcher for a particular path.
*/
- class WatchRegistration {
- private Map<String, Set<Watcher>> watches;
+ abstract class WatchRegistration {
private Watcher watcher;
private String path;
- public WatchRegistration(Map<String, Set<Watcher>> watches,
- Watcher watcher, String path)
+ public WatchRegistration(Watcher watcher, String path)
{
- this.watches = watches;
this.watcher = watcher;
this.path = path;
}
+ abstract protected Map<String, Set<Watcher>> getWatches(int rc);
+
/**
* Register the watcher with the set of watches on path.
* @param rc the result code of the operation that attempted to
@@ -209,6 +231,7 @@
*/
public void register(int rc) {
if (shouldAddWatch(rc)) {
+ Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(path);
if (watchers == null) {
@@ -234,17 +257,43 @@
* even in the case where NONODE result code is returned.
*/
class ExistsWatchRegistration extends WatchRegistration {
- public ExistsWatchRegistration(Map<String, Set<Watcher>> watches,
- Watcher watcher, String path)
- {
- super(watches, watcher, path);
+ public ExistsWatchRegistration(Watcher watcher, String path) {
+ super(watcher, path);
}
+
+ @Override
+ protected Map<String, Set<Watcher>> getWatches(int rc) {
+ return rc == 0 ? watchManager.dataWatches : watchManager.existWatches;
+ }
+
@Override
protected boolean shouldAddWatch(int rc) {
return rc == 0 || rc == KeeperException.Code.NoNode;
}
}
+ class DataWatchRegistration extends WatchRegistration {
+ public DataWatchRegistration(Watcher watcher, String path) {
+ super(watcher, path);
+ }
+
+ @Override
+ protected Map<String, Set<Watcher>> getWatches(int rc) {
+ return watchManager.dataWatches;
+ }
+ }
+
+ class ChildWatchRegistration extends WatchRegistration {
+ public ChildWatchRegistration(Watcher watcher, String path) {
+ super(watcher, path);
+ }
+
+ @Override
+ protected Map<String, Set<Watcher>> getWatches(int rc) {
+ return watchManager.childWatches;
+ }
+ }
+
public enum States {
CONNECTING, ASSOCIATING, CONNECTED, CLOSED, AUTH_FAILED;
@@ -534,8 +583,7 @@
SetDataResponse response = new SetDataResponse();
WatchRegistration wcb = null;
if (watcher != null) {
- wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
- path);
+ wcb = new ExistsWatchRegistration(watcher, path);
}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
@@ -589,8 +637,7 @@
SetDataResponse response = new SetDataResponse();
WatchRegistration wcb = null;
if (watcher != null) {
- wcb = new ExistsWatchRegistration(watchManager.dataWatches, watcher,
- path);
+ wcb = new ExistsWatchRegistration(watcher, path);
}
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
ctx, wcb);
@@ -634,8 +681,7 @@
GetDataResponse response = new GetDataResponse();
WatchRegistration wcb = null;
if (watcher != null) {
- wcb = new WatchRegistration(watchManager.dataWatches, watcher,
- path);
+ wcb = new DataWatchRegistration(watcher, path);
}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
@@ -685,8 +731,7 @@
GetDataResponse response = new GetDataResponse();
WatchRegistration wcb = null;
if (watcher != null) {
- wcb = new WatchRegistration(watchManager.dataWatches, watcher,
- path);
+ wcb = new DataWatchRegistration(watcher, path);
}
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
ctx, wcb);
@@ -899,8 +944,7 @@
GetChildrenResponse response = new GetChildrenResponse();
WatchRegistration wcb = null;
if (watcher != null) {
- wcb = new WatchRegistration(watchManager.childWatches, watcher,
- path);
+ wcb = new ChildWatchRegistration(watcher, path);
}
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
if (r.getErr() != 0) {
@@ -950,8 +994,7 @@
GetChildrenResponse response = new GetChildrenResponse();
WatchRegistration wcb = null;
if (watcher != null) {
- wcb = new WatchRegistration(watchManager.childWatches, watcher,
- path);
+ wcb = new ChildWatchRegistration(watcher, path);
}
cnxn.queuePacket(h, new ReplyHeader(), request, response, cb, path,
ctx, wcb);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Tue Oct 21 16:49:45 2008
@@ -83,13 +83,19 @@
}
private static void printStat(Stat stat) {
- System.err.println("ctime = " + new Date(stat.getCtime()).toString());
- System.err.println("ctime = " + new Date(stat.getMtime()).toString());
- System.err.println("cversion = " + stat.getCversion());
System.err.println("cZxid = " + stat.getCzxid());
+ System.err.println("ctime = " + new Date(stat.getCtime()).toString());
System.err.println("mZxid = " + stat.getMzxid());
+ System.err.println("mtime = " + new Date(stat.getMtime()).toString());
+ System.err.println("pZxid = " + stat.getPzxid());
+
+ System.err.println("cversion = " + stat.getCversion());
System.err.println("dataVersion = " + stat.getVersion());
System.err.println("aclVersion = " + stat.getAversion());
+
+ System.err.println("ephemeralOwner = " + stat.getEphemeralOwner());
+ System.err.println("dataLength = " + stat.getDataLength());
+ System.err.println("numChildren = " + stat.getNumChildren());
}
public static void main(String args[]) throws NumberFormatException,
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataNode.java Tue Oct 21 16:49:45 2008
@@ -93,6 +93,7 @@
to.setCzxid(stat.getCzxid());
to.setMtime(stat.getMtime());
to.setMzxid(stat.getMzxid());
+ to.setPzxid(stat.getPzxid());
to.setVersion(stat.getVersion());
to.setEphemeralOwner(stat.getEphemeralOwner());
to.setDataLength(data.length);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Tue Oct 21 16:49:45 2008
@@ -34,14 +34,17 @@
import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.Watcher.Event.EventType;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.DeleteTxn;
import org.apache.zookeeper.txn.ErrorTxn;
@@ -229,6 +232,7 @@
to.setCzxid(from.getCzxid());
to.setMtime(from.getMtime());
to.setMzxid(from.getMzxid());
+ to.setPzxid(from.getPzxid());
to.setVersion(from.getVersion());
to.setEphemeralOwner(from.getEphemeralOwner());
}
@@ -240,6 +244,7 @@
to.setCzxid(from.getCzxid());
to.setMtime(from.getMtime());
to.setMzxid(from.getMzxid());
+ to.setPzxid(from.getPzxid());
to.setVersion(from.getVersion());
to.setEphemeralOwner(from.getEphemeralOwner());
to.setDataLength(from.getDataLength());
@@ -285,6 +290,7 @@
stat.setMtime(time);
stat.setCzxid(zxid);
stat.setMzxid(zxid);
+ stat.setPzxid(zxid);
stat.setVersion(0);
stat.setAversion(0);
stat.setEphemeralOwner(ephemeralOwner);
@@ -299,6 +305,7 @@
int cver = parent.stat.getCversion();
cver++;
parent.stat.setCversion(cver);
+ parent.stat.setPzxid(zxid);
Long longval = convertAcls(acl);
DataNode child = new DataNode(parent, data, longval, stat);
parent.children.add(childName);
@@ -321,10 +328,13 @@
/**
* remove the path from the datatree
- * @param path the path to be deleted
+ * @param path the path of the node to be deleted
+ * @param zxid the current zxid
* @throws KeeperException.NoNodeException
*/
- public void deleteNode(String path) throws KeeperException.NoNodeException {
+ public void deleteNode(String path, long zxid)
+ throws KeeperException.NoNodeException
+ {
int lastSlash = path.lastIndexOf('/');
String parentName = path.substring(0, lastSlash);
String childName = path.substring(lastSlash + 1);
@@ -340,6 +350,7 @@
synchronized (parent) {
parent.children.remove(childName);
parent.stat.setCversion(parent.stat.getCversion() + 1);
+ parent.stat.setPzxid(zxid);
long eowner = node.stat.getEphemeralOwner();
if (eowner != 0) {
HashSet<String> nodes = ephemerals.get(eowner);
@@ -522,7 +533,7 @@
case OpCode.delete:
DeleteTxn deleteTxn = (DeleteTxn) txn;
debug = "Delete transaction for " + deleteTxn.getPath();
- deleteNode(deleteTxn.getPath());
+ deleteNode(deleteTxn.getPath(), header.getZxid());
break;
case OpCode.setData:
SetDataTxn setDataTxn = (SetDataTxn) txn;
@@ -537,7 +548,7 @@
setACLTxn.getVersion());
break;
case OpCode.closeSession:
- killSession(header.getClientId());
+ killSession(header.getClientId(), header.getZxid());
break;
case OpCode.error:
ErrorTxn errTxn = (ErrorTxn) txn;
@@ -555,7 +566,7 @@
return rc;
}
- void killSession(long session) {
+ void killSession(long session, long zxid) {
// the list is already removed from the ephemerals
// so we do not have to worry about synchronyzing on
// the list. This is only called from FinalRequestProcessor
@@ -566,7 +577,7 @@
if (list != null) {
for (String path : list) {
try {
- deleteNode(path);
+ deleteNode(path, zxid);
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK,
"Deleting ephemeral node "
@@ -727,4 +738,61 @@
// dataWatches = null;
// childWatches = null;
}
+
+ public void setWatches(long relativeZxid, List<String> dataWatches,
+ List<String> existWatches, List<String> childWatches, Watcher watcher) {
+ for(String path: dataWatches) {
+ DataNode node = getNode(path);
+ WatchedEvent e = null;
+ if (node == null) {
+ e = new WatchedEvent(EventType.NodeDeleted,
+ KeeperState.SyncConnected, path);
+ } else if (node.stat.getCzxid() > relativeZxid) {
+ e = new WatchedEvent(EventType.NodeCreated,
+ KeeperState.SyncConnected, path);
+ } else if (node.stat.getMzxid() > relativeZxid) {
+ e = new WatchedEvent(EventType.NodeDataChanged,
+ KeeperState.SyncConnected, path);
+ }
+ if (e != null) {
+ watcher.process(e);
+ } else {
+ this.dataWatches.addWatch(path, watcher);
+ }
+ }
+ for(String path: existWatches) {
+ DataNode node = getNode(path);
+ WatchedEvent e = null;
+ if (node == null) {
+ // The node is still absent, as it was when the exists watch was registered: send no event and re-register the watch below
+ } else if (node.stat.getMzxid() > relativeZxid) {
+ e = new WatchedEvent(EventType.NodeDataChanged,
+ KeeperState.SyncConnected, path);
+ } else {
+ e = new WatchedEvent(EventType.NodeCreated,
+ KeeperState.SyncConnected, path);
+ }
+ if (e != null) {
+ watcher.process(e);
+ } else {
+ this.dataWatches.addWatch(path, watcher);
+ }
+ }
+ for(String path: childWatches) {
+ DataNode node = getNode(path);
+ WatchedEvent e = null;
+ if (node == null) {
+ e = new WatchedEvent(EventType.NodeDeleted,
+ KeeperState.SyncConnected, path);
+ } else if (node.stat.getPzxid() > relativeZxid) {
+ e = new WatchedEvent(EventType.NodeChildrenChanged,
+ KeeperState.SyncConnected, path);
+ }
+ if (e != null) {
+ watcher.process(e);
+ } else {
+ this.childWatches.addWatch(path, watcher);
+ }
+ }
+ }
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Tue Oct 21 16:49:45 2008
@@ -43,6 +43,7 @@
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.SetACLResponse;
import org.apache.zookeeper.proto.SetDataResponse;
+import org.apache.zookeeper.proto.SetWatches;
import org.apache.zookeeper.proto.SyncRequest;
import org.apache.zookeeper.proto.SyncResponse;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
@@ -195,6 +196,17 @@
getDataRequest.getWatch() ? request.cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
+ case OpCode.setWatches:
+ SetWatches setWatches = new SetWatches();
+ // XXX We really should NOT need this!!!!
+ request.request.rewind();
+ ZooKeeperServer.byteBuffer2Record(request.request, setWatches);
+ long relativeZxid = setWatches.getRelativeZxid();
+ zks.dataTree.setWatches(relativeZxid,
+ setWatches.getDataWatches(),
+ setWatches.getExistWatches(),
+ setWatches.getChildWatches(), request.cnxn);
+ break;
case OpCode.getACL:
GetACLRequest getACLRequest = new GetACLRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Tue Oct 21 16:49:45 2008
@@ -352,6 +352,7 @@
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.ping:
+ case OpCode.setWatches:
break;
}
} catch (KeeperException e) {
@@ -379,9 +380,7 @@
}
request.hdr = txnHeader;
request.txn = txn;
- if (request.hdr != null) {
- request.zxid = request.hdr.getZxid();
- }
+ request.zxid = zks.getZxid();
nextProcessor.processRequest(request);
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Tue Oct 21 16:49:45 2008
@@ -99,6 +99,7 @@
case OpCode.getChildren:
case OpCode.ping:
case OpCode.closeSession:
+ case OpCode.setWatches:
return true;
default:
return false;
@@ -131,6 +132,8 @@
return "notification";
case OpCode.create:
return "create";
+ case OpCode.setWatches:
+ return "setWatches";
case OpCode.delete:
return "delete";
case OpCode.exists:
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Tue Oct 21 16:49:45 2008
@@ -191,7 +191,8 @@
}
dataTree.initialized = true;
for (long session : deadSessions) {
- killSession(session);
+ // XXX: Is lastProcessedZxid really the best thing to use?
+ killSession(session, dataTree.lastProcessedZxid);
}
// Make a clean snapshot
takeSnapshot();
@@ -284,8 +285,8 @@
submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
}
- protected void killSession(long sessionId) {
- dataTree.killSession(sessionId);
+ protected void killSession(long sessionId, long zxid) {
+ dataTree.killSession(sessionId, zxid);
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
"ZooKeeperServer --- killSession: 0x"
+ Long.toHexString(sessionId));
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Tue Oct 21 16:49:45 2008
@@ -27,6 +27,7 @@
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import junit.framework.TestCase;
@@ -70,11 +71,44 @@
}
protected static class CountdownWatcher implements Watcher {
+ // XXX this doesn't need to be volatile! (Should probably be final)
volatile CountDownLatch clientConnected = new CountDownLatch(1);
-
- public void process(WatchedEvent event) {
+ volatile boolean connected;
+ synchronized public void process(WatchedEvent event) {
if (event.getState() == KeeperState.SyncConnected) {
+ connected = true;
+ notifyAll();
clientConnected.countDown();
+ } else {
+ connected = false;
+ notifyAll();
+ }
+ }
+ synchronized boolean isConnected() {
+ return connected;
+ }
+ synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException {
+ long expire = System.currentTimeMillis() + timeout;
+ long left = timeout;
+ while(!connected && left > 0) {
+ wait(left);
+ left = expire - System.currentTimeMillis();
+ }
+ if (!connected) {
+ throw new TimeoutException("Did not connect");
+
+ }
+ }
+ synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException {
+ long expire = System.currentTimeMillis() + timeout;
+ long left = timeout;
+ while(connected && left > 0) {
+ wait(left);
+ left = expire - System.currentTimeMillis();
+ }
+ if (connected) {
+ throw new TimeoutException("Did not disconnect");
+
}
}
}
@@ -95,7 +129,7 @@
protected ZooKeeper createClient(CountdownWatcher watcher, String hp)
throws IOException, InterruptedException
{
- ZooKeeper zk = new ZooKeeper(hp, 20000, watcher);
+ ZooKeeper zk = new ZooKeeper(hp, 9000, watcher);
if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS))
{
@@ -264,6 +298,17 @@
LOG.info("Client test setup finished");
}
+ protected void stopServer() throws Exception { // shuts down the server instance held in serverFactory and clears the field
+ LOG.info("STOPPING server");
+ shutdownServerInstance(serverFactory, hostPort);
+ serverFactory = null; // drop the reference to the factory that was just shut down
+ }
+
+ protected void startServer() throws Exception { // creates a server instance for tmpDir/hostPort and stores it in serverFactory
+ LOG.info("STARTING server");
+ serverFactory = createNewServerInstance(tmpDir, serverFactory, hostPort); // NOTE(review): serverFactory is null here when called after stopServer() - confirm createNewServerInstance accepts that
+ }
+
@Override
protected void tearDown() throws Exception {
LOG.info("tearDown starting");
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Tue Oct 21 16:49:45 2008
@@ -21,7 +21,6 @@
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.HashMap;
import org.apache.log4j.Logger;
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/RecoveryTest.java Tue Oct 21 16:49:45 2008
@@ -25,15 +25,15 @@
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
-import java.io.IOException;
+
import org.apache.log4j.Logger;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.server.SyncRequestProcessor;
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StatTest.java Tue Oct 21 16:49:45 2008
@@ -63,6 +63,7 @@
stat.setMtime(100);
stat.setMzxid(100);
stat.setNumChildren(100);
+ stat.setPzxid(100);
stat.setVersion(100);
return stat;
@@ -82,6 +83,7 @@
zk.getData(name, false, stat);
assertEquals(stat.getCzxid(), stat.getMzxid());
+ assertEquals(stat.getCzxid(), stat.getPzxid());
assertEquals(stat.getCtime(), stat.getMtime());
assertEquals(0, stat.getCversion());
assertEquals(0, stat.getVersion());
@@ -109,6 +111,7 @@
zk.getData(name, false, stat);
assertEquals(stat.getCzxid(), stat.getMzxid());
+ assertEquals(stat.getCzxid() + 1, stat.getPzxid());
assertEquals(stat.getCtime(), stat.getMtime());
assertEquals(1, stat.getCversion());
assertEquals(0, stat.getVersion());
@@ -121,6 +124,7 @@
zk.getData(childname, false, stat);
assertEquals(stat.getCzxid(), stat.getMzxid());
+ assertEquals(stat.getCzxid(), stat.getPzxid());
assertEquals(stat.getCtime(), stat.getMtime());
assertEquals(0, stat.getCversion());
assertEquals(0, stat.getVersion());
@@ -149,6 +153,7 @@
zk.getData(name, false, stat);
assertEquals(stat.getCzxid(), stat.getMzxid());
+ assertEquals(stat.getCzxid() + i + 1, stat.getPzxid());
assertEquals(stat.getCtime(), stat.getMtime());
assertEquals(i + 1, stat.getCversion());
assertEquals(0, stat.getVersion());
@@ -173,6 +178,7 @@
zk.getData(name, false, stat);
assertEquals(stat.getCzxid(), stat.getMzxid());
+ assertEquals(stat.getCzxid(), stat.getPzxid());
assertEquals(stat.getCtime(), stat.getMtime());
assertEquals(0, stat.getCversion());
assertEquals(0, stat.getVersion());
@@ -187,6 +193,7 @@
zk.getData(name, false, stat);
assertNotSame(stat.getCzxid(), stat.getMzxid());
+ assertEquals(stat.getCzxid(), stat.getPzxid());
assertNotSame(stat.getCtime(), stat.getMtime());
assertEquals(0, stat.getCversion());
assertEquals(1, stat.getVersion());
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java Tue Oct 21 16:49:45 2008
@@ -21,8 +21,10 @@
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
@@ -30,6 +32,7 @@
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
@@ -89,7 +92,6 @@
String name = zk.create("/tc-", "initialvalue".getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
names[i] = name;
- System.out.println(name);
Stat stat = new Stat();
zk.getData(name, watcher, stat);
@@ -116,4 +118,167 @@
}
}
+    /**
+     * Runs the auto-reset scenario with the watches registered through the
+     * watcher that was handed to the client at construction time (the same
+     * instance is passed as both the global and the local watcher).
+     */
+    @Test
+    public void testWatcherAutoResetWithGlobal() throws Exception {
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk = createClient(watcher, hostPort);
+        try {
+            testWatcherAutoReset(zk, watcher, watcher);
+        } finally {
+            // Close the client even when the scenario fails, so a failed
+            // run does not leave a live session behind for later tests.
+            zk.close();
+        }
+    }
+
+    /**
+     * Runs the auto-reset scenario with the watches registered on a
+     * separate MyWatcher instance rather than on the watcher that was
+     * handed to the client at construction time.
+     */
+    @Test
+    public void testWatcherAutoResetWithLocal() throws Exception {
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk = createClient(watcher, hostPort);
+        try {
+            testWatcherAutoReset(zk, watcher, new MyWatcher());
+        } finally {
+            // Close the client even when the scenario fails, so a failed
+            // run does not leave a live session behind for later tests.
+            zk.close();
+        }
+    }
+
+ @Test
+ public void testWatcherAutoResetDisabledWithGlobal() throws Exception {
+ ClientCnxn.disableAutoWatchReset = true;
+ testWatcherAutoResetWithGlobal();
+ }
+
+ @Test
+ public void testWatcherAutoResetDisabledWithLocal() throws Exception {
+ ClientCnxn.disableAutoWatchReset = true;
+ testWatcherAutoResetWithLocal();
+ }
+
+ private void testWatcherAutoReset(ZooKeeper zk, MyWatcher globalWatcher,
+ MyWatcher localWatcher) throws Exception {
+ boolean isGlobal = (localWatcher == globalWatcher); // same instance => watches are set with the boolean flag instead of an explicit watcher object
+ // Phase 1: set data, child and exists watches, bounce the server, and check whether they still fire afterwards
+ zk.create("/watchtest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ zk.create("/watchtest/child", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ if (isGlobal) {
+ zk.getChildren("/watchtest", true);
+ zk.getData("/watchtest/child", true, new Stat());
+ zk.exists("/watchtest/child2", true);
+ } else {
+ zk.getChildren("/watchtest", localWatcher);
+ zk.getData("/watchtest/child", localWatcher, new Stat());
+ zk.exists("/watchtest/child2", localWatcher);
+ }
+
+ assertTrue(localWatcher.events.isEmpty()); // nothing has changed yet, so no watch may have fired
+
+ stopServer();
+ globalWatcher.waitForDisconnected(3000);
+ localWatcher.waitForDisconnected(500);
+ startServer();
+ globalWatcher.waitForConnected(3000);
+ if (!isGlobal && !ClientCnxn.disableAutoWatchReset) { // a separate local watcher is only expected to see the reconnect when auto reset is enabled
+ localWatcher.waitForConnected(500);
+ }
+
+ assertTrue(localWatcher.events.isEmpty());
+ zk.setData("/watchtest/child", new byte[1], -1);
+ zk.create("/watchtest/child2", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ WatchedEvent e = localWatcher.events.poll(1, TimeUnit.MILLISECONDS); // NOTE(review): a 1 ms wait looks racy if events reach the watcher asynchronously - confirm
+ if (!ClientCnxn.disableAutoWatchReset) {
+ assertEquals(e.getPath(), EventType.NodeDataChanged, e.getType()); // NOTE(review): if the poll above timed out, e is presumably null and this dereference throws instead of failing cleanly
+ assertEquals("/watchtest/child", e.getPath());
+ } else {
+ assertNull("unexpected event", e);
+ }
+
+ e = localWatcher.events.poll(1000, TimeUnit.MILLISECONDS);
+ if (!ClientCnxn.disableAutoWatchReset) {
+ // Creating child2 is expected to fire both the exists watch
+ // (checked here) and the getChildren watch (checked next)
+ assertEquals(EventType.NodeCreated, e.getType());
+ assertEquals("/watchtest/child2", e.getPath());
+ } else {
+ assertNull("unexpected event", e);
+ }
+
+ e = localWatcher.events.poll(1000, TimeUnit.MILLISECONDS);
+ if (!ClientCnxn.disableAutoWatchReset) {
+ assertEquals(EventType.NodeChildrenChanged, e.getType());
+ assertEquals("/watchtest", e.getPath());
+ } else {
+ assertNull("unexpected event", e);
+ }
+
+ // Phase 2: stay idle long enough for PINGs to be exchanged and check that they produce no events
+ Thread.sleep(4000);
+
+ assertTrue(localWatcher.events.isEmpty()); // ensure no late arrivals
+ stopServer();
+ globalWatcher.waitForDisconnected(3000);
+ try {
+ try {
+ localWatcher.waitForDisconnected(500);
+ if (!isGlobal && !ClientCnxn.disableAutoWatchReset) {
+ fail("Got an event when I shouldn't have");
+ }
+ } catch(TimeoutException toe) {
+ if (ClientCnxn.disableAutoWatchReset) {
+ fail("Didn't get an event when I should have");
+ }
+ // Otherwise the timeout is the expected outcome: no watches are outstanding for this watcher
+ }
+ } catch (Exception e1) { // wraps any other exception after logging it
+ LOG.error("bad", e1);
+ throw new RuntimeException(e1);
+ }
+ startServer();
+ globalWatcher.waitForConnected(3000);
+
+ if (isGlobal) {
+ zk.getChildren("/watchtest", true);
+ zk.getData("/watchtest/child", true, new Stat());
+ zk.exists("/watchtest/child2", true);
+ } else {
+ zk.getChildren("/watchtest", localWatcher);
+ zk.getData("/watchtest/child", localWatcher, new Stat());
+ zk.exists("/watchtest/child2", localWatcher);
+ }
+
+ // Phase 3: fire the child2 and child-list watches now, before the next
+ // server bounce, so that only the data watch on /watchtest/child remains
+ zk.delete("/watchtest/child2", -1);
+
+ e = localWatcher.events.poll(1, TimeUnit.MILLISECONDS); // NOTE(review): 1 ms wait with an unconditional dereference below - looks racy, confirm
+ assertEquals(EventType.NodeDeleted, e.getType());
+ assertEquals("/watchtest/child2", e.getPath());
+
+ e = localWatcher.events.poll(1, TimeUnit.MILLISECONDS); // NOTE(review): same 1 ms concern as above
+ assertEquals(EventType.NodeChildrenChanged, e.getType());
+ assertEquals("/watchtest", e.getPath());
+
+ assertTrue(localWatcher.events.isEmpty());
+
+ stopServer();
+ globalWatcher.waitForDisconnected(3000);
+ localWatcher.waitForDisconnected(500);
+ startServer();
+ globalWatcher.waitForConnected(3000);
+ if (!isGlobal && !ClientCnxn.disableAutoWatchReset) {
+ localWatcher.waitForConnected(500);
+ }
+
+ zk.delete("/watchtest/child", -1);
+ zk.delete("/watchtest", -1);
+
+ e = localWatcher.events.poll(1, TimeUnit.MILLISECONDS); // NOTE(review): 1 ms wait - confirm the event is guaranteed to be queued by now
+ if (!ClientCnxn.disableAutoWatchReset) {
+ assertEquals(EventType.NodeDeleted, e.getType());
+ assertEquals("/watchtest/child", e.getPath());
+ } else {
+ assertNull("unexpected event", e);
+ }
+
+ // Give any straggling events a second to arrive, then require that none did
+ Thread.sleep(1000);
+ assertTrue(localWatcher.events.isEmpty());
+
+ }
+
}
Modified: hadoop/zookeeper/trunk/src/zookeeper.jute
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/zookeeper.jute?rev=706815&r1=706814&r2=706815&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/zookeeper.jute (original)
+++ hadoop/zookeeper/trunk/src/zookeeper.jute Tue Oct 21 16:49:45 2008
@@ -37,6 +37,7 @@
long ephemeralOwner; // owner id if ephemeral, 0 otw
int dataLength; //length of the data in the node
int numChildren; //number of children of this node
+ long pzxid; // last modified children
}
// information explicitly stored by the server persistently
class StatPersisted {
@@ -48,6 +49,7 @@
int cversion; // child version
int aversion; // acl version
long ephemeralOwner; // owner id if ephemeral, 0 otw
+ long pzxid; // last modified children
}
// information explicitly stored by the version 1 database of servers
@@ -83,6 +85,12 @@
long sessionId;
buffer passwd;
}
+ class SetWatches { // NOTE(review): presumably the request a client sends to re-register its watches after reconnecting - confirm against ClientCnxn
+ long relativeZxid; // NOTE(review): meaning not visible here; presumably the zxid the listed watches are evaluated against - confirm
+ vector<ustring>dataWatches; // presumably znode paths carrying data watches - confirm
+ vector<ustring>existWatches; // presumably znode paths carrying exists watches - confirm
+ vector<ustring>childWatches; // presumably znode paths carrying child watches - confirm
+ }
class RequestHeader {
int xid;
int type;