You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2008/08/28 01:36:41 UTC
svn commit: r689668 - in /hadoop/zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/ src/java/test/org/apache/zookeeper/test/
Author: breed
Date: Wed Aug 27 16:36:40 2008
New Revision: 689668
URL: http://svn.apache.org/viewvc?rev=689668&view=rev
Log:
ZOOKEEPER-63. Race condition in client close() operation. (phunt via breed)
Modified:
hadoop/zookeeper/trunk/CHANGES.txt
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java
Modified: hadoop/zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/CHANGES.txt?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/CHANGES.txt (original)
+++ hadoop/zookeeper/trunk/CHANGES.txt Wed Aug 27 16:36:40 2008
@@ -47,3 +47,5 @@
ZOOKEEPER-125. Remove unwanted class declaration in FastLeaderElection.
(Flavio Paiva Junqueira via mahadev)
+
+ ZOOKEEPER-63. Race condition in client close() operation. (phunt via breed)
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Wed Aug 27 16:36:40 2008
@@ -34,11 +34,10 @@
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
-import org.apache.log4j.Logger;
-
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
+import org.apache.log4j.Logger;
import org.apache.zookeeper.AsyncCallback.ACLCallback;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.DataCallback;
@@ -63,7 +62,6 @@
import org.apache.zookeeper.proto.SetDataResponse;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.ByteBufferInputStream;
-import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooTrace;
/**
@@ -75,7 +73,8 @@
class ClientCnxn {
private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
- private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>();
+ private ArrayList<InetSocketAddress> serverAddrs =
+ new ArrayList<InetSocketAddress>();
static class AuthData {
AuthData(String scheme, byte data[]) {
@@ -122,6 +121,12 @@
final EventThread eventThread;
final Selector selector = Selector.open();
+
+ /** Set to true when close is called. Latches the connection such that
+ * we don't attempt to re-connect to the server if in the middle of
+ * closing the connection (client sends session disconnect to server
+ * as part of close operation) */
+ volatile boolean closing = false;
public long getSessionId() {
return sessionId;
@@ -253,7 +258,7 @@
class EventThread extends Thread {
EventThread() {
- super("EventThread");
+ super(currentThread().getName() + "-EventThread");
setUncaughtExceptionHandler(uncaughtExceptionHandler);
setDaemon(true);
}
@@ -341,7 +346,10 @@
}
}
} catch (InterruptedException e) {
+ LOG.warn("Event thread exiting due to interruption", e);
}
+
+ LOG.info("EventThread shut down");
}
}
@@ -566,7 +574,7 @@
}
SendThread() {
- super("SendThread");
+ super(currentThread().getName() + "-SendThread");
zooKeeper.state = States.CONNECTING;
setUncaughtExceptionHandler(uncaughtExceptionHandler);
setDaemon(true);
@@ -666,6 +674,10 @@
while (zooKeeper.state.isAlive()) {
try {
if (sockKey == null) {
+ // don't re-establish connection if we are closing
+ if (closing) {
+ break;
+ }
startConnect();
lastSend = now;
lastHeard = now;
@@ -730,21 +742,34 @@
}
selected.clear();
} catch (Exception e) {
- LOG.warn("Closing session 0x"
- + Long.toHexString(getSessionId()),
- e);
- cleanup();
- if (zooKeeper.state.isAlive()) {
- waitingEvents.add(new WatcherEvent(Event.EventNone,
- Event.KeeperStateDisconnected, null));
+ if (closing) {
+ // closing so this is expected
+ LOG.info("Exception while closing send thread for session 0x"
+ + Long.toHexString(getSessionId())
+ + " : " + e.getMessage());
+ break;
+ } else {
+ LOG.warn("Exception closing session 0x"
+ + Long.toHexString(getSessionId()),
+ e);
+ cleanup();
+ if (zooKeeper.state.isAlive()) {
+ waitingEvents.add(new WatcherEvent(Event.EventNone,
+ Event.KeeperStateDisconnected, null));
+ }
+
+ now = System.currentTimeMillis();
+ lastHeard = now;
+ lastSend = now;
}
-
- now = System.currentTimeMillis();
- lastHeard = now;
- lastSend = now;
}
}
cleanup();
+ try {
+ selector.close();
+ } catch (IOException e) {
+ LOG.warn("Ignoring exception during selector close", e);
+ }
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"SendThread exitedloop.");
}
@@ -755,25 +780,29 @@
sockKey.cancel();
try {
sock.socket().shutdownInput();
- } catch (IOException e2) {
+ } catch (IOException e) {
+ LOG.warn("Ignoring exception during shutdown input", e);
}
try {
sock.socket().shutdownOutput();
- } catch (IOException e2) {
+ } catch (IOException e) {
+ LOG.warn("Ignoring exception during shutdown output", e);
}
try {
sock.socket().close();
- } catch (IOException e1) {
+ } catch (IOException e) {
+ LOG.warn("Ignoring exception during socket close", e);
}
try {
sock.close();
- } catch (IOException e1) {
+ } catch (IOException e) {
+ LOG.warn("Ignoring exception during channel close", e);
}
}
try {
Thread.sleep(100);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
+ } catch (InterruptedException e) {
+ LOG.warn("SendThread interrupted during sleep, ignoring");
}
sockKey = null;
synchronized (pendingQueue) {
@@ -798,15 +827,44 @@
}
}
- @SuppressWarnings("unchecked")
- public void close() throws IOException {
- LOG.info("Closing ClientCnxn for session: 0x"
+ /**
+ * Shutdown the send/event threads. This method should not be called
+ * directly - rather it should be called as part of close operation. This
+ * method is primarily here to allow the tests to verify disconnection
+ * behavior.
+ */
+ public void disconnect() {
+ LOG.info("Disconnecting ClientCnxn for session: 0x"
+ Long.toHexString(getSessionId()));
sendThread.close();
waitingEvents.add(eventOfDeath);
}
+ /**
+ * Close the connection, which includes; send session disconnect to
+ * the server, shutdown the send/event threads.
+ *
+ * @throws IOException
+ */
+ public void close() throws IOException {
+ LOG.info("Closing ClientCnxn for session: 0x"
+ + Long.toHexString(getSessionId()));
+
+ closing = true;
+
+ try {
+ RequestHeader h = new RequestHeader();
+ h.setType(ZooDefs.OpCode.closeSession);
+
+ submitRequest(h, null, null, null);
+ } catch (InterruptedException e) {
+ // ignore, close the send/event threads
+ } finally {
+ disconnect();
+ }
+ }
+
private int xid = 1;
synchronized private int getXid() {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Wed Aug 27 16:36:40 2008
@@ -312,9 +312,6 @@
public synchronized void close() throws InterruptedException {
LOG.info("Closing session: 0x" + Long.toHexString(getSessionId()));
- RequestHeader h = new RequestHeader();
- h.setType(ZooDefs.OpCode.closeSession);
- cnxn.submitRequest(h, null, null, null);
try {
cnxn.close();
} catch (IOException e) {
@@ -940,7 +937,10 @@
// Everything below this line is for testing!
+ /** Testing only!!! Really this needs to be moved into a stub in the
+ * tests - pending JIRA for that.
+ */
public void disconnect() throws IOException {
- cnxn.close();
+ cnxn.disconnect();
}
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncTest.java Wed Aug 27 16:36:40 2008
@@ -45,12 +45,11 @@
import org.junit.Test;
public class AsyncTest extends TestCase
- implements Watcher, StringCallback, VoidCallback, DataCallback
+ implements StringCallback, VoidCallback, DataCallback
{
private static final Logger LOG = Logger.getLogger(AsyncTest.class);
private QuorumTest quorumTest = new QuorumTest();
- private CountDownLatch clientConnected;
private volatile boolean bang;
@@ -72,11 +71,20 @@
@Override
protected void tearDown() throws Exception {
LOG.info("Test clients shutting down");
- clientConnected = null;
quorumTest.tearDown();
LOG.info("FINISHED " + getName());
}
+ private static class CountdownWatcher implements Watcher {
+ volatile CountDownLatch clientConnected = new CountDownLatch(1);
+
+ public void process(WatcherEvent event) {
+ if (event.getState() == Event.KeeperStateSyncConnected) {
+ clientConnected.countDown();
+ }
+ }
+ }
+
private ZooKeeper createClient() throws IOException,InterruptedException {
return createClient(quorumTest.hostPort);
}
@@ -84,9 +92,11 @@
private ZooKeeper createClient(String hp)
throws IOException, InterruptedException
{
- clientConnected = new CountDownLatch(1);
- ZooKeeper zk = new ZooKeeper(hp, 30000, this);
- if(!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){
+ CountdownWatcher watcher = new CountdownWatcher();
+ ZooKeeper zk = new ZooKeeper(hp, 30000, watcher);
+ if(!watcher.clientConnected.await(CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS))
+ {
fail("Unable to connect to server");
}
return zk;
@@ -239,12 +249,6 @@
}
}
- public void process(WatcherEvent event) {
- if(event.getState()==Event.KeeperStateSyncConnected){
- clientConnected.countDown();
- }
- }
-
@SuppressWarnings("unchecked")
public void processResult(int rc, String path, Object ctx, String name) {
synchronized(ctx) {
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Wed Aug 27 16:36:40 2008
@@ -25,10 +25,16 @@
import java.io.OutputStream;
import java.net.Socket;
import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.ServerStats;
import org.apache.zookeeper.server.SyncRequestProcessor;
@@ -53,6 +59,51 @@
super(name);
}
+ /**
+ * In general don't use this. Only use in the special case that you
+ * want to ignore results (for whatever reason) in your test. Don't
+ * use empty watchers in real code!
+ *
+ */
+ protected class NullWatcher implements Watcher {
+ public void process(WatcherEvent event) { /* nada */ }
+ }
+
+ protected static class CountdownWatcher implements Watcher {
+ volatile CountDownLatch clientConnected = new CountDownLatch(1);
+
+ public void process(WatcherEvent event) {
+ if (event.getState() == Event.KeeperStateSyncConnected) {
+ clientConnected.countDown();
+ }
+ }
+ }
+
+ protected ZooKeeper createClient()
+ throws IOException, InterruptedException
+ {
+ return createClient(hostPort);
+ }
+
+ protected ZooKeeper createClient(String hp)
+ throws IOException, InterruptedException
+ {
+ CountdownWatcher watcher = new CountdownWatcher();
+ return createClient(watcher, hp);
+ }
+
+ protected ZooKeeper createClient(CountdownWatcher watcher, String hp)
+ throws IOException, InterruptedException
+ {
+ ZooKeeper zk = new ZooKeeper(hp, 20000, watcher);
+ if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS))
+ {
+ fail("Unable to connect to server");
+ }
+ return zk;
+ }
+
public static boolean waitForServerUp(String hp, long timeout) {
long start = System.currentTimeMillis();
String split[] = hp.split(":");
@@ -242,4 +293,44 @@
return d.delete();
}
+ /*
+ * Verify that all of the servers see the same number of nodes
+ * at the root
+ */
+ void verifyRootOfAllServersMatch(String hostPort)
+ throws InterruptedException, KeeperException, IOException
+ {
+ String parts[] = hostPort.split(",");
+
+ // run through till the counts no longer change on each server
+ // max 15 tries, with 2 second sleeps, so approx 30 seconds
+ int[] counts = new int[parts.length];
+ for (int j = 0; j < 100; j++) {
+ int newcounts[] = new int[parts.length];
+ int i = 0;
+ for (String hp : parts) {
+ ZooKeeper zk = createClient(hp);
+ try {
+ newcounts[i++] = zk.getChildren("/", false).size();
+ } finally {
+ zk.close();
+ }
+ }
+
+ if (Arrays.equals(newcounts, counts)) {
+ LOG.info("Found match with array:"
+ + Arrays.toString(newcounts));
+ counts = newcounts;
+ break;
+ } else {
+ counts = newcounts;
+ Thread.sleep(10000);
+ }
+ }
+
+ // verify all the servers reporting same number of nodes
+ for (int i = 1; i < parts.length; i++) {
+ assertEquals("node count not consistent", counts[i-1], counts[i]);
+ }
+ }
}
\ No newline at end of file
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientTest.java Wed Aug 27 16:36:40 2008
@@ -20,20 +20,18 @@
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.KeeperException.InvalidACLException;
+import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZooDefs.CreateFlags;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooDefs.Perms;
@@ -43,33 +41,14 @@
import org.apache.zookeeper.proto.WatcherEvent;
import org.junit.Test;
-public class ClientTest extends ClientBase implements Watcher {
+public class ClientTest extends ClientBase {
protected static final Logger LOG = Logger.getLogger(ClientTest.class);
LinkedBlockingQueue<WatcherEvent> events =
new LinkedBlockingQueue<WatcherEvent>();
- protected volatile CountDownLatch clientConnected;
-
- protected ZooKeeper createClient(Watcher watcher)
- throws IOException, InterruptedException
- {
- return createClient(watcher, hostPort);
- }
-
- protected ZooKeeper createClient(Watcher watcher, String hp)
- throws IOException, InterruptedException
- {
- clientConnected = new CountDownLatch(1);
- ZooKeeper zk = new ZooKeeper(hp, 20000, watcher);
- if (!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
- fail("Unable to connect to server");
- }
- return zk;
- }
@Override
protected void tearDown() throws Exception {
- clientConnected = null;
super.tearDown();
LOG.info("FINISHED " + getName());
}
@@ -79,8 +58,8 @@
ZooKeeper zkIdle = null;
ZooKeeper zkWatchCreator = null;
try {
- zkIdle = createClient(this);
- zkWatchCreator = createClient(this);
+ zkIdle = createClient();
+ zkWatchCreator = createClient();
for (int i = 0; i < 30; i++) {
zkWatchCreator.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
}
@@ -121,12 +100,13 @@
public void testACLs() throws Exception {
ZooKeeper zk = null;
try {
- zk = createClient(this);
+ zk = createClient();
try {
zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
fail("Should have received an invalid acl error");
} catch(InvalidACLException e) {
- LOG.error("Invalid acl", e);
+ LOG.info("Test successful, invalid acl received : "
+ + e.getMessage());
}
try {
ArrayList<ACL> testACL = new ArrayList<ACL>();
@@ -135,12 +115,13 @@
zk.create("/acltest", new byte[0], testACL, 0);
fail("Should have received an invalid acl error");
} catch(InvalidACLException e) {
- LOG.error("Invalid acl", e);
+ LOG.info("Test successful, invalid acl received : "
+ + e.getMessage());
}
zk.addAuthInfo("digest", "ben:passwd".getBytes());
zk.create("/acltest", new byte[0], Ids.CREATOR_ALL_ACL, 0);
zk.close();
- zk = createClient(this);
+ zk = createClient();
zk.addAuthInfo("digest", "ben:passwd2".getBytes());
try {
zk.getData("/acltest", false, new Stat());
@@ -152,7 +133,7 @@
zk.getData("/acltest", false, new Stat());
zk.setACL("/acltest", Ids.OPEN_ACL_UNSAFE, -1);
zk.close();
- zk = createClient(this);
+ zk = createClient();
zk.getData("/acltest", false, new Stat());
List<ACL> acls = zk.getACL("/acltest", new Stat());
assertEquals(1, acls.size());
@@ -165,11 +146,25 @@
}
}
- private void performClientTest(boolean withWatcherObj) throws IOException,
- InterruptedException, KeeperException {
+ protected class MyWatcher extends CountdownWatcher {
+ public void process(WatcherEvent event) {
+ super.process(event);
+ if (event.getType() != Event.EventNone) {
+ try {
+ events.put(event);
+ } catch (InterruptedException e) {
+ LOG.warn("ignoring interrupt during event.put");
+ }
+ }
+ }
+ }
+
+ private void performClientTest(boolean withWatcherObj)
+ throws IOException, InterruptedException, KeeperException
+ {
ZooKeeper zk = null;
try {
- zk =createClient(this);
+ zk = createClient(new MyWatcher(), hostPort);
//LOG.info("Created client: " + zk.describeCNXN());
LOG.info("Before create /benwashere");
zk.create("/benwashere", "".getBytes(), Ids.OPEN_ACL_UNSAFE, 0);
@@ -188,7 +183,7 @@
zk.close();
//LOG.info("Closed client: " + zk.describeCNXN());
Thread.sleep(2000);
- zk = createClient(this);
+ zk = createClient(new MyWatcher(), hostPort);
//LOG.info("Created a new client: " + zk.describeCNXN());
LOG.info("Before delete /");
@@ -213,7 +208,7 @@
if (withWatcherObj) {
assertEquals(null, zk.exists("/frog", new MyWatcher()));
} else {
- assertEquals(null, zk.exists("/frog", true));
+ assertEquals(null, zk.exists("/frog", true));
}
LOG.info("Comment: asseting passed for frog setting /");
} catch (KeeperException.NoNodeException e) {
@@ -292,14 +287,17 @@
// Test that sequential filenames are being created correctly,
// with 0-padding in the filename
- public void testSequentialNodeNames() throws IOException, InterruptedException, KeeperException {
+ @Test
+ public void testSequentialNodeNames()
+ throws IOException, InterruptedException, KeeperException
+ {
String path = "/SEQUENCE";
- String file = "TEST";
- String filepath = path + "/" + file;
+ String file = "TEST";
+ String filepath = path + "/" + file;
ZooKeeper zk = null;
try {
- zk =createClient(this);
+ zk = createClient();
zk.create(path, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
zk.create(filepath, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateFlags.SEQUENCE);
List<String> children = zk.getChildren(path, false);
@@ -351,7 +349,7 @@
@Test
public void testDeleteWithChildren() throws Exception {
- ZooKeeper zk = createClient(this);
+ ZooKeeper zk = createClient();
zk.create("/parent", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
zk.create("/parent/child", new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
try {
@@ -364,20 +362,27 @@
zk.delete("/parent", -1);
zk.close();
}
-
- private static class HammerThread extends Thread {
- private static final long LATENCY = 5;
+
+ private static final long HAMMERTHREAD_LATENCY = 5;
+
+ private static abstract class HammerThread extends Thread {
+ protected final int count;
+ protected volatile int current = 0;
+ HammerThread(String name, int count) {
+ super(name);
+ this.count = count;
+ }
+ }
+
+ private static class BasicHammerThread extends HammerThread {
private final ZooKeeper zk;
private final String prefix;
- private final int count;
- private volatile int current = 0;
- HammerThread(String name, ZooKeeper zk, String prefix, int count) {
- super(name);
+ BasicHammerThread(String name, ZooKeeper zk, String prefix, int count) {
+ super(name, count);
this.zk = zk;
this.prefix = prefix;
- this.count = count;
}
public void run() {
@@ -385,87 +390,129 @@
try {
for (; current < count; current++) {
// Simulate a bit of network latency...
- Thread.sleep(LATENCY);
+ Thread.sleep(HAMMERTHREAD_LATENCY);
zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0);
}
- } catch (Exception e) {
- LOG.error("Client create operation failed", e);
+ } catch (Throwable t) {
+ LOG.error("Client create operation failed", t);
} finally {
- if (zk != null) {
- try {
- zk.close();
- } catch (InterruptedException e) {
- LOG.warn("Unexpected", e);
- }
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ LOG.warn("Unexpected", e);
}
}
}
}
- /*
- * Verify that all of the servers see the same number of nodes
- * at the root
- */
- void verifyRootOfAllServersMatch(String hostPort)
- throws InterruptedException, KeeperException, IOException
- {
- String parts[] = hostPort.split(",");
+ private static class SuperHammerThread extends HammerThread {
+ private final ClientTest parent;
+ private final String prefix;
- // run through till the counts no longer change on each server
- // max 15 tries, with 2 second sleeps, so approx 30 seconds
- int[] counts = new int[parts.length];
- for (int j = 0; j < 100; j++) {
- int newcounts[] = new int[parts.length];
- int i = 0;
- for (String hp : parts) {
- ZooKeeper zk = createClient(this, hp);
- try {
- newcounts[i++] = zk.getChildren("/", false).size();
- } finally {
- zk.close();
- }
- }
+ SuperHammerThread(String name, ClientTest parent, String prefix,
+ int count)
+ {
+ super(name, count);
+ this.parent = parent;
+ this.prefix = prefix;
+ }
- if (Arrays.equals(newcounts, counts)) {
- LOG.info("Found match with array:"
- + Arrays.toString(newcounts));
- counts = newcounts;
- break;
- } else {
- counts = newcounts;
- Thread.sleep(10000);
+ public void run() {
+ byte b[] = new byte[256];
+ try {
+ for (; current < count; current++) {
+ ZooKeeper zk = parent.createClient();
+ try {
+ zk.create(prefix + current, b, Ids.OPEN_ACL_UNSAFE, 0);
+ } finally {
+ try {
+ zk.close();
+ } catch (InterruptedException e) {
+ LOG.warn("Unexpected", e);
+ }
+ }
+ }
+ } catch (Throwable t) {
+ LOG.error("Client create operation failed", t);
}
}
+ }
- // verify all the servers reporting same number of nodes
- for (int i = 1; i < parts.length; i++) {
- assertEquals("node count not consistent", counts[i-1], counts[i]);
+ /**
+ * Separate threads each creating a number of nodes. Each thread
+ * is using a non-shared (owned by thread) client for all node creations.
+ * @throws Throwable
+ */
+ @Test
+ public void testHammerBasic() throws Throwable {
+ try {
+ final int threadCount = 10;
+ final int childCount = 1000;
+
+ HammerThread[] threads = new HammerThread[threadCount];
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < threads.length; i++) {
+ ZooKeeper zk = createClient();
+ String prefix = "/test-" + i;
+ zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+ prefix += "/";
+ HammerThread thread =
+ new BasicHammerThread("BasicHammerThread-" + i, zk, prefix,
+ childCount);
+ thread.start();
+
+ threads[i] = thread;
+ }
+
+ verifyHammer(start, threads, childCount);
+ } catch (Throwable t) {
+ LOG.error("test failed", t);
+ throw t;
}
}
-
-
+
+ /**
+ * Separate threads each creating a number of nodes. Each thread
+ * is creating a new client for each node creation.
+ * @throws Throwable
+ */
@Test
- public void testHammer()
- throws IOException, InterruptedException, KeeperException
- {
- final int threadCount = 10;
- final int childCount = 1000;
-
- HammerThread[] threads = new HammerThread[threadCount];
- long start = System.currentTimeMillis();
- for (int i = 0; i < threads.length; i++) {
- Thread.sleep(10);
- ZooKeeper zk = createClient(this);
- String prefix = "/test-" + i;
- zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
- prefix += "/";
- HammerThread thread =
- new HammerThread("HammerThread-" + i, zk, prefix, childCount);
- thread.start();
+ public void testHammerSuper() throws Throwable {
+ try {
+ final int threadCount = 5;
+ final int childCount = 10;
- threads[i] = thread;
+ HammerThread[] threads = new HammerThread[threadCount];
+ long start = System.currentTimeMillis();
+ for (int i = 0; i < threads.length; i++) {
+ String prefix = "/test-" + i;
+ {
+ ZooKeeper zk = createClient();
+ try {
+ zk.create(prefix, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+ } finally {
+ zk.close();
+ }
+ }
+ prefix += "/";
+ HammerThread thread =
+ new SuperHammerThread("SuperHammerThread-" + i, this,
+ prefix, childCount);
+ thread.start();
+
+ threads[i] = thread;
+ }
+
+ verifyHammer(start, threads, childCount);
+ } catch (Throwable t) {
+ LOG.error("test failed", t);
+ throw t;
}
-
+ }
+
+ public void verifyHammer(long start, HammerThread[] threads, int childCount)
+ throws IOException, InterruptedException, KeeperException
+ {
// look for the clients to finish their create operations
LOG.info("Starting check for completed hammers");
int workingCount = threads.length;
@@ -493,44 +540,75 @@
for (HammerThread h : threads) {
final int safetyFactor = 3;
verifyThreadTerminated(h,
- threadCount * childCount
- * HammerThread.LATENCY * safetyFactor);
+ threads.length * childCount
+ * HAMMERTHREAD_LATENCY * safetyFactor);
}
LOG.info(new Date() + " Total time "
+ (System.currentTimeMillis() - start));
- ZooKeeper zk = createClient(this);
-
- LOG.info("******************* Connected to ZooKeeper" + new Date());
- for (int i = 0; i < threadCount; i++) {
- LOG.info("Doing thread: " + i + " " + new Date());
- List<String> children =
- zk.getChildren("/test-" + i, false);
- assertEquals(childCount, children.size());
- }
- for (int i = 0; i < threadCount; i++) {
- List<String> children =
- zk.getChildren("/test-" + i, false);
- assertEquals(childCount, children.size());
+ ZooKeeper zk = createClient();
+ try {
+
+ LOG.info("******************* Connected to ZooKeeper" + new Date());
+ for (int i = 0; i < threads.length; i++) {
+ LOG.info("Doing thread: " + i + " " + new Date());
+ List<String> children =
+ zk.getChildren("/test-" + i, false);
+ assertEquals(childCount, children.size());
+ }
+ for (int i = 0; i < threads.length; i++) {
+ List<String> children =
+ zk.getChildren("/test-" + i, false);
+ assertEquals(childCount, children.size());
+ }
+ } finally {
+ zk.close();
}
}
-
- public class MyWatcher implements Watcher {
- public void process(WatcherEvent event) {
- ClientTest.this.process(event);
+
+ private class VerifyClientCleanup extends Thread {
+ int count;
+ int current = 0;
+
+ VerifyClientCleanup(String name, int count) {
+ super(name);
+ this.count = count;
+ }
+
+ public void run() {
+ try {
+ for (; current < count; current++) {
+ ZooKeeper zk = createClient();
+ zk.close();
+ }
+ } catch (Throwable t) {
+ LOG.error("test failed", t);
+ }
}
}
- public void process(WatcherEvent event) {
- if (event.getState() == Event.KeeperStateSyncConnected) {
- clientConnected.countDown();
+ /**
+ * Verify that the client is cleaning up properly. Open/close a large
+ * number of sessions. Essentially looking to see if sockets/selectors
+ * are being cleaned up properly during close.
+ *
+ * @throws Throwable
+ */
+ @Test
+ public void testClientCleanup() throws Throwable {
+ final int threadCount = 20;
+ final int clientCount = 100;
+
+ VerifyClientCleanup threads[] = new VerifyClientCleanup[threadCount];
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i] = new VerifyClientCleanup("VCC" + i, clientCount);
+ threads[i].start();
}
- if (event.getType() != Event.EventNone) {
- try {
- events.put(event);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
+
+ for (int i = 0; i < threads.length; i++) {
+ threads[i].join(600000);
+ assertTrue(threads[i].current == threads[i].count);
}
}
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DataTreeTest.java Wed Aug 27 16:36:40 2008
@@ -43,7 +43,6 @@
LOG.info("FINISHED " + getName());
}
-
public void testRootWatchTriggered() throws Exception {
class MyWatcher implements Watcher{
boolean fired=false;
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Wed Aug 27 16:36:40 2008
@@ -24,17 +24,22 @@
import java.util.ArrayList;
import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumStats;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.junit.After;
import org.junit.Before;
+import org.junit.Test;
-public class QuorumTest extends ClientTest {
+public class QuorumTest extends ClientBase {
private static final Logger LOG = Logger.getLogger(QuorumTest.class);
+ private ClientTest ct = new ClientTest();
+
File s1dir, s2dir, s3dir, s4dir, s5dir;
QuorumPeer s1, s2, s3, s4, s5;
+
@Before
@Override
protected void setUp() throws Exception {
@@ -43,6 +48,7 @@
setupTestEnv();
hostPort = "127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183,127.0.0.1:2184,127.0.0.1:2185";
+ ct.hostPort = hostPort;
s1dir = ClientBase.createTmpDir();
s2dir = ClientBase.createTmpDir();
@@ -126,4 +132,47 @@
LOG.debug("QP interrupted", e);
}
}
+
+ @Test
+ public void testDeleteWithChildren() throws Exception {
+ ct.testDeleteWithChildren();
+ }
+
+ @Test
+ public void testHammerBasic() throws Throwable {
+ ct.testHammerBasic();
+ }
+
+ @Test
+ public void testPing() throws Exception {
+ ct.testPing();
+ }
+
+ @Test
+ public void testSequentialNodeNames()
+ throws IOException, InterruptedException, KeeperException
+ {
+ ct.testSequentialNodeNames();
+ }
+
+ @Test
+ public void testACLs() throws Exception {
+ ct.testACLs();
+ }
+
+ @Test
+ public void testClientwithoutWatcherObj() throws IOException,
+ InterruptedException, KeeperException
+ {
+ ct.testClientwithoutWatcherObj();
+ }
+
+ @Test
+ public void testClientWithWatcherObj() throws IOException,
+ InterruptedException, KeeperException
+ {
+ ct.testClientWithWatcherObj();
+ }
+
+ // skip superhammer and clientcleanup as they are too expensive for quorum
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTest.java Wed Aug 27 16:36:40 2008
@@ -23,6 +23,7 @@
import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
@@ -44,9 +45,9 @@
private static final String HOSTPORT = "127.0.0.1:33299";
- private CountDownLatch startSignal;
-
private NIOServerCnxn.Factory serverFactory;
+
+ private CountDownLatch startSignal;
@Override
protected void setUp() throws Exception {
@@ -78,12 +79,26 @@
LOG.info("FINISHED " + getName());
}
+ private static class CountdownWatcher implements Watcher {
+ volatile CountDownLatch clientConnected = new CountDownLatch(1);
+
+ public void process(WatcherEvent event) {
+ if (event.getState() == Event.KeeperStateSyncConnected) {
+ clientConnected.countDown();
+ }
+ }
+ }
+
private ZooKeeper createClient()
throws IOException, InterruptedException
{
- startSignal = new CountDownLatch(1);
- ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
- startSignal.await();
+ CountdownWatcher watcher = new CountdownWatcher();
+ ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, watcher);
+ if(!watcher.clientConnected.await(CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS))
+ {
+ fail("Unable to connect to server");
+ }
return zk;
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java?rev=689668&r1=689667&r2=689668&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SyncCallTest.java Wed Aug 27 16:36:40 2008
@@ -25,20 +25,17 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.AsyncCallback.VoidCallback;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.proto.WatcherEvent;
import org.junit.Test;
public class SyncCallTest extends ClientBase
- implements Watcher, ChildrenCallback, StringCallback, VoidCallback
+ implements ChildrenCallback, StringCallback, VoidCallback
{
- private CountDownLatch clientConnected;
private CountDownLatch opsCount;
List<Integer> results = new LinkedList<Integer>();
@@ -59,7 +56,7 @@
for(int i = 0; i < 100; i++)
zk.delete("/test" + i, 0, this, results);
for(int i = 0; i < 100; i++)
- zk.getChildren("/", this, this, results);
+ zk.getChildren("/", new NullWatcher(), this, results);
LOG.info("Submitted all operations:" + (new Date()).toString());
@@ -74,22 +71,6 @@
System.out.println(e.toString());
}
}
-
- private ZooKeeper createClient() throws IOException,InterruptedException{
- clientConnected=new CountDownLatch(1);
- ZooKeeper zk = new ZooKeeper(hostPort, 30000, this);
- if(!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){
- fail("Unable to connect to server");
- }
- return zk;
- }
-
- public void process(WatcherEvent event) {
- //LOG.info("Process: " + event.getType() + " " + event.getPath());
- if (event.getState() == Event.KeeperStateSyncConnected) {
- clientConnected.countDown();
- }
- }
@SuppressWarnings("unchecked")
public void processResult(int rc, String path, Object ctx,