You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ph...@apache.org on 2010/08/18 08:24:09 UTC
svn commit: r986575 [3/4] - in /hadoop/zookeeper/trunk: ./
src/docs/src/documentation/content/xdocs/ src/java/libtest/
src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/auth/ src/ja...
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Wed Aug 18 06:24:08 2010
@@ -18,8 +18,10 @@
package org.apache.zookeeper.server;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.InputStream;
import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -31,18 +33,27 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.Environment;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.proto.AuthPacket;
+import org.apache.zookeeper.proto.ConnectRequest;
+import org.apache.zookeeper.proto.ConnectResponse;
+import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
import org.apache.zookeeper.server.SessionTracker.Session;
import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.auth.AuthenticationProvider;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
/**
@@ -62,16 +73,7 @@ public class ZooKeeperServer implements
protected ZooKeeperServerBean jmxServerBean;
protected DataTreeBean jmxDataTreeBean;
- /**
- * Create an instance of ZooKeeper server
- */
- static public interface Factory {
- public ZooKeeperServer createServer() throws IOException;
-
- public NIOServerCnxn.Factory createConnectionFactory()
- throws IOException;
- }
-
+
/**
* The server delegates loading of the tree to an instance of the interface
*/
@@ -112,7 +114,7 @@ public class ZooKeeperServer implements
final HashMap<String, ChangeRecord> outstandingChangesForPath =
new HashMap<String, ChangeRecord>();
- private NIOServerCnxn.Factory serverCnxnFactory;
+ private ServerCnxnFactory serverCnxnFactory;
private final ServerStats serverStats;
@@ -182,7 +184,7 @@ public class ZooKeeperServer implements
pwriter.print("tickTime=");
pwriter.println(getTickTime());
pwriter.print("maxClientCnxns=");
- pwriter.println(serverCnxnFactory.getMaxClientCnxns());
+ pwriter.println(serverCnxnFactory.getMaxClientCnxnsPerHost());
pwriter.print("minSessionTimeout=");
pwriter.println(getMinSessionTimeout());
pwriter.print("maxSessionTimeout=");
@@ -240,7 +242,7 @@ public class ZooKeeperServer implements
setZxid(zkDb.loadDataBase());
// Clean up dead sessions
LinkedList<Long> deadSessions = new LinkedList<Long>();
- for (long session : zkDb.getSessions()) {
+ for (Long session : zkDb.getSessions()) {
sessionsWithTimeouts = zkDb.getSessionWithTimeOuts();
if (sessionsWithTimeouts.get(session) == null) {
deadSessions.add(session);
@@ -400,6 +402,8 @@ public class ZooKeeperServer implements
}
public void shutdown() {
+ LOG.info("shutting down");
+
// new RuntimeException("Calling shutdown").printStackTrace();
this.running = false;
// Since sessionTracker and syncThreads poll we just have to
@@ -497,8 +501,7 @@ public class ZooKeeperServer implements
&& Arrays.equals(passwd, generatePasswd(sessionId));
}
- long createSession(ServerCnxn cnxn, byte passwd[], int timeout)
- throws InterruptedException {
+ long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
long sessionId = sessionTracker.createSession(timeout);
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
@@ -520,25 +523,71 @@ public class ZooKeeperServer implements
}
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
- int sessionTimeout) throws IOException, InterruptedException {
+ int sessionTimeout) throws IOException {
boolean rc = sessionTracker.touchSession(sessionId, sessionTimeout);
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,ZooTrace.SESSION_TRACE_MASK,
"Session 0x" + Long.toHexString(sessionId) +
" is valid: " + rc);
}
- cnxn.finishSessionInit(rc);
+ finishSessionInit(cnxn, rc);
}
public void reopenSession(ServerCnxn cnxn, long sessionId, byte[] passwd,
- int sessionTimeout) throws IOException, InterruptedException {
+ int sessionTimeout) throws IOException {
if (!checkPasswd(sessionId, passwd)) {
- cnxn.finishSessionInit(false);
+ finishSessionInit(cnxn, false);
} else {
revalidateSession(cnxn, sessionId, sessionTimeout);
}
}
+ public void finishSessionInit(ServerCnxn cnxn, boolean valid) {
+ // register with JMX
+ try {
+ if (valid) {
+ serverCnxnFactory.registerConnection(cnxn);
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to register with JMX", e);
+ }
+
+ try {
+ ConnectResponse rsp = new ConnectResponse(0, valid ? cnxn.getSessionTimeout()
+ : 0, valid ? cnxn.getSessionId() : 0, // send 0 if session is no
+ // longer valid
+ valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
+ bos.writeInt(-1, "len");
+ rsp.serialize(bos, "connect");
+ baos.close();
+ ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+ bb.putInt(bb.remaining() - 4).rewind();
+ cnxn.sendBuffer(bb);
+
+ if (!valid) {
+ LOG.info("Invalid session 0x"
+ + Long.toHexString(cnxn.getSessionId())
+ + " for client "
+ + cnxn.getRemoteSocketAddress()
+ + ", probably expired");
+ cnxn.sendBuffer(ServerCnxnFactory.closeConn);
+ } else {
+ LOG.info("Established session 0x"
+ + Long.toHexString(cnxn.getSessionId())
+ + " with negotiated timeout " + cnxn.getSessionTimeout()
+ + " for client "
+ + cnxn.getRemoteSocketAddress());
+ }
+
+ cnxn.enableRecv();
+ } catch (Exception e) {
+ LOG.warn("Exception while establishing session, closing", e);
+ cnxn.close();
+ }
+ }
+
public void closeSession(ServerCnxn cnxn, RequestHeader requestHeader) {
closeSession(cnxn.getSessionId());
}
@@ -620,11 +669,11 @@ public class ZooKeeperServer implements
return limit;
}
- public void setServerCnxnFactory(NIOServerCnxn.Factory factory) {
+ public void setServerCnxnFactory(ServerCnxnFactory factory) {
serverCnxnFactory = factory;
}
- public NIOServerCnxn.Factory getServerCnxnFactory() {
+ public ServerCnxnFactory getServerCnxnFactory() {
return serverCnxnFactory;
}
@@ -684,7 +733,7 @@ public class ZooKeeperServer implements
}
public int getClientPort() {
- return serverCnxnFactory != null ? serverCnxnFactory.ss.socket().getLocalPort() : -1;
+ return serverCnxnFactory != null ? serverCnxnFactory.getLocalPort() : -1;
}
public void setTxnLogFactory(FileTxnSnapLog txnLog) {
@@ -703,4 +752,115 @@ public class ZooKeeperServer implements
zkDb.dumpEphemerals(pwriter);
}
+ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
+ BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
+ ConnectRequest connReq = new ConnectRequest();
+ connReq.deserialize(bia, "connect");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Session establishment request from client "
+ + cnxn.getRemoteSocketAddress()
+ + " client's lastZxid is 0x"
+ + Long.toHexString(connReq.getLastZxidSeen()));
+ }
+ if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
+ String msg = "Refusing session request for client "
+ + cnxn.getRemoteSocketAddress()
+ + " as it has seen zxid 0x"
+ + Long.toHexString(connReq.getLastZxidSeen())
+ + " our last zxid is 0x"
+ + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
+ + " client must try another server";
+
+ LOG.info(msg);
+ throw new CloseRequestException(msg);
+ }
+ int sessionTimeout = connReq.getTimeOut();
+ byte passwd[] = connReq.getPasswd();
+ int minSessionTimeout = getMinSessionTimeout();
+ if (sessionTimeout < minSessionTimeout) {
+ sessionTimeout = minSessionTimeout;
+ }
+ int maxSessionTimeout = getMaxSessionTimeout();
+ if (sessionTimeout > maxSessionTimeout) {
+ sessionTimeout = maxSessionTimeout;
+ }
+ cnxn.setSessionTimeout(sessionTimeout);
+ // We don't want to receive any packets until we are sure that the
+ // session is setup
+ cnxn.disableRecv();
+ long sessionId = connReq.getSessionId();
+ if (sessionId != 0) {
+ long clientSessionId = connReq.getSessionId();
+ LOG.info("Client attempting to renew session 0x"
+ + Long.toHexString(clientSessionId)
+ + " at " + cnxn.getRemoteSocketAddress());
+ serverCnxnFactory.closeSession(sessionId);
+ cnxn.setSessionId(sessionId);
+ reopenSession(cnxn, sessionId, passwd, sessionTimeout);
+ } else {
+ LOG.info("Client attempting to establish new session at "
+ + cnxn.getRemoteSocketAddress());
+ createSession(cnxn, passwd, sessionTimeout);
+ }
+ }
+
+ public boolean shouldThrottle(long outStandingCount) {
+ if (getGlobalOutstandingLimit() < getInProcess()) {
+ return outStandingCount > 0;
+ }
+ return false;
+ }
+
+ public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
+ // We have the request, now process and setup for next
+ InputStream bais = new ByteBufferInputStream(incomingBuffer);
+ BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
+ RequestHeader h = new RequestHeader();
+ h.deserialize(bia, "header");
+ // Through the magic of byte buffers, txn will not be
+ // pointing
+ // to the start of the txn
+ incomingBuffer = incomingBuffer.slice();
+ if (h.getType() == OpCode.auth) {
+ LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
+ AuthPacket authPacket = new AuthPacket();
+ ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
+ String scheme = authPacket.getScheme();
+ AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
+ if (ap == null
+ || (ap.handleAuthentication(cnxn, authPacket.getAuth())
+ != KeeperException.Code.OK)) {
+ if (ap == null) {
+ LOG.warn("No authentication provider for scheme: "
+ + scheme + " has "
+ + ProviderRegistry.listProviders());
+ } else {
+ LOG.warn("Authentication failed for scheme: " + scheme);
+ }
+ // send a response...
+ ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
+ KeeperException.Code.AUTHFAILED.intValue());
+ cnxn.sendResponse(rh, null, null);
+ // ... and close connection
+ cnxn.sendBuffer(ServerCnxnFactory.closeConn);
+ cnxn.disableRecv();
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Authentication succeeded for scheme: "
+ + scheme);
+ }
+ LOG.info("auth success " + cnxn.getRemoteSocketAddress());
+ ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
+ KeeperException.Code.OK.intValue());
+ cnxn.sendResponse(rh, null, null);
+ }
+ return;
+ } else {
+ Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
+ h.getType(), incomingBuffer, cnxn.getAuthInfo());
+ si.setOwner(ServerCnxn.me);
+ submitRequest(si);
+ }
+ cnxn.incrOutstandingRequests(h);
+ }
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerBean.java Wed Aug 18 06:24:08 2010
@@ -90,16 +90,16 @@ public class ZooKeeperServerBean impleme
}
public int getMaxClientCnxnsPerHost() {
- NIOServerCnxn.Factory fac = zks.getServerCnxnFactory();
+ ServerCnxnFactory fac = zks.getServerCnxnFactory();
if (fac == null) {
return -1;
}
- return fac.getMaxClientCnxns();
+ return fac.getMaxClientCnxnsPerHost();
}
public void setMaxClientCnxnsPerHost(int max) {
// if fac is null the exception will be propagated to the client
- zks.getServerCnxnFactory().maxClientCnxns = max;
+ zks.getServerCnxnFactory().setMaxClientCnxnsPerHost(max);
}
public int getMinSessionTimeout() {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServerMain.java Wed Aug 18 06:24:08 2010
@@ -38,7 +38,7 @@ public class ZooKeeperServerMain {
private static final String USAGE =
"Usage: ZooKeeperServerMain configfile | port datadir [ticktime] [maxcnxns]";
- private NIOServerCnxn.Factory cnxnFactory;
+ private ServerCnxnFactory cnxnFactory;
/*
* Start up the ZooKeeper server.
@@ -105,7 +105,8 @@ public class ZooKeeperServerMain {
zkServer.setTickTime(config.tickTime);
zkServer.setMinSessionTimeout(config.minSessionTimeout);
zkServer.setMaxSessionTimeout(config.maxSessionTimeout);
- cnxnFactory = new NIOServerCnxn.Factory(config.getClientPortAddress(),
+ cnxnFactory = ServerCnxnFactory.createFactory();
+ cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
cnxnFactory.startup(zkServer);
cnxnFactory.join();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java Wed Aug 18 06:24:08 2010
@@ -102,9 +102,9 @@ public class DigestAuthenticationProvide
try {
String digest = generateDigest(id);
if (digest.equals(superDigest)) {
- cnxn.getAuthInfo().add(new Id("super", ""));
+ cnxn.addAuthInfo(new Id("super", ""));
}
- cnxn.getAuthInfo().add(new Id(getScheme(), digest));
+ cnxn.addAuthInfo(new Id(getScheme(), digest));
return KeeperException.Code.OK;
} catch (NoSuchAlgorithmException e) {
LOG.error("Missing algorithm",e);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java Wed Aug 18 06:24:08 2010
@@ -18,9 +18,9 @@
package org.apache.zookeeper.server.auth;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.KeeperException;
public class IPAuthenticationProvider implements AuthenticationProvider {
@@ -31,8 +31,8 @@ public class IPAuthenticationProvider im
public KeeperException.Code
handleAuthentication(ServerCnxn cnxn, byte[] authData)
{
- String id = cnxn.getRemoteAddress().getAddress().getHostAddress();
- cnxn.getAuthInfo().add(new Id(getScheme(), id));
+ String id = cnxn.getRemoteSocketAddress().getAddress().getHostAddress();
+ cnxn.addAuthInfo(new Id(getScheme(), id));
return KeeperException.Code.OK;
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Wed Aug 18 06:24:08 2010
@@ -176,6 +176,7 @@ public class CommitProcessor extends Thr
}
public void shutdown() {
+ LOG.info("Shutting down");
synchronized (this) {
finished = true;
queuedRequests.clear();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Wed Aug 18 06:24:08 2010
@@ -383,14 +383,14 @@ public class FastLeaderElection implemen
this.ws = new WorkerSender(manager);
Thread t = new Thread(this.ws,
- "WorkerSender Thread");
+ "WorkerSender(" + Thread.currentThread().getName() + ")");
t.setDaemon(true);
t.start();
this.wr = new WorkerReceiver(manager);
t = new Thread(this.wr,
- "WorkerReceiver Thread");
+ "WorkerReceiver(" + Thread.currentThread().getName() + ")");
t.setDaemon(true);
t.start();
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Wed Aug 18 06:24:08 2010
@@ -59,8 +59,8 @@ public class Follower extends Learner{
*/
void followLeader() throws InterruptedException {
fzk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
- try {
- InetSocketAddress addr = findLeader();
+ try {
+ InetSocketAddress addr = findLeader();
try {
connectToLeader(addr);
long newLeaderZxid = registerWithLeader(Leader.FOLLOWERINFO);
@@ -76,8 +76,8 @@ public class Follower extends Learner{
QuorumPacket qp = new QuorumPacket();
while (self.isRunning()) {
readPacket(qp);
- processPacket(qp);
- }
+ processPacket(qp);
+ }
} catch (IOException e) {
LOG.warn("Exception when following the leader", e);
try {
@@ -137,7 +137,6 @@ public class Follower extends Learner{
}
}
-
/**
* The zxid of the last operation seen
* @return zxid
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -100,6 +100,7 @@ public class FollowerRequestProcessor ex
}
public void shutdown() {
+ LOG.info("Shutting down");
finished = true;
queuedRequests.clear();
queuedRequests.add(Request.requestOfDeath);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Wed Aug 18 06:24:08 2010
@@ -80,7 +80,7 @@ public class FollowerZooKeeperServer ext
new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
-
+
LinkedBlockingQueue<Request> pendingTxns = new LinkedBlockingQueue<Request>();
public void logRequest(TxnHeader hdr, Record txn) {
@@ -135,6 +135,7 @@ public class FollowerZooKeeperServer ext
@Override
public void shutdown() {
+ LOG.info("Shutting down");
try {
super.shutdown();
} catch (Exception e) {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Wed Aug 18 06:24:08 2010
@@ -383,6 +383,8 @@ public class Leader {
* Close down all the LearnerHandlers
*/
void shutdown(String reason) {
+ LOG.info("Shutting down");
+
if (isShutdown) {
return;
}
@@ -402,7 +404,7 @@ public class Leader {
LOG.warn("Ignoring unexpected exception during close",e);
}
// clear all the connections
- self.cnxnFactory.clear();
+ self.cnxnFactory.closeAll();
// shutdown the previous zk
if (zk != null) {
zk.shutdown();
@@ -551,6 +553,7 @@ public class Leader {
* @see org.apache.zookeeper.server.RequestProcessor#shutdown()
*/
public void shutdown() {
+ LOG.info("Shutting down");
next.shutdown();
}
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Wed Aug 18 06:24:08 2010
@@ -163,7 +163,7 @@ public class LeaderZooKeeperServer exten
@Override
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
- int sessionTimeout) throws IOException, InterruptedException {
+ int sessionTimeout) throws IOException {
super.revalidateSession(cnxn, sessionId, sessionTimeout);
try {
// setowner as the leader itself, unless updated
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Wed Aug 18 06:24:08 2010
@@ -88,10 +88,9 @@ public class Learner {
* the timeout for which the session is valid
* @return
* @throws IOException
- * @throws InterruptedException
*/
void validateSession(ServerCnxn cnxn, long clientId, int timeout)
- throws IOException, InterruptedException {
+ throws IOException {
LOG.info("Revalidating client: 0x" + Long.toHexString(clientId));
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(baos);
@@ -336,7 +335,7 @@ public class Learner {
+ Long.toHexString(sessionId)
+ " for validation");
} else {
- cnxn.finishSessionInit(valid);
+ zk.finishSessionInit(cnxn, valid);
}
}
if (LOG.isTraceEnabled()) {
@@ -369,7 +368,7 @@ public class Learner {
// set the zookeeper server to null
self.cnxnFactory.setZooKeeperServer(null);
// clear all the connections
- self.cnxnFactory.clear();
+ self.cnxnFactory.closeAll();
// shutdown previous zookeeper
if (zk != null) {
zk.shutdown();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Wed Aug 18 06:24:08 2010
@@ -77,7 +77,7 @@ public abstract class LearnerZooKeeperSe
@Override
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
- int sessionTimeout) throws IOException, InterruptedException {
+ int sessionTimeout) throws IOException {
getLearner().validateSession(cnxn, sessionId, sessionTimeout);
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalPeerBean.java Wed Aug 18 06:24:08 2010
@@ -18,8 +18,6 @@
package org.apache.zookeeper.server.quorum;
-import org.apache.zookeeper.server.NIOServerCnxn;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
/**
* Implementation of the local peer MBean interface.
@@ -44,11 +42,7 @@ public class LocalPeerBean extends Serve
}
public int getMaxClientCnxnsPerHost() {
- NIOServerCnxn.Factory fac = peer.getCnxnFactory();
- if (fac == null) {
- return -1;
- }
- return fac.getMaxClientCnxns();
+ return peer.getMaxClientCnxnsPerHost();
}
public int getMinSessionTimeout() {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -114,6 +114,7 @@ public class ObserverRequestProcessor ex
* Shutdown the processor.
*/
public void shutdown() {
+ LOG.info("Shutting down");
finished = true;
queuedRequests.clear();
queuedRequests.add(Request.requestOfDeath);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java Wed Aug 18 06:24:08 2010
@@ -18,15 +18,20 @@
package org.apache.zookeeper.server.quorum;
+import org.apache.log4j.Logger;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
/**
* This RequestProcessor simply forwards requests to an AckRequestProcessor and
* SyncRequestProcessor.
*/
public class ProposalRequestProcessor implements RequestProcessor {
+ private static final Logger LOG =
+ Logger.getLogger(ProposalRequestProcessor.class);
+
LeaderZooKeeperServer zks;
RequestProcessor nextProcessor;
@@ -75,6 +80,7 @@ public class ProposalRequestProcessor im
}
public void shutdown() {
+ LOG.info("Shutting down");
nextProcessor.shutdown();
syncProcessor.shutdown();
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Wed Aug 18 06:24:08 2010
@@ -433,9 +433,11 @@ public class QuorumCnxManager {
try {
ss = ServerSocketChannel.open();
int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
- LOG.info("My election bind port: " + port);
- ss.socket().setReuseAddress(true);
- ss.socket().bind(new InetSocketAddress(port));
+ ss.socket().setReuseAddress(true);
+ InetSocketAddress addr = new InetSocketAddress(port);
+ LOG.info("My election bind port: " + addr.toString());
+ setName(addr.toString());
+ ss.socket().bind(addr);
while (!shutdown) {
SocketChannel client = ss.accept();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Aug 18 06:24:08 2010
@@ -25,6 +25,7 @@ import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -33,7 +34,7 @@ import java.util.Map;
import org.apache.log4j.Logger;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -73,7 +74,7 @@ public class QuorumPeer extends Thread i
QuorumBean jmxQuorumBean;
LocalPeerBean jmxLocalPeerBean;
LeaderElectionBean jmxLeaderElectionBean;
-
+
/* ZKDatabase is a top level member of quorumpeer
* which will be used in all the zookeeperservers
* instantiated later. Also, it is created once on
@@ -81,14 +82,6 @@ public class QuorumPeer extends Thread i
* message from the leader
*/
private ZKDatabase zkDb;
-
- /**
- * Create an instance of a quorum peer
- */
- public interface Factory{
- public QuorumPeer create(NIOServerCnxn.Factory cnxnFactory) throws IOException;
- public NIOServerCnxn.Factory createConnectionFactory() throws IOException;
- }
public static class QuorumServer {
public QuorumServer(long id, InetSocketAddress addr,
@@ -343,7 +336,7 @@ public class QuorumPeer extends Thread i
Election electionAlg;
- NIOServerCnxn.Factory cnxnFactory;
+ ServerCnxnFactory cnxnFactory;
private FileTxnSnapLog logFactory = null;
private final QuorumStats quorumStats;
@@ -361,7 +354,7 @@ public class QuorumPeer extends Thread i
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
File dataLogDir, int electionType,
long myid, int tickTime, int initLimit, int syncLimit,
- NIOServerCnxn.Factory cnxnFactory) throws IOException {
+ ServerCnxnFactory cnxnFactory) throws IOException {
this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
initLimit, syncLimit, cnxnFactory,
new QuorumMaj(countParticipants(quorumPeers)));
@@ -370,7 +363,7 @@ public class QuorumPeer extends Thread i
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
File dataLogDir, int electionType,
long myid, int tickTime, int initLimit, int syncLimit,
- NIOServerCnxn.Factory cnxnFactory,
+ ServerCnxnFactory cnxnFactory,
QuorumVerifier quorumConfig) throws IOException {
this();
this.cnxnFactory = cnxnFactory;
@@ -459,8 +452,7 @@ public class QuorumPeer extends Thread i
{
this(quorumPeers, snapDir, logDir, electionAlg,
myid,tickTime, initLimit,syncLimit,
- new NIOServerCnxn.Factory(
- new InetSocketAddress(clientPort)),
+ ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
new QuorumMaj(countParticipants(quorumPeers)));
}
@@ -476,8 +468,8 @@ public class QuorumPeer extends Thread i
{
this(quorumPeers, snapDir, logDir, electionAlg,
myid,tickTime, initLimit,syncLimit,
- new NIOServerCnxn.Factory(new InetSocketAddress(clientPort)),
- quorumConfig);
+ ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
+ quorumConfig);
}
/**
@@ -759,7 +751,9 @@ public class QuorumPeer extends Thread i
synchronized (this) {
if (leader != null) {
synchronized (leader.learners) {
- for (LearnerHandler fh : leader.learners) {
+ for (LearnerHandler fh :
+ (Collection<LearnerHandler>)leader.learners)
+ {
if (fh.getSocket() == null)
continue;
String s = fh.getSocket().getRemoteSocketAddress().toString();
@@ -819,6 +813,15 @@ public class QuorumPeer extends Thread i
this.tickTime = tickTime;
}
+ /** Maximum number of connections allowed from particular host (ip) */
+ public int getMaxClientCnxnsPerHost() {
+ ServerCnxnFactory fac = getCnxnFactory();
+ if (fac == null) {
+ return -1;
+ }
+ return fac.getMaxClientCnxnsPerHost();
+ }
+
/** minimum session timeout in milliseconds */
public int getMinSessionTimeout() {
return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
@@ -912,11 +915,11 @@ public class QuorumPeer extends Thread i
this.electionType = electionType;
}
- public NIOServerCnxn.Factory getCnxnFactory() {
+ public ServerCnxnFactory getCnxnFactory() {
return cnxnFactory;
}
- public void setCnxnFactory(NIOServerCnxn.Factory cnxnFactory) {
+ public void setCnxnFactory(ServerCnxnFactory cnxnFactory) {
this.cnxnFactory = cnxnFactory;
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Wed Aug 18 06:24:08 2010
@@ -24,7 +24,7 @@ import javax.management.JMException;
import org.apache.log4j.Logger;
import org.apache.zookeeper.jmx.ManagedUtil;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -118,9 +118,9 @@ public class QuorumPeerMain {
LOG.info("Starting quorum peer");
try {
- NIOServerCnxn.Factory cnxnFactory =
- new NIOServerCnxn.Factory(config.getClientPortAddress(),
- config.getMaxClientCnxns());
+ ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
+ cnxnFactory.configure(config.getClientPortAddress(),
+ config.getMaxClientCnxns());
quorumPeer = new QuorumPeer();
quorumPeer.setClientPortAddress(config.getClientPortAddress());
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/CRCTest.java Wed Aug 18 06:24:08 2010
@@ -26,7 +26,6 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -116,8 +115,7 @@ public class CRCTest extends ZKTestCase
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
SyncRequestProcessor.setSnapCount(150);
final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
- NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
- new InetSocketAddress(PORT));
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
f.startup(zks);
LOG.info("starting up the zookeeper server .. waiting");
Assert.assertTrue("waiting for server being up",
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java Wed Aug 18 06:24:08 2010
@@ -20,7 +20,6 @@ package org.apache.zookeeper.server;
import java.io.File;
import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
@@ -60,8 +59,7 @@ public class InvalidSnapshotTest extends
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
SyncRequestProcessor.setSnapCount(100);
final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
- NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
- new InetSocketAddress(PORT));
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
f.startup(zks);
Assert.assertTrue("waiting for server being up ",
ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
@@ -85,7 +83,7 @@ public class InvalidSnapshotTest extends
// now restart the server and see if it starts
zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
SyncRequestProcessor.setSnapCount(100);
- f = new NIOServerCnxn.Factory(new InetSocketAddress(PORT));
+ f = ServerCnxnFactory.createFactory(PORT, -1);
f.startup(zks);
Assert.assertTrue("waiting for server being up ",
ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ACLTest.java Wed Aug 18 06:24:08 2010
@@ -21,7 +21,6 @@ package org.apache.zookeeper.test;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import java.io.File;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -37,7 +36,7 @@ import org.apache.zookeeper.Watcher.Even
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.Assert;
@@ -56,25 +55,26 @@ public class ACLTest extends ZKTestCase
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
SyncRequestProcessor.setSnapCount(1000);
final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
- NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
- new InetSocketAddress(PORT));
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
f.startup(zks);
- LOG.info("starting up the zookeeper server .. waiting");
- Assert.assertTrue("waiting for server being up",
- ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
- ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
try {
- zk.addAuthInfo("digest", "pat:test".getBytes());
- zk.setACL("/", Ids.CREATOR_ALL_ACL, -1);
+ LOG.info("starting up the zookeeper server .. waiting");
+ Assert.assertTrue("waiting for server being up",
+ ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+ ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
+ try {
+ zk.addAuthInfo("digest", "pat:test".getBytes());
+ zk.setACL("/", Ids.CREATOR_ALL_ACL, -1);
+ } finally {
+ zk.close();
+ }
} finally {
- zk.close();
- }
+ f.shutdown();
- f.shutdown();
-
- Assert.assertTrue("waiting for server down",
- ClientBase.waitForServerDown(HOSTPORT,
- ClientBase.CONNECTION_TIMEOUT));
+ Assert.assertTrue("waiting for server down",
+ ClientBase.waitForServerDown(HOSTPORT,
+ ClientBase.CONNECTION_TIMEOUT));
+ }
}
/**
@@ -89,74 +89,78 @@ public class ACLTest extends ZKTestCase
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
SyncRequestProcessor.setSnapCount(1000);
final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
- NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
- new InetSocketAddress(PORT));
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
f.startup(zks);
- LOG.info("starting up the zookeeper server .. waiting");
- Assert.assertTrue("waiting for server being up",
- ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
- ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
+ ZooKeeper zk;
String path;
- LOG.info("starting creating acls");
- for (int i = 0; i < 100; i++) {
- path = "/" + i;
- zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
- CreateMode.PERSISTENT);
- }
- Assert.assertTrue("size of the acl map ", (1 == zks.getZKDatabase().getAclSize()));
- for (int j = 100; j < 200; j++) {
- path = "/" + j;
- ACL acl = new ACL();
- acl.setPerms(0);
- Id id = new Id();
- id.setId("1.1.1."+j);
- id.setScheme("ip");
- acl.setId(id);
- ArrayList<ACL> list = new ArrayList<ACL>();
- list.add(acl);
- zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
+ try {
+ LOG.info("starting up the zookeeper server .. waiting");
+ Assert.assertTrue("waiting for server being up",
+ ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+ zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
+ LOG.info("starting creating acls");
+ for (int i = 0; i < 100; i++) {
+ path = "/" + i;
+ zk.create(path, path.getBytes(), Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+ }
+ Assert.assertTrue("size of the acl map ", (1 == zks.getZKDatabase().getAclSize()));
+ for (int j = 100; j < 200; j++) {
+ path = "/" + j;
+ ACL acl = new ACL();
+ acl.setPerms(0);
+ Id id = new Id();
+ id.setId("1.1.1."+j);
+ id.setScheme("ip");
+ acl.setId(id);
+ ArrayList<ACL> list = new ArrayList<ACL>();
+ list.add(acl);
+ zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
+ }
+ Assert.assertTrue("size of the acl map ", (101 == zks.getZKDatabase().getAclSize()));
+ } finally {
+ // now shutdown the server and restart it
+ f.shutdown();
+ Assert.assertTrue("waiting for server down",
+ ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));
}
- Assert.assertTrue("size of the acl map ", (101 == zks.getZKDatabase().getAclSize()));
- // now shutdown the server and restart it
- f.shutdown();
- Assert.assertTrue("waiting for server down",
- ClientBase.waitForServerDown(HOSTPORT, CONNECTION_TIMEOUT));
startSignal = new CountDownLatch(1);
zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
- f = new NIOServerCnxn.Factory(new InetSocketAddress(PORT));
+ f = ServerCnxnFactory.createFactory(PORT, -1);
f.startup(zks);
-
- Assert.assertTrue("waiting for server up",
- ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
-
- startSignal.await(CONNECTION_TIMEOUT,
- TimeUnit.MILLISECONDS);
- Assert.assertTrue("count == 0", startSignal.getCount() == 0);
-
- Assert.assertTrue("acl map ", (101 == zks.getZKDatabase().getAclSize()));
- for (int j = 200; j < 205; j++) {
- path = "/" + j;
- ACL acl = new ACL();
- acl.setPerms(0);
- Id id = new Id();
- id.setId("1.1.1."+j);
- id.setScheme("ip");
- acl.setId(id);
- ArrayList<ACL> list = new ArrayList<ACL>();
- list.add(acl);
- zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
+ try {
+ Assert.assertTrue("waiting for server up",
+ ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+
+ startSignal.await(CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ Assert.assertTrue("count == 0", startSignal.getCount() == 0);
+
+ Assert.assertTrue("acl map ", (101 == zks.getZKDatabase().getAclSize()));
+ for (int j = 200; j < 205; j++) {
+ path = "/" + j;
+ ACL acl = new ACL();
+ acl.setPerms(0);
+ Id id = new Id();
+ id.setId("1.1.1."+j);
+ id.setScheme("ip");
+ acl.setId(id);
+ ArrayList<ACL> list = new ArrayList<ACL>();
+ list.add(acl);
+ zk.create(path, path.getBytes(), list, CreateMode.PERSISTENT);
+ }
+ Assert.assertTrue("acl map ", (106 == zks.getZKDatabase().getAclSize()));
+
+ zk.close();
+ } finally {
+ f.shutdown();
+
+ Assert.assertTrue("waiting for server down",
+ ClientBase.waitForServerDown(HOSTPORT,
+ ClientBase.CONNECTION_TIMEOUT));
}
- Assert.assertTrue("acl map ", (106 == zks.getZKDatabase().getAclSize()));
-
- zk.close();
-
- f.shutdown();
-
- Assert.assertTrue("waiting for server down",
- ClientBase.waitForServerDown(HOSTPORT,
- ClientBase.CONNECTION_TIMEOUT));
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncHammerTest.java Wed Aug 18 06:24:08 2010
@@ -35,9 +35,7 @@ import org.apache.zookeeper.AsyncCallbac
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
-import org.junit.After;
import org.junit.Assert;
-import org.junit.Before;
import org.junit.Test;
public class AsyncHammerTest extends ZKTestCase
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Wed Aug 18 06:24:08 2010
@@ -25,7 +25,6 @@ import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
-import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,10 +44,10 @@ import org.apache.zookeeper.PortAssignme
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnLog;
@@ -56,6 +55,7 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import com.j2speed.accessor.FieldAccessor;
import com.sun.management.UnixOperatingSystemMXBean;
public abstract class ClientBase extends ZKTestCase {
@@ -67,8 +67,11 @@ public abstract class ClientBase extends
protected String hostPort = "127.0.0.1:" + PortAssignment.unique();
protected int maxCnxns = 0;
- protected NIOServerCnxn.Factory serverFactory = null;
+ protected ServerCnxnFactory serverFactory = null;
protected File tmpDir = null;
+
+ long initialFdCount;
+
public ClientBase() {
super();
}
@@ -330,14 +333,14 @@ public abstract class ClientBase extends
return Integer.parseInt(portstr);
}
- static NIOServerCnxn.Factory createNewServerInstance(File dataDir,
- NIOServerCnxn.Factory factory, String hostPort, int maxCnxns)
+ static ServerCnxnFactory createNewServerInstance(File dataDir,
+ ServerCnxnFactory factory, String hostPort, int maxCnxns)
throws IOException, InterruptedException
{
ZooKeeperServer zks = new ZooKeeperServer(dataDir, dataDir, 3000);
final int PORT = getPort(hostPort);
if (factory == null) {
- factory = new NIOServerCnxn.Factory(new InetSocketAddress(PORT),maxCnxns);
+ factory = ServerCnxnFactory.createFactory(PORT, maxCnxns);
}
factory.startup(zks);
Assert.assertTrue("waiting for server up",
@@ -347,11 +350,16 @@ public abstract class ClientBase extends
return factory;
}
- static void shutdownServerInstance(NIOServerCnxn.Factory factory,
+ static void shutdownServerInstance(ServerCnxnFactory factory,
String hostPort)
{
if (factory != null) {
- ZKDatabase zkDb = factory.getZooKeeperServer().getZKDatabase();
+ ZKDatabase zkDb;
+ {
+ ZooKeeperServer zs = getServer(factory);
+
+ zkDb = zs.getZKDatabase();
+ }
factory.shutdown();
try {
zkDb.close();
@@ -385,16 +393,6 @@ public abstract class ClientBase extends
@Before
public void setUp() throws Exception {
- setupTestEnv();
-
- JMXEnv.setUp();
-
- setUpAll();
-
- tmpDir = createTmpDir(BASETEST);
-
- startServer();
-
/* some useful information - log the number of fds used before
* and after a test is run. Helps to verify we are freeing resources
* correctly. Unfortunately this only works on unix systems (the
@@ -405,10 +403,21 @@ public abstract class ClientBase extends
if (osMbean != null && osMbean instanceof UnixOperatingSystemMXBean) {
UnixOperatingSystemMXBean unixos =
(UnixOperatingSystemMXBean)osMbean;
+ initialFdCount = unixos.getOpenFileDescriptorCount();
LOG.info("Initial fdcount is: "
- + unixos.getOpenFileDescriptorCount());
+ + initialFdCount);
}
+ setupTestEnv();
+
+ JMXEnv.setUp();
+
+ setUpAll();
+
+ tmpDir = createTmpDir(BASETEST);
+
+ startServer();
+
LOG.info("Client test setup finished");
}
@@ -427,8 +436,14 @@ public abstract class ClientBase extends
JMXEnv.ensureOnly();
}
- protected ZooKeeperServer getServer() {
- return serverFactory.getZooKeeperServer();
+ protected static ZooKeeperServer getServer(ServerCnxnFactory fac) {
+ // access the private field - test only
+ FieldAccessor<ServerCnxnFactory,ZooKeeperServer> zkServerAcc =
+ new FieldAccessor<ServerCnxnFactory,ZooKeeperServer>
+ ("zkServer", ServerCnxnFactory.class);
+ ZooKeeperServer zs = zkServerAcc.get(fac);
+
+ return zs;
}
protected void tearDownAll() throws Exception {
@@ -449,20 +464,6 @@ public abstract class ClientBase extends
public void tearDown() throws Exception {
LOG.info("tearDown starting");
- /* some useful information - log the number of fds used before
- * and after a test is run. Helps to verify we are freeing resources
- * correctly. Unfortunately this only works on unix systems (the
- * only place sun has implemented as part of the mgmt bean api.
- */
- OperatingSystemMXBean osMbean =
- ManagementFactory.getOperatingSystemMXBean();
- if (osMbean != null && osMbean instanceof UnixOperatingSystemMXBean) {
- UnixOperatingSystemMXBean unixos =
- (UnixOperatingSystemMXBean)osMbean;
- LOG.info("fdcount after test is: "
- + unixos.getOpenFileDescriptorCount());
- }
-
tearDownAll();
stopServer();
@@ -475,6 +476,27 @@ public abstract class ClientBase extends
serverFactory = null;
JMXEnv.tearDown();
+
+ /* some useful information - log the number of fds used before
+ * and after a test is run. Helps to verify we are freeing resources
+ * correctly. Unfortunately this only works on unix systems (the
+ * only place sun has implemented as part of the mgmt bean api.
+ */
+ OperatingSystemMXBean osMbean =
+ ManagementFactory.getOperatingSystemMXBean();
+ if (osMbean != null && osMbean instanceof UnixOperatingSystemMXBean) {
+ UnixOperatingSystemMXBean unixos =
+ (UnixOperatingSystemMXBean)osMbean;
+ long fdCount = unixos.getOpenFileDescriptorCount();
+ String message = "fdcount after test is: "
+ + fdCount + " at start it was " + initialFdCount;
+ LOG.info(message);
+ if (fdCount > initialFdCount) {
+ LOG.info("sleeping for 20 secs");
+ //Thread.sleep(60000);
+ //assertTrue(message, fdCount <= initialFdCount);
+ }
+ }
}
public static MBeanServerConnection jmxConn() throws IOException {
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientPortBindTest.java Wed Aug 18 06:24:08 2010
@@ -35,7 +35,7 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.Assert;
import org.junit.Test;
@@ -82,8 +82,8 @@ public class ClientPortBindTest extends
ClientBase.setupTestEnv();
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
- NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
- new InetSocketAddress(bindAddress, PORT));
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(
+ new InetSocketAddress(bindAddress, PORT), -1);
f.startup(zks);
LOG.info("starting up the the server, waiting");
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DisconnectableZooKeeper.java Wed Aug 18 06:24:08 2010
@@ -46,4 +46,13 @@ public class DisconnectableZooKeeper ext
cnxn.disconnect();
}
+ /**
+ * Prevent the client from automatically reconnecting if the connection to the
+ * server is lost
+ */
+ public void dontReconnect() throws Exception {
+ java.lang.reflect.Field f = cnxn.getClass().getDeclaredField("closing");
+ f.setAccessible(true);
+ f.setBoolean(cnxn, true);
+ }
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/InvalidSnapshotTest.java Wed Aug 18 06:24:08 2010
@@ -21,7 +21,6 @@ package org.apache.zookeeper.test;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import java.io.File;
-import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import org.apache.log4j.Logger;
@@ -32,7 +31,7 @@ import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.server.LogFormatter;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.Assert;
@@ -68,8 +67,7 @@ public class InvalidSnapshotTest extends
ZooKeeperServer zks = new ZooKeeperServer(snapDir, snapDir, 3000);
SyncRequestProcessor.setSnapCount(1000);
final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
- NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
- new InetSocketAddress(PORT));
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
f.startup(zks);
LOG.info("starting up the zookeeper server .. waiting");
Assert.assertTrue("waiting for server being up",
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java Wed Aug 18 06:24:08 2010
@@ -25,6 +25,7 @@ import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
+import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -34,7 +35,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.quorum.Election;
import org.apache.zookeeper.server.quorum.LeaderElection;
import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -93,7 +94,9 @@ public class LENonTerminateTest extends
requestBuffer.putInt(xid);
requestPacket.setLength(4);
HashSet<Long> heardFrom = new HashSet<Long>();
- for (QuorumServer server : self.getVotingView().values()) {
+ for (QuorumServer server :
+ (Collection<QuorumServer>)self.getVotingView().values())
+ {
LOG.info("Server address: " + server.addr);
try {
requestPacket.setSocketAddress(server.addr);
@@ -213,7 +216,7 @@ public class LENonTerminateTest extends
{
super(quorumPeers, snapDir, logDir, electionAlg,
myid,tickTime, initLimit,syncLimit,
- new NIOServerCnxn.Factory(new InetSocketAddress(clientPort)),
+ ServerCnxnFactory.createFactory(clientPort, -1),
new QuorumMaj(countParticipants(quorumPeers)));
}
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteBase.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Nio Client against Netty server
+ */
+@RunWith(Suite.class)
+public class NioNettySuiteBase {
+ @BeforeClass
+ public static void setUp() {
+ System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY,
+ NettyServerCnxnFactory.class.getName());
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+ }
+}
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteHammerTest.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Nio Client against Netty server
+ */
+@Suite.SuiteClasses({
+ AsyncHammerTest.class
+ })
+public class NioNettySuiteHammerTest extends NioNettySuiteBase {
+}
Added: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java?rev=986575&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java (added)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/NioNettySuiteTest.java Wed Aug 18 06:24:08 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.junit.runners.Suite;
+
+/**
+ * Run tests with: Nio Client against Netty server
+ */
+@Suite.SuiteClasses({
+ ACLTest.class,
+ AsyncOpsTest.class,
+ ChrootClientTest.class,
+ ClientTest.class,
+ FourLetterWordsTest.class,
+ NullDataTest.class,
+ SessionTest.class,
+ WatcherTest.class
+ })
+public class NioNettySuiteTest extends NioNettySuiteBase {
+}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/OOMTest.java Wed Aug 18 06:24:08 2010
@@ -22,7 +22,6 @@ import static org.apache.zookeeper.test.
import java.io.File;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.util.ArrayList;
import org.apache.zookeeper.CreateMode;
@@ -34,7 +33,7 @@ import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.junit.Assert;
import org.junit.Test;
@@ -42,7 +41,7 @@ import org.junit.Test;
public class OOMTest extends ZKTestCase implements Watcher {
@Test
public void testOOM() throws IOException, InterruptedException, KeeperException {
- // This test takes too long to run!
+ // This test takes too long tos run!
if (true)
return;
File tmpDir = ClientBase.createTmpDir();
@@ -61,8 +60,7 @@ public class OOMTest extends ZKTestCase
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
final int PORT = PortAssignment.unique();
- NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
- new InetSocketAddress(PORT));
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
f.startup(zks);
Assert.assertTrue("waiting for server up",
ClientBase.waitForServerUp("127.0.0.1:" + PORT,
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java Wed Aug 18 06:24:08 2010
@@ -111,18 +111,23 @@ public class ObserverTest extends Quorum
Assert.assertEquals(zk.getState(), States.CONNECTED);
+ LOG.info("Shutting down server 2");
// Now kill one of the other real servers
q2.shutdown();
Assert.assertTrue("Waiting for server 2 to shut down",
ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP2,
ClientBase.CONNECTION_TIMEOUT));
-
+
+ LOG.info("Server 2 down");
+
// Now the resulting ensemble shouldn't be quorate
latch.await();
Assert.assertNotSame("Client is still connected to non-quorate cluster",
KeeperState.SyncConnected,lastEvent.getState());
+ LOG.info("Latch returned");
+
try {
Assert.assertFalse("Shouldn't get a response when cluster not quorate!",
new String(zk.getData("/obstest", null, null)).equals("test"));
@@ -133,14 +138,19 @@ public class ObserverTest extends Quorum
latch = new CountDownLatch(1);
+ LOG.info("Restarting server 2");
+
// Bring it back
q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
q2.start();
+
LOG.info("Waiting for server 2 to come up");
Assert.assertTrue("waiting for server 2 being up",
ClientBase.waitForServerUp("127.0.0.1:" + CLIENT_PORT_QP2,
CONNECTION_TIMEOUT));
+ LOG.info("Server 2 started, waiting for latch");
+
latch.await();
// It's possible our session expired - but this is ok, shows we
// were able to talk to the ensemble
@@ -149,10 +159,14 @@ public class ObserverTest extends Quorum
(KeeperState.SyncConnected==lastEvent.getState() ||
KeeperState.Expired==lastEvent.getState()));
+ LOG.info("Shutting down all servers");
+
q1.shutdown();
q2.shutdown();
q3.shutdown();
+ LOG.info("Closing zk client");
+
zk.close();
Assert.assertTrue("Waiting for server 1 to shut down",
ClientBase.waitForServerDown("127.0.0.1:"+CLIENT_PORT_QP1,
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java Wed Aug 18 06:24:08 2010
@@ -19,7 +19,6 @@
package org.apache.zookeeper.test;
import java.io.File;
-import java.net.InetSocketAddress;
import java.util.List;
import org.apache.zookeeper.CreateMode;
@@ -29,8 +28,8 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.NIOServerCnxn;
import org.apache.zookeeper.server.PurgeTxnLog;
+import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.SyncRequestProcessor;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -52,8 +51,7 @@ public class PurgeTxnTest extends ZKTest
ZooKeeperServer zks = new ZooKeeperServer(tmpDir, tmpDir, 3000);
SyncRequestProcessor.setSnapCount(100);
final int PORT = Integer.parseInt(HOSTPORT.split(":")[1]);
- NIOServerCnxn.Factory f = new NIOServerCnxn.Factory(
- new InetSocketAddress(PORT));
+ ServerCnxnFactory f = ServerCnxnFactory.createFactory(PORT, -1);
f.startup(zks);
Assert.assertTrue("waiting for server being up ",
ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Wed Aug 18 06:24:08 2010
@@ -35,6 +35,7 @@ import org.apache.zookeeper.server.quoru
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.junit.Assert;
+import org.junit.Test;
import com.sun.management.UnixOperatingSystemMXBean;
@@ -55,6 +56,11 @@ public class QuorumBase extends ClientBa
private int portLE4;
private int portLE5;
+ @Test
+ // This just avoids complaints by junit
+ public void testNull() {
+ }
+
@Override
public void setUp() throws Exception {
setUp(false);
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=986575&r1=986574&r2=986575&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Wed Aug 18 06:24:08 2010
@@ -29,10 +29,10 @@ import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.ZooDefs;
-import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LearnerHandler;
@@ -40,6 +40,7 @@ import org.apache.zookeeper.server.quoru
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Ignore;
import org.junit.Test;
public class QuorumTest extends QuorumBase {
@@ -187,16 +188,21 @@ public class QuorumTest extends QuorumBa
* @throws KeeperException
*/
@Test
- public void testSessionMoved() throws IOException, InterruptedException, KeeperException {
+ @Ignore
+ public void testSessionMoved() throws Exception {
String hostPorts[] = qb.hostPort.split(",");
- DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0], ClientBase.CONNECTION_TIMEOUT, new Watcher() {
+ DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hostPorts[0],
+ ClientBase.CONNECTION_TIMEOUT, new Watcher() {
public void process(WatchedEvent event) {
}});
zk.create("/sessionMoveTest", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// we want to loop through the list twice
for(int i = 0; i < hostPorts.length*2; i++) {
+ zk.dontReconnect();
// This should stomp the zk handle
- DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length], ClientBase.CONNECTION_TIMEOUT,
+ DisconnectableZooKeeper zknew =
+ new DisconnectableZooKeeper(hostPorts[(i+1)%hostPorts.length],
+ ClientBase.CONNECTION_TIMEOUT,
new Watcher() {public void process(WatchedEvent event) {
}},
zk.getSessionId(),
@@ -207,7 +213,6 @@ public class QuorumTest extends QuorumBa
Assert.fail("Should have lost the connection");
} catch(KeeperException.ConnectionLossException e) {
}
- zk.disconnect(); // close w/o closing session
zk = zknew;
}
zk.close();
@@ -227,15 +232,17 @@ public class QuorumTest extends QuorumBa
* make sure we cannot do any changes.
*/
@Test
- public void testSessionMove() throws IOException, InterruptedException, KeeperException {
+ @Ignore
+ public void testSessionMove() throws Exception {
String hps[] = qb.hostPort.split(",");
DiscoWatcher oldWatcher = new DiscoWatcher();
- ZooKeeper zk = new DisconnectableZooKeeper(hps[0],
+ DisconnectableZooKeeper zk = new DisconnectableZooKeeper(hps[0],
ClientBase.CONNECTION_TIMEOUT, oldWatcher);
zk.create("/t1", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ zk.dontReconnect();
// This should stomp the zk handle
DiscoWatcher watcher = new DiscoWatcher();
- ZooKeeper zknew = new DisconnectableZooKeeper(hps[1],
+ DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(hps[1],
ClientBase.CONNECTION_TIMEOUT, watcher, zk.getSessionId(),
zk.getSessionPasswd());
zknew.create("/t2", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
@@ -257,6 +264,7 @@ public class QuorumTest extends QuorumBa
toClose.add(zknew);
// Let's just make sure it can still move
for(int i = 0; i < 10; i++) {
+ zknew.dontReconnect();
zknew = new DisconnectableZooKeeper(hps[1],
ClientBase.CONNECTION_TIMEOUT, new DiscoWatcher(),
zk.getSessionId(), zk.getSessionPasswd());