You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2011/09/14 08:56:14 UTC
svn commit: r1170452 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/
Author: mahadev
Date: Wed Sep 14 06:56:13 2011
New Revision: 1170452
URL: http://svn.apache.org/viewvc?rev=1170452&view=rev
Log:
ZOOKEEPER-1136. NEW_LEADER should be queued, not sent, to match the Zab 1.0 protocol on the twiki (breed via mahadev)
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Sep 14 06:56:13 2011
@@ -322,6 +322,9 @@ BUGFIXES:
ZOOKEEPER-961. Watch recovery after disconnection when connection string contains a prefix.
(Matthias Spycher via mahadev)
+ ZOOKEEPER-1136. NEW_LEADER should be queued not sent to match the Zab 1.0 protocol
+ on the twiki (breed via mahadev)
+
IMPROVEMENTS:
ZOOKEEPER-724. Improve junit test integration - log harness information
(phunt via mahadev)
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferInputStream.java Wed Sep 14 06:56:13 2011
@@ -22,6 +22,9 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.Record;
+
public class ByteBufferInputStream extends InputStream {
ByteBuffer bb;
@@ -69,4 +72,11 @@ public class ByteBufferInputStream exten
return n;
}
+     /**
+      * Deserializes {@code record} from the bytes remaining in {@code bb},
+      * reading them through a jute BinaryInputArchive under the tag "request".
+      *
+      * @param bb     source buffer, read from its current position
+      * @param record record instance to populate in place
+      * @throws IOException if the record cannot be read from the buffer
+      */
+     static public void byteBuffer2Record(ByteBuffer bb, Record record)
+             throws IOException {
+         record.deserialize(
+                 BinaryInputArchive.getArchive(new ByteBufferInputStream(bb)),
+                 "request");
+     }
+
}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java?rev=1170452&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ByteBufferOutputStream.java Wed Sep 14 06:56:13 2011
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+
+ /**
+  * An {@link OutputStream} that writes into a caller-supplied, fixed-capacity
+  * {@link ByteBuffer}, advancing the buffer's position. The buffer is never
+  * grown: a single write that does not fit fails with an {@link IOException}
+  * and leaves the buffer untouched.
+  */
+ public class ByteBufferOutputStream extends OutputStream {
+     ByteBuffer bb;
+
+     /**
+      * @param bb destination buffer; bytes are written starting at its
+      *           current position
+      */
+     public ByteBufferOutputStream(ByteBuffer bb) {
+         this.bb = bb;
+     }
+
+     @Override
+     public void write(int b) throws IOException {
+         ensureRemaining(1);
+         bb.put((byte) b);
+     }
+
+     @Override
+     public void write(byte[] b) throws IOException {
+         write(b, 0, b.length);
+     }
+
+     @Override
+     public void write(byte[] b, int off, int len) throws IOException {
+         // Validate the slice first so bad arguments still surface as
+         // IndexOutOfBoundsException, as the OutputStream contract requires.
+         if (off < 0 || len < 0 || len > b.length - off) {
+             throw new IndexOutOfBoundsException("off=" + off + ", len=" + len
+                     + ", array length=" + b.length);
+         }
+         ensureRemaining(len);
+         bb.put(b, off, len);
+     }
+
+     /**
+      * Fails with a checked IOException, instead of letting ByteBuffer.put
+      * throw an unchecked BufferOverflowException, when fewer than
+      * {@code needed} bytes of space remain.
+      */
+     private void ensureRemaining(int needed) throws IOException {
+         if (bb.remaining() < needed) {
+             throw new IOException("ByteBuffer overflow: need " + needed
+                     + " bytes but only " + bb.remaining() + " remain");
+         }
+     }
+
+     /**
+      * Serializes {@code record} into {@code bb} through a jute
+      * BinaryOutputArchive, using "request" as the record tag.
+      *
+      * @param record record to serialize
+      * @param bb     destination buffer, written from its current position
+      * @throws IOException if serialization fails or the record does not fit
+      *                     in the space remaining in {@code bb}
+      */
+     static public void record2ByteBuffer(Record record, ByteBuffer bb)
+             throws IOException {
+         BinaryOutputArchive oa;
+         oa = BinaryOutputArchive.getArchive(new ByteBufferOutputStream(bb));
+         record.serialize(oa, "request");
+     }
+ }
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/DataTree.java Wed Sep 14 06:56:13 2011
@@ -880,7 +880,7 @@ public class DataTree {
}
assert(record != null);
- ZooKeeperServer.byteBuffer2Record(bb, record);
+ ByteBufferInputStream.byteBuffer2Record(bb, record);
if (failed && subtxn.getType() != OpCode.error){
int ec = post_failed ? Code.RUNTIMEINCONSISTENCY.intValue()
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Wed Sep 14 06:56:13 2011
@@ -255,7 +255,7 @@ public class FinalRequestProcessor imple
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
- ZooKeeperServer.byteBuffer2Record(request.request,
+ ByteBufferInputStream.byteBuffer2Record(request.request,
syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
break;
@@ -270,7 +270,7 @@ public class FinalRequestProcessor imple
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
- ZooKeeperServer.byteBuffer2Record(request.request,
+ ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
@@ -284,7 +284,7 @@ public class FinalRequestProcessor imple
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
- ZooKeeperServer.byteBuffer2Record(request.request,
+ ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
@@ -308,7 +308,7 @@ public class FinalRequestProcessor imple
SetWatches setWatches = new SetWatches();
// XXX We really should NOT need this!!!!
request.request.rewind();
- ZooKeeperServer.byteBuffer2Record(request.request, setWatches);
+ ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
@@ -319,7 +319,7 @@ public class FinalRequestProcessor imple
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
- ZooKeeperServer.byteBuffer2Record(request.request,
+ ByteBufferInputStream.byteBuffer2Record(request.request,
getACLRequest);
Stat stat = new Stat();
List<ACL> acl =
@@ -330,7 +330,7 @@ public class FinalRequestProcessor imple
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
- ZooKeeperServer.byteBuffer2Record(request.request,
+ ByteBufferInputStream.byteBuffer2Record(request.request,
getChildrenRequest);
DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
if (n == null) {
@@ -353,7 +353,7 @@ public class FinalRequestProcessor imple
case OpCode.getChildren2: {
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
- ZooKeeperServer.byteBuffer2Record(request.request,
+ ByteBufferInputStream.byteBuffer2Record(request.request,
getChildren2Request);
Stat stat = new Stat();
DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Wed Sep 14 06:56:13 2011
@@ -488,32 +488,32 @@ public class PrepRequestProcessor extend
switch (request.type) {
case OpCode.create:
CreateRequest createRequest = new CreateRequest();
- ZooKeeperServer.byteBuffer2Record(request.request, createRequest);
+ ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest);
break;
case OpCode.delete:
DeleteRequest deleteRequest = new DeleteRequest();
- ZooKeeperServer.byteBuffer2Record(request.request, deleteRequest);
+ ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
break;
case OpCode.setData:
SetDataRequest setDataRequest = new SetDataRequest();
- ZooKeeperServer.byteBuffer2Record(request.request, setDataRequest);
+ ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
- ZooKeeperServer.byteBuffer2Record(request.request, setAclRequest);
+ ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
break;
case OpCode.check:
CheckVersionRequest checkRequest = new CheckVersionRequest();
- ZooKeeperServer.byteBuffer2Record(request.request, checkRequest);
+ ByteBufferInputStream.byteBuffer2Record(request.request, checkRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
break;
case OpCode.multi:
MultiTransactionRecord multiRequest = new MultiTransactionRecord();
- ZooKeeperServer.byteBuffer2Record(request.request, multiRequest);
+ ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
List<Txn> txns = new ArrayList<Txn>();
//Each op in a multi-op must have the same zxid!
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Wed Sep 14 06:56:13 2011
@@ -661,13 +661,6 @@ public class ZooKeeperServer implements
}
}
- static public void byteBuffer2Record(ByteBuffer bb, Record record)
- throws IOException {
- BinaryInputArchive ia;
- ia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
- record.deserialize(ia, "request");
- }
-
public static int getSnapCount() {
String sc = System.getProperty("zookeeper.snapCount");
try {
@@ -860,7 +853,7 @@ public class ZooKeeperServer implements
if (h.getType() == OpCode.auth) {
LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
AuthPacket authPacket = new AuthPacket();
- ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
+ ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
Code authReturn = KeeperException.Code.AUTHFAILED;
@@ -917,7 +910,7 @@ public class ZooKeeperServer implements
private Record processSasl(ByteBuffer incomingBuffer, ServerCnxn cnxn) throws IOException {
LOG.debug("Responding to client SASL token.");
GetSASLRequest clientTokenRecord = new GetSASLRequest();
- byteBuffer2Record(incomingBuffer,clientTokenRecord);
+ ByteBufferInputStream.byteBuffer2Record(incomingBuffer,clientTokenRecord);
byte[] clientToken = clientTokenRecord.getToken();
LOG.debug("Size of client SASL token: " + clientToken.length);
byte[] responseToken = null;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Wed Sep 14 06:56:13 2011
@@ -281,7 +281,7 @@ public class Leader {
long epoch = -1;
boolean waitingForNewEpoch = true;
- boolean readyToStart = false;
+ volatile boolean readyToStart = false;
/**
* This method is main function that is called to lead
@@ -309,13 +309,17 @@ public class Leader {
cnxAcceptor = new LearnerCnxAcceptor();
cnxAcceptor.start();
+ readyToStart = true;
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());
self.setAcceptedEpoch(epoch);
+
zk.setZxid(ZxidUtils.makeZxid(epoch, 0));
+ /*
synchronized(this){
lastProposed = zk.getZxid();
}
+ */
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
null, null);
@@ -328,7 +332,6 @@ public class Leader {
outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
newLeaderProposal.ackSet.add(self.getId());
- readyToStart = true;
waitForEpochAck(self.getId(), leaderStateSummary);
self.setCurrentEpoch(epoch);
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Wed Sep 14 06:56:13 2011
@@ -290,18 +290,17 @@ public class Learner {
}
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);
writePacket(ackNewEpoch, true);
- readPacket(qp);
+ return ZxidUtils.makeZxid(newEpoch, 0);
} else {
if (newEpoch > self.getAcceptedEpoch()) {
self.setAcceptedEpoch(newEpoch);
}
+ if (qp.getType() != Leader.NEWLEADER) {
+ LOG.error("First packet should have been NEWLEADER");
+ throw new IOException("First packet should have been NEWLEADER");
+ }
+ return qp.getZxid();
}
- if (qp.getType() != Leader.NEWLEADER) {
- LOG.error("First packet should have been NEWLEADER");
- throw new IOException("First packet should have been NEWLEADER");
- }
-
- return qp.getZxid();
}
/**
@@ -353,6 +352,11 @@ public class Learner {
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
long lastQueued = 0;
+
+ // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
+ // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
+ // we need to make sure that we don't take the snapshot twice.
+ boolean snapshotTaken = false;
// we are now going to start getting transactions to apply followed by an UPTODATE
outerLoop:
while (self.isRunning()) {
@@ -362,7 +366,7 @@ public class Learner {
PacketInFlight pif = new PacketInFlight();
pif.hdr = new TxnHeader();
pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);
- if (pif.hdr. getZxid() != lastQueued + 1) {
+ if (pif.hdr.getZxid() != lastQueued + 1) {
LOG.warn("Got zxid 0x"
+ Long.toHexString(pif.hdr.getZxid())
+ " expected 0x"
@@ -386,9 +390,16 @@ public class Learner {
zk.getZKDatabase().processTxn(hdr, txn);
break;
case Leader.UPTODATE:
- zk.takeSnapshot();
+ if (!snapshotTaken) {
+ zk.takeSnapshot();
+ }
self.cnxnFactory.setZooKeeperServer(zk);
break outerLoop;
+ case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
+ zk.takeSnapshot();
+ snapshotTaken = true;
+ writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
+ break;
}
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1170452&r1=1170451&r2=1170452&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Wed Sep 14 06:56:13 2011
@@ -39,8 +39,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.Request;
-import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
@@ -233,6 +233,7 @@ public class LearnerHandler extends Thre
@Override
public void run() {
try {
+ sock.setSoTimeout(leader.self.getTickTime()*leader.self.getInitLimit());
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock
.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
@@ -252,7 +253,7 @@ public class LearnerHandler extends Thre
this.sid = bbsid.getLong();
} else {
LearnerInfo li = new LearnerInfo();
- ZooKeeperServer.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
+ ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
this.sid = li.getServerid();
this.version = li.getProtocolVersion();
}
@@ -271,37 +272,33 @@ public class LearnerHandler extends Thre
long peerLastZxid;
StateSummary ss = null;
- if (learnerType == LearnerType.PARTICIPANT) {
- long zxid = qp.getZxid();
- long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
-
- if (this.getVersion() < 0x10000) {
- // we are going to have to extrapolate the epoch information
- long epoch = ZxidUtils.getEpochFromZxid(zxid);
- ss = new StateSummary(epoch, zxid);
- // fake the message
- leader.waitForEpochAck(this.getSid(), ss);
- } else {
- byte ver[] = new byte[4];
- ByteBuffer.wrap(ver).putInt(0x10000);
- QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
- oa.writeRecord(newEpochPacket, "packet");
- bufferedOutput.flush();
- QuorumPacket ackEpochPacket = new QuorumPacket();
- ia.readRecord(ackEpochPacket, "packet");
- if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
- LOG.error(ackEpochPacket.toString()
- + " is not ACKEPOCH");
- return;
- }
- ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
- ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
- leader.waitForEpochAck(this.getSid(), ss);
- }
- peerLastZxid = ss.getLastZxid();
+ long zxid = qp.getZxid();
+ long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
+
+ if (this.getVersion() < 0x10000) {
+ // we are going to have to extrapolate the epoch information
+ long epoch = ZxidUtils.getEpochFromZxid(zxid);
+ ss = new StateSummary(epoch, zxid);
+ // fake the message
+ leader.waitForEpochAck(this.getSid(), ss);
} else {
- peerLastZxid = qp.getZxid();
+ byte ver[] = new byte[4];
+ ByteBuffer.wrap(ver).putInt(0x10000);
+ QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
+ oa.writeRecord(newEpochPacket, "packet");
+ bufferedOutput.flush();
+ QuorumPacket ackEpochPacket = new QuorumPacket();
+ ia.readRecord(ackEpochPacket, "packet");
+ if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
+ LOG.error(ackEpochPacket.toString()
+ + " is not ACKEPOCH");
+ return;
+ }
+ ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
+ ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
+ leader.waitForEpochAck(this.getSid(), ss);
}
+ peerLastZxid = ss.getLastZxid();
/* the default to send to the follower */
int packetToSend = Leader.SNAP;
@@ -390,9 +387,13 @@ public class LearnerHandler extends Thre
rl.unlock();
}
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- leaderLastZxid, null, null);
- oa.writeRecord(newLeaderQP, "packet");
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ ZxidUtils.makeZxid(newEpoch, 0), null, null);
+ if (getVersion() < 0x10000) {
+ oa.writeRecord(newLeaderQP, "packet");
+ } else {
+ queuedPackets.add(newLeaderQP);
+ }
bufferedOutput.flush();
//Need to set the zxidToSend to the latest zxid
if (packetToSend == Leader.SNAP) {
@@ -415,13 +416,6 @@ public class LearnerHandler extends Thre
}
bufferedOutput.flush();
- // Mutation packets will be queued during the serialize,
- // so we need to mark when the peer can actually start
- // using the data
- //
- queuedPackets
- .add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
-
// Start sending packets
new Thread() {
public void run() {
@@ -456,6 +450,12 @@ public class LearnerHandler extends Thre
leader.zk.wait(20);
}
}
+ // Mutation packets will be queued during the serialize,
+ // so we need to mark when the peer can actually start
+ // using the data
+ //
+ queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
+
while (true) {
qp = new QuorumPacket();
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1170452&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Wed Sep 14 06:56:13 2011
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+
+import org.apache.jute.BinaryInputArchive;
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
+import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.ZKUtil;
+import org.apache.zookeeper.server.ByteBufferOutputStream;
+import org.apache.zookeeper.server.DataTree;
+import org.apache.zookeeper.server.ServerCnxn;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class Zab1_0Test {
+ private static final class LeadThread extends Thread {
+ private final Leader leader;
+
+ private LeadThread(Leader leader) {
+ this.leader = leader;
+ }
+
+ public void run() {
+ try {
+ leader.lead();
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ leader.shutdown("lead ended");
+ }
+ }
+ }
+ private static final class NullServerCnxnFactory extends ServerCnxnFactory {
+ public void startup(ZooKeeperServer zkServer) throws IOException,
+ InterruptedException {
+ }
+ public void start() {
+ }
+ public void shutdown() {
+ }
+ public void setMaxClientCnxnsPerHost(int max) {
+ }
+ public void join() throws InterruptedException {
+ }
+ public int getMaxClientCnxnsPerHost() {
+ return 0;
+ }
+ public int getLocalPort() {
+ return 0;
+ }
+ public InetSocketAddress getLocalAddress() {
+ return null;
+ }
+ public Iterable<ServerCnxn> getConnections() {
+ return null;
+ }
+ public void configure(InetSocketAddress addr, int maxClientCnxns)
+ throws IOException {
+ }
+ public void closeSession(long sessionId) {
+ }
+ public void closeAll() {
+ }
+ }
+ static class MockDataTreeBuilder implements DataTreeBuilder {
+ @Override
+ public DataTree build() {
+ return new DataTree();
+ }
+
+ }
+ static Socket[] getSocketPair() throws IOException {
+ ServerSocket ss = new ServerSocket();
+ ss.bind(null);
+ InetSocketAddress endPoint = (InetSocketAddress) ss.getLocalSocketAddress();
+ Socket s = new Socket(endPoint.getAddress(), endPoint.getPort());
+ return new Socket[] { s, ss.accept() };
+ }
+ static void readPacketSkippingPing(InputArchive ia, QuorumPacket qp) throws IOException {
+ while(true) {
+ ia.readRecord(qp, null);
+ if (qp.getType() != Leader.PING) {
+ return;
+ }
+ }
+ }
+
+ static public interface LeaderConversation {
+ void converseWithLeader(InputArchive ia, OutputArchive oa) throws Exception;
+ }
+
+ static public interface FollowerConversation {
+ void converseWithFollower(InputArchive ia, OutputArchive oa) throws Exception;
+ }
+
+ public void testConversation(LeaderConversation conversation) throws Exception {
+ Socket pair[] = getSocketPair();
+ Socket leaderSocket = pair[0];
+ Socket followerSocket = pair[1];
+ File tmpDir = File.createTempFile("test", "dir");
+ tmpDir.delete();
+ tmpDir.mkdir();
+ LeadThread leadThread = null;
+ Leader leader = null;
+ try {
+ QuorumPeer peer = createQuorumPeer(tmpDir);
+ leader = createLeader(tmpDir, peer);
+ peer.leader = leader;
+ leadThread = new LeadThread(leader);
+ leadThread.start();
+
+ while(!leader.readyToStart) {
+ Thread.sleep(20);
+ }
+
+ LearnerHandler lh = new LearnerHandler(leaderSocket, leader);
+ lh.start();
+ leaderSocket.setSoTimeout(4000);
+
+ InputArchive ia = BinaryInputArchive.getArchive(followerSocket
+ .getInputStream());
+ OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket
+ .getOutputStream());
+
+ conversation.converseWithLeader(ia, oa);
+ } finally {
+ recursiveDelete(tmpDir);
+ if (leader != null) {
+ leader.shutdown("end of test");
+ }
+ if (leadThread != null) {
+ leadThread.interrupt();
+ leadThread.join();
+ }
+ }
+ }
+
+ @Test
+ public void testNormalRun() throws Exception {
+ testConversation(new LeaderConversation() {
+ public void converseWithLeader(InputArchive ia, OutputArchive oa)
+ throws IOException {
+ /* we test a normal run. everything should work out well. */
+ LearnerInfo li = new LearnerInfo(1, 0x10000);
+ byte liBytes[] = new byte[12];
+ ByteBufferOutputStream.record2ByteBuffer(li,
+ ByteBuffer.wrap(liBytes));
+ QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
+ liBytes, null);
+ oa.writeRecord(qp, null);
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+ Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
+ 0x10000);
+ qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
+ oa.writeRecord(qp, null);
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.DIFF, qp.getType());
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.NEWLEADER, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+ qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
+ oa.writeRecord(qp, null);
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.UPTODATE, qp.getType());
+ }
+ });
+ }
+
+ @Test
+ public void testLeaderBehind() throws Exception {
+ testConversation(new LeaderConversation() {
+ public void converseWithLeader(InputArchive ia, OutputArchive oa)
+ throws IOException {
+ /* we test a normal run. everything should work out well. */
+ LearnerInfo li = new LearnerInfo(1, 0x10000);
+ byte liBytes[] = new byte[12];
+ ByteBufferOutputStream.record2ByteBuffer(li,
+ ByteBuffer.wrap(liBytes));
+ /* we are going to say we last acked epoch 20 */
+ QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, ZxidUtils.makeZxid(20, 0),
+ liBytes, null);
+ oa.writeRecord(qp, null);
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
+ Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
+ 0x10000);
+ qp = new QuorumPacket(Leader.ACKEPOCH, 0, new byte[4], null);
+ oa.writeRecord(qp, null);
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.DIFF, qp.getType());
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.NEWLEADER, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(21, 0), qp.getZxid());
+ qp = new QuorumPacket(Leader.ACK, qp.getZxid(), null, null);
+ oa.writeRecord(qp, null);
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.UPTODATE, qp.getType());
+ }
+ });
+ }
+
+
+ private void recursiveDelete(File file) {
+ if (file.isFile()) {
+ file.delete();
+ } else {
+ for(File c: file.listFiles()) {
+ recursiveDelete(c);
+ }
+ file.delete();
+ }
+ }
+
+ private Leader createLeader(File tmpDir, QuorumPeer peer)
+ throws IOException, NoSuchFieldException, IllegalAccessException {
+ FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
+ peer.setTxnFactory(logFactory);
+ Field addrField = peer.getClass().getDeclaredField("myQuorumAddr");
+ addrField.setAccessible(true);
+ addrField.set(peer, new InetSocketAddress(33556));
+ ZKDatabase zkDb = new ZKDatabase(logFactory);
+ DataTreeBuilder treeBuilder = new MockDataTreeBuilder();
+ LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, treeBuilder, zkDb);
+ return new Leader(peer, zk);
+ }
+ private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
+ FileNotFoundException {
+ QuorumPeer peer = new QuorumPeer();
+ peer.syncLimit = 2;
+ peer.initLimit = 2;
+ peer.tickTime = 2000;
+ peer.quorumPeers = new HashMap<Long, QuorumServer>();
+ peer.quorumPeers.put(1L, new QuorumServer(0, new InetSocketAddress(33221)));
+ peer.quorumPeers.put(1L, new QuorumServer(1, new InetSocketAddress(33223)));
+ peer.setQuorumVerifier(new QuorumMaj(3));
+ peer.setCnxnFactory(new NullServerCnxnFactory());
+ File version2 = new File(tmpDir, "version-2");
+ version2.mkdir();
+ new FileOutputStream(new File(version2, "currentEpoch")).write("0\n".getBytes());
+ new FileOutputStream(new File(version2, "acceptedEpoch")).write("0\n".getBytes());
+ return peer;
+ }
+}