You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2008/11/19 11:20:36 UTC
svn commit: r718925 - in /hadoop/zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/ src/java/test/org/apache/zookeeper/
src/java/test/org/apache/zookeeper/test/
Author: mahadev
Date: Wed Nov 19 02:20:35 2008
New Revision: 718925
URL: http://svn.apache.org/viewvc?rev=718925&view=rev
Log:
ZOOKEEPER-204. SetWatches needs to be the first message after auth messages to the server (ben via mahadev)
Added:
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=718925&r1=718924&r2=718925&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Nov 19 02:20:35 2008
@@ -30,6 +30,9 @@
ZOOKEEPER-226. fix exists calls that fail on server if node has null data.
(mahadev)
+ ZOOKEEPER-204. SetWatches needs to be the first message after auth messages
+to the server (ben via mahadev)
+
Release 3.0.0 - 2008-10-21
Non-backward compatible changes:
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=718925&r1=718924&r2=718925&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Nov 19 02:20:35 2008
@@ -336,87 +336,96 @@
try {
while (true) {
Object event = waitingEvents.take();
- if (event == eventOfDeath) {
- break;
- }
-
- if (event instanceof WatcherSetEventPair) {
- // each watcher will process the event
- WatcherSetEventPair pair = (WatcherSetEventPair)event;
- for (Watcher watcher: pair.watchers) {
- watcher.process(pair.event);
- }
- } else {
- Packet p = (Packet) event;
- int rc = 0;
- String path = p.path;
- if (p.replyHeader.getErr() != 0) {
- rc = p.replyHeader.getErr();
+ try {
+ if (event == eventOfDeath) {
+ break;
}
- if (p.cb == null) {
- LOG.warn("Somehow a null cb got to EventThread!");
- } else if (p.response instanceof ExistsResponse
- || p.response instanceof SetDataResponse
- || p.response instanceof SetACLResponse) {
- StatCallback cb = (StatCallback) p.cb;
- if (rc == 0) {
- if (p.response instanceof ExistsResponse) {
- cb.processResult(rc, path, p.ctx,
- ((ExistsResponse) p.response)
- .getStat());
- } else if (p.response instanceof SetDataResponse) {
- cb.processResult(rc, path, p.ctx,
- ((SetDataResponse) p.response)
- .getStat());
- } else if (p.response instanceof SetACLResponse) {
- cb.processResult(rc, path, p.ctx,
- ((SetACLResponse) p.response)
- .getStat());
+
+ if (event instanceof WatcherSetEventPair) {
+ // each watcher will process the event
+ WatcherSetEventPair pair = (WatcherSetEventPair) event;
+ for (Watcher watcher : pair.watchers) {
+ try {
+ watcher.process(pair.event);
+ } catch (Throwable t) {
+ LOG.error("Error while calling watcher ", t);
}
- } else {
- cb.processResult(rc, path, p.ctx, null);
- }
- } else if (p.response instanceof GetDataResponse) {
- DataCallback cb = (DataCallback) p.cb;
- GetDataResponse rsp = (GetDataResponse) p.response;
- if (rc == 0) {
- cb.processResult(rc, path, p.ctx,
- rsp.getData(), rsp.getStat());
- } else {
- cb.processResult(rc, path, p.ctx, null, null);
- }
- } else if (p.response instanceof GetACLResponse) {
- ACLCallback cb = (ACLCallback) p.cb;
- GetACLResponse rsp = (GetACLResponse) p.response;
- if (rc == 0) {
- cb.processResult(rc, path, p.ctx, rsp.getAcl(),
- rsp.getStat());
- } else {
- cb.processResult(rc, path, p.ctx, null, null);
}
- } else if (p.response instanceof GetChildrenResponse) {
- ChildrenCallback cb = (ChildrenCallback) p.cb;
- GetChildrenResponse rsp = (GetChildrenResponse) p.response;
- if (rc == 0) {
- cb.processResult(rc, path, p.ctx, rsp
- .getChildren());
- } else {
- cb.processResult(rc, path, p.ctx, null);
+ } else {
+ Packet p = (Packet) event;
+ int rc = 0;
+ String path = p.path;
+ if (p.replyHeader.getErr() != 0) {
+ rc = p.replyHeader.getErr();
}
- } else if (p.response instanceof CreateResponse) {
- StringCallback cb = (StringCallback) p.cb;
- CreateResponse rsp = (CreateResponse) p.response;
- if (rc == 0) {
- cb
- .processResult(rc, path, p.ctx, rsp
- .getPath());
- } else {
- cb.processResult(rc, path, p.ctx, null);
+ if (p.cb == null) {
+ LOG.warn("Somehow a null cb got to EventThread!");
+ } else if (p.response instanceof ExistsResponse
+ || p.response instanceof SetDataResponse
+ || p.response instanceof SetACLResponse) {
+ StatCallback cb = (StatCallback) p.cb;
+ if (rc == 0) {
+ if (p.response instanceof ExistsResponse) {
+ cb.processResult(rc, path, p.ctx,
+ ((ExistsResponse) p.response)
+ .getStat());
+ } else if (p.response instanceof SetDataResponse) {
+ cb.processResult(rc, path, p.ctx,
+ ((SetDataResponse) p.response)
+ .getStat());
+ } else if (p.response instanceof SetACLResponse) {
+ cb.processResult(rc, path, p.ctx,
+ ((SetACLResponse) p.response)
+ .getStat());
+ }
+ } else {
+ cb.processResult(rc, path, p.ctx, null);
+ }
+ } else if (p.response instanceof GetDataResponse) {
+ DataCallback cb = (DataCallback) p.cb;
+ GetDataResponse rsp = (GetDataResponse) p.response;
+ if (rc == 0) {
+ cb.processResult(rc, path, p.ctx, rsp
+ .getData(), rsp.getStat());
+ } else {
+ cb.processResult(rc, path, p.ctx, null,
+ null);
+ }
+ } else if (p.response instanceof GetACLResponse) {
+ ACLCallback cb = (ACLCallback) p.cb;
+ GetACLResponse rsp = (GetACLResponse) p.response;
+ if (rc == 0) {
+ cb.processResult(rc, path, p.ctx, rsp
+ .getAcl(), rsp.getStat());
+ } else {
+ cb.processResult(rc, path, p.ctx, null,
+ null);
+ }
+ } else if (p.response instanceof GetChildrenResponse) {
+ ChildrenCallback cb = (ChildrenCallback) p.cb;
+ GetChildrenResponse rsp = (GetChildrenResponse) p.response;
+ if (rc == 0) {
+ cb.processResult(rc, path, p.ctx, rsp
+ .getChildren());
+ } else {
+ cb.processResult(rc, path, p.ctx, null);
+ }
+ } else if (p.response instanceof CreateResponse) {
+ StringCallback cb = (StringCallback) p.cb;
+ CreateResponse rsp = (CreateResponse) p.response;
+ if (rc == 0) {
+ cb.processResult(rc, path, p.ctx, rsp
+ .getPath());
+ } else {
+ cb.processResult(rc, path, p.ctx, null);
+ }
+ } else if (p.cb instanceof VoidCallback) {
+ VoidCallback cb = (VoidCallback) p.cb;
+ cb.processResult(rc, path, p.ctx);
}
- } else if (p.cb instanceof VoidCallback) {
- VoidCallback cb = (VoidCallback) p.cb;
- cb.processResult(rc, path, p.ctx);
}
+ } catch (Throwable t) {
+ LOG.error("Caught unexpected throwable", t);
}
}
} catch (InterruptedException e) {
@@ -504,15 +513,6 @@
sessionPasswd = conRsp.getPasswd();
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected, null));
- if (!disableAutoWatchReset) {
- SetWatches sw = new SetWatches(lastZxid,
- zooKeeper.getDataWatches(),
- zooKeeper.getExistWatches(),
- zooKeeper.getChildWatches());
- RequestHeader h = new RequestHeader();
- h.setType(ZooDefs.OpCode.setWatches);
- queuePacket(h, new ReplyHeader(), sw, null, null, null, null, null);
- }
}
void readResponse() throws IOException {
@@ -702,6 +702,20 @@
bb.putInt(bb.capacity() - 4);
bb.rewind();
synchronized (outgoingQueue) {
+ // We add backwards since we are pushing into the front
+ if (!disableAutoWatchReset) {
+ SetWatches sw = new SetWatches(lastZxid,
+ zooKeeper.getDataWatches(),
+ zooKeeper.getExistWatches(),
+ zooKeeper.getChildWatches());
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.setWatches);
+ h.setXid(-8);
+ Packet packet = new Packet(h, new ReplyHeader(), sw, null, null,
+ null);
+ outgoingQueue.addFirst(packet);
+ }
+
for (AuthData id : authInfo) {
outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
OpCode.auth), null, new AuthPacket(0, id.scheme,
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=718925&r1=718924&r2=718925&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Nov 19 02:20:35 2008
@@ -192,8 +192,11 @@
}
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {
- addTo(existWatches.remove(path), result);
- LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
+ Set<Watcher> list = existWatches.remove(path);
+ if (list != null) {
+ addTo(existWatches.remove(path), result);
+ LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
+ }
}
synchronized (childWatches) {
addTo(childWatches.remove(path), result);
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java?rev=718925&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/TestableZooKeeper.java Wed Nov 19 02:20:35 2008
@@ -0,0 +1,54 @@
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.nio.channels.SocketChannel;
+import java.util.List;
+
+public class TestableZooKeeper extends ZooKeeper {
+
+ public TestableZooKeeper(String host, int sessionTimeout,
+ Watcher watcher) throws IOException {
+ super(host, sessionTimeout, watcher);
+ }
+
+ @Override
+ public List<String> getChildWatches() {
+ return super.getChildWatches();
+ }
+
+
+ @Override
+ public List<String> getDataWatches() {
+ return super.getDataWatches();
+ }
+
+
+ @Override
+ public List<String> getExistWatches() {
+ return super.getExistWatches();
+ }
+
+
+ /**
+ * Cause this ZooKeeper object to stop receiving from the ZooKeeperServer
+ * for the given number of milliseconds.
+ * @param ms the number of milliseconds to pause.
+ */
+ public void pauseCnxn(final long ms) {
+ new Thread() {
+ public void run() {
+ synchronized(cnxn) {
+ try {
+ try {
+ ((SocketChannel)cnxn.sendThread.sockKey.channel()).socket().close();
+ } catch (IOException e) {
+ e.printStackTrace();
+ }
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }.start();
+ }
+}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=718925&r1=718924&r2=718925&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Wed Nov 19 02:20:35 2008
@@ -33,6 +33,7 @@
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
@@ -126,10 +127,10 @@
return createClient(watcher, hp);
}
- protected ZooKeeper createClient(CountdownWatcher watcher, String hp)
+ protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
throws IOException, InterruptedException
{
- ZooKeeper zk = new ZooKeeper(hp, 9000, watcher);
+ TestableZooKeeper zk = new TestableZooKeeper(hp, 9000, watcher);
if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
TimeUnit.MILLISECONDS))
{
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java?rev=718925&r1=718924&r2=718925&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/WatcherTest.java Wed Nov 19 02:20:35 2008
@@ -27,7 +27,10 @@
import org.apache.zookeeper.ClientCnxn;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
@@ -40,6 +43,14 @@
public class WatcherTest extends ClientBase {
protected static final Logger LOG = Logger.getLogger(WatcherTest.class);
+ private final class MyStatCallback implements StatCallback {
+ int rc;
+ public void processResult(int rc, String path, Object ctx, Stat stat) {
+ ((int[])ctx)[0]++;
+ this.rc = rc;
+ }
+ }
+
private class MyWatcher extends CountdownWatcher {
LinkedBlockingQueue<WatchedEvent> events =
new LinkedBlockingQueue<WatchedEvent>();
@@ -118,6 +129,57 @@
}
}
+ final static int COUNT = 100;
+ boolean hasSeenDelete = true;
+ /**
+ * This test checks that watches for pending requests do not get triggered,
+ * but watches set by previous requests do.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testWatchAutoResetWithPending() throws Exception {
+ MyWatcher watches[] = new MyWatcher[COUNT];
+ MyStatCallback cbs[] = new MyStatCallback[COUNT];
+ MyWatcher watcher = new MyWatcher();
+ int count[] = new int[1];
+ TestableZooKeeper zk = createClient(watcher, hostPort);
+ ZooKeeper zk2 = createClient(watcher, hostPort);
+ zk2.create("/test", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ for(int i = 0; i < COUNT/2; i++) {
+ watches[i] = new MyWatcher();
+ cbs[i] = new MyStatCallback();
+ zk.exists("/test", watches[i], cbs[i], count);
+ }
+ zk.exists("/test", false);
+ zk.pauseCnxn(4000);
+ Thread.sleep(50);
+ zk2.close();
+ stopServer();
+ watches[0].waitForDisconnected(3000);
+ for(int i = COUNT/2; i < COUNT; i++) {
+ watches[i] = new MyWatcher();
+ cbs[i] = new MyStatCallback();
+ zk.exists("/test", watches[i], cbs[i], count);
+ }
+ startServer();
+ watches[49].waitForConnected(4000);
+ assertEquals(null, zk.exists("/test", false));
+ Thread.sleep(10);
+ for(int i = 0; i < COUNT/2; i++) {
+ assertEquals("For " + i, 1, watches[i].events.size());
+ }
+ for(int i = COUNT/2; i < COUNT; i++) {
+ if (cbs[i].rc == 0) {
+ assertEquals("For " +i, 1, watches[i].events.size());
+ } else {
+ assertEquals("For " +i, 0, watches[i].events.size());
+ }
+ }
+ assertEquals(COUNT, count[0]);
+ zk.close();
+ }
+
@Test
public void testWatcherAutoResetWithGlobal() throws Exception {
ZooKeeper zk = null;