You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by fp...@apache.org on 2008/10/03 15:06:41 UTC
svn commit: r701369 - in /hadoop/zookeeper/trunk/src/java:
main/org/apache/jute/ main/org/apache/zookeeper/
main/org/apache/zookeeper/server/ main/org/apache/zookeeper/server/auth/
main/org/apache/zookeeper/server/quorum/ test/org/apache/zookeeper/test/
Author: fpj
Date: Fri Oct 3 06:06:39 2008
New Revision: 701369
URL: http://svn.apache.org/viewvc?rev=701369&view=rev
Log:
ZOOKEEPER-136.patch
Added:
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java
Modified:
hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java
hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/jute/CsvOutputArchive.java Fri Oct 3 06:06:39 2008
@@ -103,6 +103,9 @@
}
public void writeRecord(Record r, String tag) throws IOException {
+ if (r == null) {
+ return;
+ }
r.serialize(this, tag);
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Fri Oct 3 06:06:39 2008
@@ -73,8 +73,7 @@
public class ClientCnxn {
private static final Logger LOG = Logger.getLogger(ClientCnxn.class);
- private ArrayList<InetSocketAddress> serverAddrs =
- new ArrayList<InetSocketAddress>();
+ private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>();
static class AuthData {
AuthData(String scheme, byte data[]) {
@@ -121,10 +120,12 @@
final Selector selector = Selector.open();
- /** Set to true when close is called. Latches the connection such that
- * we don't attempt to re-connect to the server if in the middle of
- * closing the connection (client sends session disconnect to server
- * as part of close operation) */
+ /**
+ * Set to true when close is called. Latches the connection such that we
+ * don't attempt to re-connect to the server if in the middle of closing the
+ * connection (client sends session disconnect to server as part of close
+ * operation)
+ */
volatile boolean closing = false;
public long getSessionId() {
@@ -138,7 +139,8 @@
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
- sb.append("sessionId: 0x").append(Long.toHexString(getSessionId())).append("\n");
+ sb.append("sessionId: 0x").append(Long.toHexString(getSessionId()))
+ .append("\n");
sb.append("lastZxid: ").append(lastZxid).append("\n");
sb.append("xid: ").append(xid).append("\n");
sb.append("nextAddrToTry: ").append(nextAddrToTry).append("\n");
@@ -200,8 +202,25 @@
}
this.watchRegistration = watchRegistration;
}
+
+ @Override
+ public String toString() {
+ StringBuffer sb = new StringBuffer();
+
+ sb.append("path:" + path);
+ sb.append(" finished:" + finished);
+
+ sb.append(" header:: " + header);
+ sb.append(" replyHeader:: " + replyHeader);
+ sb.append(" request:: " + request);
+ sb.append(" response:: " + response);
+
+ // jute toString is horrible, remove unnecessary newlines
+ return sb.toString().replaceAll("\r*\n+", " ");
+ }
}
+
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher)
throws IOException
@@ -224,8 +243,7 @@
*/
public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, long sessionId, byte[] sessionPasswd)
- throws IOException
- {
+ throws IOException {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
@@ -477,20 +495,28 @@
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
- ReplyHeader r = new ReplyHeader();
+ ReplyHeader replyHdr = new ReplyHeader();
- r.deserialize(bbia, "header");
- if (r.getXid() == -2) {
+ replyHdr.deserialize(bbia, "header");
+ if (replyHdr.getXid() == -2) {
// -2 is the xid for pings
+ LOG
+ .debug("Got ping sessionid:0x"
+ + Long.toHexString(sessionId));
return;
}
- if (r.getXid() == -4) {
+ if (replyHdr.getXid() == -4) {
// -2 is the xid for AuthPacket
// TODO: process AuthPacket here
+ LOG
+ .debug("Got auth sessionid:0x"
+ + Long.toHexString(sessionId));
return;
}
- if (r.getXid() == -1) {
+ if (replyHdr.getXid() == -1) {
// -1 means notification
+ LOG.debug("Got notification sessionid:0x"
+ + Long.toHexString(sessionId));
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
WatchedEvent we = new WatchedEvent(event);
@@ -504,29 +530,37 @@
}
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got "
- + r.getXid());
+ + replyHdr.getXid());
}
- Packet p = null;
+ Packet packet = null;
synchronized (pendingQueue) {
- p = pendingQueue.remove();
+ packet = pendingQueue.remove();
}
/*
* Since requests are processed in order, we better get a response
* to the first request!
*/
- if (p.header.getXid() != r.getXid()) {
- throw new IOException("Xid out of order. Got " + r.getXid()
- + " expected " + p.header.getXid());
- }
- p.replyHeader.setXid(r.getXid());
- p.replyHeader.setErr(r.getErr());
- p.replyHeader.setZxid(r.getZxid());
- lastZxid = r.getZxid();
- if (p.response != null && r.getErr() == 0) {
- p.response.deserialize(bbia, "response");
+ if (packet.header.getXid() != replyHdr.getXid()) {
+ throw new IOException("Xid out of order. Got "
+ + replyHdr.getXid() + " expected "
+ + packet.header.getXid());
}
- p.finished = true;
- finishPacket(p);
+
+ packet.replyHeader.setXid(replyHdr.getXid());
+ packet.replyHeader.setErr(replyHdr.getErr());
+ packet.replyHeader.setZxid(replyHdr.getZxid());
+ lastZxid = replyHdr.getZxid();
+ if (packet.response != null && replyHdr.getErr() == 0) {
+ packet.response.deserialize(bbia, "response");
+ }
+ packet.finished = true;
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reading reply sessionid:0x"
+ + Long.toHexString(sessionId) + ", packet:: " + packet);
+ }
+
+ finishPacket(packet);
}
/**
@@ -789,14 +823,15 @@
} catch (Exception e) {
if (closing) {
// closing so this is expected
- LOG.info("Exception while closing send thread for session 0x"
+ LOG
+ .info("Exception while closing send thread for session 0x"
+ Long.toHexString(getSessionId())
+ " : " + e.getMessage());
break;
} else {
LOG.warn("Exception closing session 0x"
- + Long.toHexString(getSessionId()),
- e);
+ + Long.toHexString(getSessionId()) + " to "
+ + sockKey, e);
cleanup();
if (zooKeeper.state.isAlive()) {
eventThread.queueEvent(new WatchedEvent(
@@ -889,8 +924,8 @@
}
/**
- * Close the connection, which includes; send session disconnect to
- * the server, shutdown the send/event threads.
+ * Close the connection, which includes; send session disconnect to the
+ * server, shutdown the send/event threads.
*
* @throws IOException
*/
@@ -919,13 +954,10 @@
}
public ReplyHeader submitRequest(RequestHeader h, Record request,
- Record response,
- WatchRegistration watchRegistration)
- throws InterruptedException
- {
+ Record response, WatchRegistration watchRegistration)
+ throws InterruptedException {
ReplyHeader r = new ReplyHeader();
- Packet packet =
- queuePacket(h, r, request, response, null, null, null,
+ Packet packet = queuePacket(h, r, request, response, null, null, null,
watchRegistration);
synchronized (packet) {
while (!packet.finished) {
@@ -937,8 +969,7 @@
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String path, Object ctx,
- WatchRegistration watchRegistration)
- {
+ WatchRegistration watchRegistration) {
Packet packet = null;
synchronized (outgoingQueue) {
if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Fri Oct 3 06:06:39 2008
@@ -69,9 +69,9 @@
}
public void processRequest(Request request) {
- // LOG.info("Zoo>>> cxid = " + request.cxid + " type = " +
- // request.type + " id = " + request.sessionId + " cnxn " +
- // request.cnxn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing request:: " + request);
+ }
// request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
@@ -130,6 +130,10 @@
if (request.hdr != null && request.hdr.getType() == OpCode.error) {
throw KeeperException.create(((ErrorTxn) request.txn).getErr());
}
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(request);
+ }
switch (request.type) {
case OpCode.ping:
request.cnxn.sendResponse(new ReplyHeader(-2,
@@ -157,7 +161,6 @@
err = rc.err;
break;
case OpCode.sync:
- LOG.debug("OpCode.sync " + request);
SyncRequest syncRequest = new SyncRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
syncRequest);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Fri Oct 3 06:06:39 2008
@@ -84,7 +84,7 @@
int outstandingLimit = 1;
public Factory(int port) throws IOException {
- super("NIOServerCxn.Factory");
+ super("NIOServerCxn.Factory:" + port);
setDaemon(true);
this.ss = ServerSocketChannel.open();
ss.socket().bind(new InetSocketAddress(port));
@@ -421,7 +421,7 @@
|| ap.handleAuthentication(this, authPacket.getAuth()) != KeeperException.Code.Ok) {
if (ap == null)
LOG.error("No authentication provider for scheme: "
- + scheme);
+ + scheme + " has " + ProviderRegistry.listProviders());
else
LOG.debug("Authentication failed for scheme: "
+ scheme);
@@ -449,6 +449,7 @@
outstandingRequests++;
// check throttling
if (zk.getInProcess() > factory.outstandingLimit) {
+ LOG.warn("Throttling recv " + zk.getInProcess());
disableRecv();
// following lines should not be needed since we are already
// reading
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Fri Oct 3 06:06:39 2008
@@ -76,7 +76,7 @@
public PrepRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
- super("ProcessThread");
+ super("ProcessThread:" + zks.getClientPort());
this.nextProcessor = nextProcessor;
this.zks = zks;
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Fri Oct 3 06:06:39 2008
@@ -163,19 +163,19 @@
@Override
public String toString() {
StringBuffer sb = new StringBuffer();
- sb.append("session 0x").append(Long.toHexString(sessionId));
- sb.append(" cxid 0x").append(Long.toHexString(cxid));
- sb.append("zxid 0x").append(Long.toHexString((hdr == null ? -2 : hdr.getZxid()))).append(
- " ");
- sb
- .append(
- " txn type = "
- + (hdr == null ? "unknown" : "" + hdr.getType()))
- .append(" ");
- sb.append(op2String(type)).append(" ");
+ sb.append("sessionid:0x").append(Long.toHexString(sessionId));
+ sb.append(" type:").append(op2String(type));
+ sb.append(" cxid:0x").append(Long.toHexString(cxid));
+ sb.append(" zxid:0x").append(Long.toHexString((hdr == null ?
+ -2 : hdr.getZxid())));
+ sb.append(" txntype:" + (hdr == null ?
+ "unknown" : "" + hdr.getType()));
+ sb.append(" ");
String path = "n/a";
- if (type != OpCode.createSession) {
+ if (type != OpCode.createSession && request != null
+ && request.remaining() >= 4)
+ {
try {
request.clear();
int pathLen = request.getInt();
@@ -188,6 +188,7 @@
}
}
sb.append(path).append(" ");
+
return sb.toString();
}
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java Fri Oct 3 06:06:39 2008
@@ -54,7 +54,7 @@
public SyncRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
- super("SyncThread");
+ super("SyncThread:" + zks.getClientPort());
this.zks = zks;
this.nextProcessor = nextProcessor;
start();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Fri Oct 3 06:06:39 2008
@@ -467,6 +467,11 @@
*/
public void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
+ Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
+ submitRequest(si);
+ }
+
+ public void submitRequest(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
@@ -482,16 +487,15 @@
}
}
try {
- touch(cnxn);
- Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
- boolean validpacket = Request.isValid(type);
+ touch(si.cnxn);
+ boolean validpacket = Request.isValid(si.type);
if (validpacket) {
firstProcessor.processRequest(si);
- if (cnxn != null) {
+ if (si.cnxn != null) {
incInProcess();
}
} else {
- LOG.warn("Dropping packet at server of type " + type);
+ LOG.warn("Dropping packet at server of type " + si.type);
// if unvalid packet drop the packet.
}
} catch (IOException e) {
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/auth/ProviderRegistry.java Fri Oct 3 06:06:39 2008
@@ -36,7 +36,6 @@
synchronized (ProviderRegistry.class) {
if (initialized)
return;
- initialized = true;
IPAuthenticationProvider ipp = new IPAuthenticationProvider();
HostAuthenticationProvider hostp = new HostAuthenticationProvider();
DigestAuthenticationProvider digp = new DigestAuthenticationProvider();
@@ -59,6 +58,7 @@
}
}
}
+ initialized = true;
}
}
@@ -67,4 +67,12 @@
initialize();
return authenticationProviders.get(scheme);
}
+
+ public static String listProviders() {
+ StringBuilder sb = new StringBuilder();
+ for(String s: authenticationProviders.keySet()) {
+ sb.append(s + " ");
+}
+ return sb.toString();
+ }
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Fri Oct 3 06:06:39 2008
@@ -49,8 +49,17 @@
RequestProcessor nextProcessor;
- public CommitProcessor(RequestProcessor nextProcessor) {
+ /**
+ * This flag indicates whether we need to wait for a response to come back from the
+ * leader or we just let the sync operation flow through like a read. The flag will
+ * be true if the CommitProcessor is in a Leader pipeline.
+ */
+ boolean matchSyncs;
+
+ public CommitProcessor(RequestProcessor nextProcessor, String id, boolean matchSyncs) {
+ super("CommitProcessor:" + id);
this.nextProcessor = nextProcessor;
+ this.matchSyncs = matchSyncs;
start();
}
@@ -122,8 +131,11 @@
nextPending = request;
break;
case OpCode.sync:
- nextPending = request;
- //pendingSyncs.add(request);
+ if (matchSyncs) {
+ nextPending = request;
+ } else {
+ toProcess.add(request);
+ }
break;
default:
toProcess.add(request);
@@ -145,7 +157,9 @@
new Exception("committing a null! "));
return;
}
- LOG.debug("Committing" + request.cxid);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Committing request:: " + request);
+ }
committedRequests.add(request);
notifyAll();
}
@@ -153,9 +167,10 @@
synchronized public void processRequest(Request request) {
// request.addRQRec(">commit");
- // LOG.info("Zoo processReq>>> cxid = " + request.cxid + " type =
- // " + request.type + " id = " + request.sessionId + " cnxn " +
- // request.cnxn);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing request:: " + request);
+ }
+
if (!finished) {
queuedRequests.add(request);
notifyAll();
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerHandler.java Fri Oct 3 06:06:39 2008
@@ -330,10 +330,12 @@
type = bb.getInt();
bb = bb.slice();
if(type == OpCode.sync){
- leader.setSyncHandler(this, sessionId);
- }
+ leader.zk.submitRequest(new FollowerSyncRequest(this, sessionId, cxid, type, bb,
+ qp.getAuthinfo()));
+ } else {
leader.zk.submitRequest(null, sessionId, type, cxid, bb,
qp.getAuthinfo());
+ }
break;
default:
}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Fri Oct 3 06:06:39 2008
@@ -45,6 +45,7 @@
public FollowerRequestProcessor(FollowerZooKeeperServer zks,
RequestProcessor nextProcessor) {
+ super("FollowerRequestProcessor:" + zks.getClientPort());
this.zks = zks;
this.nextProcessor = nextProcessor;
start();
Added: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java?rev=701369&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java (added)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerSyncRequest.java Fri Oct 3 06:06:39 2008
@@ -0,0 +1,16 @@
+package org.apache.zookeeper.server.quorum;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.Request;
+
+public class FollowerSyncRequest extends Request {
+ FollowerHandler fh;
+ public FollowerSyncRequest(FollowerHandler fh, long sessionId, int xid, int type,
+ ByteBuffer bb, List<Id> authInfo) {
+ super(null, sessionId, xid, type, bb, authInfo);
+ this.fh = fh;
+ }
+}
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java Fri Oct 3 06:06:39 2008
@@ -80,7 +80,8 @@
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
- commitProcessor = new CommitProcessor(finalProcessor);
+ commitProcessor = new CommitProcessor(finalProcessor,
+ Integer.toString(getClientPort()), true);
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor(getFollower()));
@@ -135,16 +136,16 @@
commitProcessor.commit(request);
}
- public void sync(){
+ synchronized public void sync(){
if(pendingSyncs.size() ==0){
LOG.warn("Not expecting a sync.");
return;
}
- commitProcessor.commit(pendingSyncs.remove());
+ Request r = pendingSyncs.remove();
+ commitProcessor.commit(r);
}
-
@Override
public int getGlobalOutstandingLimit() {
return super.getGlobalOutstandingLimit() / (self.getQuorumSize() - 1);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Fri Oct 3 06:06:39 2008
@@ -25,9 +25,11 @@
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.log4j.Logger;
@@ -70,11 +72,8 @@
public HashSet<FollowerHandler> forwardingFollowers = new HashSet<FollowerHandler>();
//Pending sync requests
- public HashMap<Long,Request> pendingSyncs = new HashMap<Long,Request>();
+ public HashMap<Long,List<FollowerSyncRequest>> pendingSyncs = new HashMap<Long,List<FollowerSyncRequest>>();
- //Map sync request to FollowerHandler
- public HashMap<Long,FollowerHandler> syncHandler = new HashMap<Long,FollowerHandler>();
-
/**
* Adds follower to the leader.
*
@@ -253,7 +252,7 @@
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);
if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
- LOG.error("NEWLEADER proposal has Zxid of "
+ LOG.warn("NEWLEADER proposal has Zxid of "
+ newLeaderProposal.packet.getZxid());
}
outstandingProposals.add(newLeaderProposal);
@@ -373,17 +372,29 @@
*/
synchronized public void processAck(long zxid, SocketAddress followerAddr) {
boolean first = true;
- /*
- * LOG.error("Ack zxid: " + Long.toHexString(zxid)); for (Proposal
- * p : outstandingProposals) { long packetZxid = p.packet.getZxid();
- * LOG.error("outstanding proposal: " +
- * Long.toHexString(packetZxid)); } LOG.error("outstanding
- * proposals all");
- */
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ack zxid: 0x" + Long.toHexString(zxid));
+ for (Proposal p : outstandingProposals) {
+ long packetZxid = p.packet.getZxid();
+ LOG.debug("outstanding proposal: 0x"
+ + Long.toHexString(packetZxid));
+ }
+ LOG.debug("outstanding proposals all");
+ }
+
if (outstandingProposals.size() == 0) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("outstanding is 0");
+ }
return;
}
if (outstandingProposals.peek().packet.getZxid() > zxid) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("proposal has already been committed, pzxid:"
+ + outstandingProposals.peek().packet.getZxid()
+ + " zxid:" + zxid);
+ }
// The proposal has already been committed
return;
}
@@ -391,13 +402,16 @@
long packetZxid = p.packet.getZxid();
if (packetZxid == zxid) {
p.ackCount++;
- // LOG.error("FIXMSG",new RuntimeException(), "Count for " +
- // Long.toHexString(zxid) + " is " + p.ackCount);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Count for zxid: 0x" + Long.toHexString(zxid)
+ + " is " + p.ackCount);
+ }
+
if (p.ackCount > self.quorumPeers.size() / 2){
if (!first) {
- LOG.error("Commiting zxid 0x" + Long.toHexString(zxid)
+ LOG.fatal("Commiting zxid 0x" + Long.toHexString(zxid)
+ " from " + followerAddr + " not first!");
- LOG.error("First is "
+ LOG.fatal("First is "
+ outstandingProposals.element().packet);
System.exit(13);
}
@@ -408,23 +422,23 @@
// We don't commit the new leader proposal
if ((zxid & 0xffffffffL) != 0) {
if (p.request == null) {
- LOG.error("Going to commmit null: " + p);
+ LOG.warn("Going to commmit null: " + p);
}
commit(zxid);
zk.commitProcessor.commit(p.request);
if(pendingSyncs.containsKey(zxid)){
- sendSync(syncHandler.get(pendingSyncs.get(zxid).sessionId), pendingSyncs.get(zxid));
- syncHandler.remove(pendingSyncs.get(zxid));
- pendingSyncs.remove(zxid);
+ for(FollowerSyncRequest r: pendingSyncs.remove(zxid)) {
+ sendSync(r);
}
}
}
+ }
return;
} else {
first = false;
}
}
- LOG.error("Trying to commit future proposal: zxid 0x"
+ LOG.warn("Trying to commit future proposal: zxid 0x"
+ Long.toHexString(zxid) + " from " + followerAddr);
}
@@ -528,8 +542,7 @@
}
baos.close();
} catch (IOException e) {
- // This really should be impossible
- LOG.error("FIXMSG",e);
+ LOG.warn("This really should be impossible", e);
}
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos
.toByteArray(), null);
@@ -538,6 +551,10 @@
p.packet = pp;
p.request = request;
synchronized (this) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Proposing:: " + request);
+ }
+
outstandingProposals.add(p);
lastProposed = p.packet.getZxid();
sendPacket(pp);
@@ -551,44 +568,29 @@
* @param r the request
*/
- public void processSync(Request r){
+ synchronized public void processSync(FollowerSyncRequest r){
if(outstandingProposals.isEmpty()){
- LOG.warn("No outstanding proposal");
- sendSync(syncHandler.get(r.sessionId), r);
- syncHandler.remove(r.sessionId);
- }
- else{
- pendingSyncs.put(lastProposed, r);
+ sendSync(r);
+ } else {
+ List<FollowerSyncRequest> l = pendingSyncs.get(lastProposed);
+ if (l == null) {
+ l = new ArrayList<FollowerSyncRequest>();
+ }
+ l.add(r);
+ pendingSyncs.put(lastProposed, l);
}
}
/**
- * Set FollowerHandler for sync.
- *
- * @param f
- * @param s
- */
-
- synchronized public void setSyncHandler(FollowerHandler f, long s){
- syncHandler.put(s, f);
- }
-
- /**
* Sends a sync message to the appropriate server
*
* @param f
* @param r
*/
- public void sendSync(FollowerHandler f, Request r){
- if(f != null){
- QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
- f.queuePacket(qp);
- }
- else{
- LOG.warn("Committing sync: " + r.cxid );
- zk.commitProcessor.commit(r);
- }
+ public void sendSync(FollowerSyncRequest r){
+ QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
+ r.fh.queuePacket(qp);
}
/**
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Fri Oct 3 06:06:39 2008
@@ -59,7 +59,8 @@
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied);
- commitProcessor = new CommitProcessor(toBeAppliedProcessor);
+ commitProcessor = new CommitProcessor(toBeAppliedProcessor,
+ Integer.toString(getClientPort()), false);
RequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java Fri Oct 3 06:06:39 2008
@@ -56,17 +56,10 @@
* call processRequest on the next processor.
*/
- if(request.type == ZooDefs.OpCode.sync){
- zks.getLeader().processSync(request);
-
- if(!zks.getLeader().syncHandler.containsKey(request.sessionId)){
- zks.getLeader().syncHandler.put(request.sessionId, null);
+ if(request instanceof FollowerSyncRequest){
+ zks.getLeader().processSync((FollowerSyncRequest)request);
+ } else {
nextProcessor.processRequest(request);
- }
-
- }
- else{
- nextProcessor.processRequest(request);
if (request.hdr != null) {
// We need to sync and get consensus on any transactions
zks.getLeader().propose(request);
Modified: hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ hadoop/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Oct 3 06:06:39 2008
@@ -384,6 +384,8 @@
@Override
public void run() {
+ setName("QuorumPeer:" + cnxnFactory.getLocalAddress());
+
/*
* Main loop
*/
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOps.java Fri Oct 3 06:06:39 2008
@@ -123,7 +123,7 @@
fail("unexpected interrupt");
}
// on the lookout for timeout
- assertSame(latch.getCount(), 0L);
+ assertSame(0L, latch.getCount());
String actual = toString();
@@ -410,11 +410,15 @@
verify();
}
+ public void setData() {
+ zk.setData(path, data, version, this, toString());
+ }
+
public void verifySetData() {
stat.setVersion(1);
new StringCB(zk).verifyCreate();
- zk.setData(path, data, version, this, toString());
+ setData();
verify();
}
@@ -460,10 +464,14 @@
super(zk, latch);
}
+ public void delete() {
+ zk.delete(path, version, this, toString());
+ }
+
public void verifyDelete() {
new StringCB(zk).verifyCreate();
- zk.delete(path, version, this, toString());
+ delete();
verify();
}
@@ -473,8 +481,12 @@
verify();
}
- public void verifySync() {
+ public void sync() {
zk.sync(path, this, toString());
+ }
+
+ public void verifySync() {
+ sync();
verify();
}
Modified: hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java?rev=701369&r1=701368&r2=701369&view=diff
==============================================================================
--- hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java (original)
+++ hadoop/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/AsyncOpsTest.java Fri Oct 3 06:06:39 2008
@@ -133,12 +133,12 @@
@Test
public void testAsyncExists() {
- new StatCB(zk).verifySetData();
+ new StatCB(zk).verifyExists();
}
@Test
public void testAsyncExistsFailure_NoNode() {
- new StatCB(zk).verifySetData();
+ new StatCB(zk).verifyExistsFailure_NoNode();
}
@Test