You are viewing a plain-text version of this content. The canonical version is linked from the original mail-archive page (the "here" hyperlink was lost in this plain-text copy).
Posted to commits@zookeeper.apache.org by br...@apache.org on 2011/01/23 06:29:41 UTC
svn commit: r1062327 - in /zookeeper/branches/branch-3.3: ./
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/persistence/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: breed
Date: Sun Jan 23 05:29:40 2011
New Revision: 1062327
URL: http://svn.apache.org/viewvc?rev=1062327&view=rev
Log:
ZOOKEEPER-962. leader/follower coherence issue when follower is receiving a DIFF
ZOOKEEPER-882. Startup loads last transaction from snapshot
Added:
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
Modified:
zookeeper/branches/branch-3.3/CHANGES.txt
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
Modified: zookeeper/branches/branch-3.3/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/CHANGES.txt?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.3/CHANGES.txt Sun Jan 23 05:29:40 2011
@@ -9,6 +9,10 @@ BUGFIXES:
ZOOKEEPER-921. zkPython incorrectly checks for existence of required
ACL elements (Nicholas Knight via henryr)
+ ZOOKEEPER-882. Startup loads last transaction from snapshot (Jared Cantwell via breed)
+
+ ZOOKEEPER-962. leader/follower coherence issue when follower is receiving a DIFF (Camille Fournier via breed)
+
IMPROVEMENTS:
ZOOKEEPER-963. Make Forrest work with JDK6 (Carl Steinbach via henryr)
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Sun Jan 23 05:29:40 2011
@@ -26,6 +26,9 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
@@ -33,16 +36,16 @@ import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
import org.apache.zookeeper.server.quorum.Leader;
-import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnHeader;
@@ -67,6 +70,7 @@ public class ZKDatabase {
public static final int commitLogCount = 500;
protected static int commitLogBuffer = 700;
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
+ protected ReentrantReadWriteLock logLock = new ReentrantReadWriteLock();
volatile private boolean initialized = false;
/**
@@ -104,8 +108,12 @@ public class ZKDatabase {
*/
dataTree = new DataTree();
sessionsWithTimeouts.clear();
- synchronized (committedLog) {
+ WriteLock lock = logLock.writeLock();
+ try {
+ lock.lock();
committedLog.clear();
+ } finally {
+ lock.unlock();
}
initialized = false;
}
@@ -136,13 +144,30 @@ public class ZKDatabase {
public long getminCommittedLog() {
return minCommittedLog;
}
-
- public LinkedList<Proposal> getCommittedLog() {
- synchronized (this.committedLog) {
- return new LinkedList<Proposal>(this.committedLog);
- }
+    /**
+     * Get the lock that controls the committedLog. Mutations of the committed log in this
+     * class are made while holding its write lock. To work with the live committedLog instead
+     * of a copy, acquire this lock's read lock, call getCommittedLog(), and keep holding the
+     * read lock for as long as the returned list is used.
+     * @return the lock that controls the committed log
+     */
+    public ReentrantReadWriteLock getLogLock() {
+        return logLock;
}
+
+    /**
+     * Returns the committed proposals.
+     *
+     * If the calling thread already holds the read lock obtained from {@link #getLogLock()},
+     * the live committedLog list is returned; it is only stable while that read lock is held.
+     * Otherwise a copy is taken under the read lock and returned, so the caller may use it
+     * freely.
+     *
+     * @return the live committed log (caller holds the read lock) or a private copy of it
+     */
+    public synchronized LinkedList<Proposal> getCommittedLog() {
+        ReadLock rl = logLock.readLock();
+        if (logLock.getReadHoldCount() > 0) {
+            // The caller already holds the read lock, so no writer can change the list
+            // underneath it; hand out the live list without copying.
+            return this.committedLog;
+        }
+        // Acquire before entering try: if lock() itself fails, the finally block must not
+        // try to release a lock this thread never obtained.
+        rl.lock();
+        try {
+            return new LinkedList<Proposal>(this.committedLog);
+        } finally {
+            rl.unlock();
+        }
+    }
+
/**
* get the last processed zxid from a datatree
* @return the last processed zxid of a datatree
@@ -206,7 +231,9 @@ public class ZKDatabase {
* @param request committed request
*/
public void addCommittedProposal(Request request) {
- synchronized (committedLog) {
+ WriteLock wl = logLock.writeLock();
+ try {
+ wl.lock();
if (committedLog.size() > commitLogCount) {
committedLog.removeFirst();
minCommittedLog = committedLog.getFirst().packet.getZxid();
@@ -234,6 +261,8 @@ public class ZKDatabase {
p.request = request;
committedLog.add(p);
maxCommittedLog = p.packet.getZxid();
+ } finally {
+ wl.unlock();
}
}
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Sun Jan 23 05:29:40 2011
@@ -570,11 +570,14 @@ public class FileTxnLog implements TxnLo
inputStream.close();
inputStream = null;
ia = null;
+ hdr = null;
// thsi means that the file has ended
// we shoud go to the next file
if (!goToNextLog()) {
return false;
}
+ // if we went to the next log file, we should call next() again
+ return next();
}
return true;
}
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Sun Jan 23 05:29:40 2011
@@ -123,7 +123,7 @@ public class FileTxnSnapLog {
PlayBackListener listener) throws IOException {
snapLog.deserialize(dt, sessions);
FileTxnLog txnLog = new FileTxnLog(dataDir);
- TxnIterator itr = txnLog.read(dt.lastProcessedZxid);
+ TxnIterator itr = txnLog.read(dt.lastProcessedZxid+1);
long highestZxid = dt.lastProcessedZxid;
TxnHeader hdr;
while (true) {
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Sun Jan 23 05:29:40 2011
@@ -46,6 +46,7 @@ public class CommitProcessor extends Thr
LinkedList<Request> committedRequests = new LinkedList<Request>();
RequestProcessor nextProcessor;
+ ArrayList<Request> toProcess = new ArrayList<Request>();
/**
* This flag indicates whether we need to wait for a response to come back from the
@@ -65,8 +66,7 @@ public class CommitProcessor extends Thr
@Override
public void run() {
try {
- Request nextPending = null;
- ArrayList<Request> toProcess = new ArrayList<Request>();
+ Request nextPending = null;
while (!finished) {
int len = toProcess.size();
for (int i = 0; i < len; i++) {
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Sun Jan 23 05:29:40 2011
@@ -125,8 +125,7 @@ public class Follower extends Learner{
fzk.commit(qp.getZxid());
break;
case Leader.UPTODATE:
- fzk.takeSnapshot();
- self.cnxnFactory.setZooKeeperServer(fzk);
+ LOG.error("Received an UPTODATE message after Follower started");
break;
case Leader.REVALIDATE:
revalidate(qp);
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Sun Jan 23 05:29:40 2011
@@ -28,6 +28,7 @@ import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -35,11 +36,14 @@ import org.apache.jute.BinaryInputArchiv
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
+import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.TxnHeader;
/**
* This class is the superclass of two of the three main actors in a ZK
@@ -47,6 +51,10 @@ import org.apache.zookeeper.server.quoru
* a good deal of code which is moved into Peer to avoid duplication.
*/
public class Learner {
+ /**
+ * A transaction proposal received from the leader while syncing (before UPTODATE).
+ * It is applied to the database when its matching COMMIT arrives; any still
+ * uncommitted when the sync loop ends are passed to logRequest after startup.
+ */
+ static class PacketInFlight {
+ // transaction header deserialized from the PROPOSAL packet's data
+ TxnHeader hdr;
+ // transaction record deserialized together with hdr
+ Record rec;
+ }
QuorumPeer self;
LearnerZooKeeperServer zk;
@@ -276,7 +284,8 @@ public class Learner {
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
- readPacket(qp);
+ readPacket(qp);
+ LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
synchronized (zk) {
if (qp.getType() == Leader.DIFF) {
LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));
@@ -290,7 +299,7 @@ public class Learner {
String signature = leaderIs.readString("signature");
if (!signature.equals("BenWasHere")) {
LOG.error("Missing signature. Got " + signature);
- throw new IOException("Missing signature");
+ throw new IOException("Missing signature");
}
} else if (qp.getType() == Leader.TRUNC) {
//we need to truncate the log to the lastzxid of the leader
@@ -311,15 +320,63 @@ public class Learner {
System.exit(13);
}
+ zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
if(LOG.isInfoEnabled()){
LOG.info("Setting leader epoch " + Long.toHexString(newLeaderZxid >> 32L));
}
- zk.getZKDatabase().setlastProcessedZxid(newLeaderZxid);
+
+ long lastQueued = 0;
+ // we are now going to start getting transactions to apply followed by an UPTODATE
+ outerLoop:
+ while (self.isRunning()) {
+ readPacket(qp);
+ switch(qp.getType()) {
+ case Leader.PROPOSAL:
+ PacketInFlight pif = new PacketInFlight();
+ pif.hdr = new TxnHeader();
+ BinaryInputArchive ia = BinaryInputArchive
+ .getArchive(new ByteArrayInputStream(qp.getData()));
+ pif.rec = SerializeUtils.deserializeTxn(ia, pif.hdr);
+ if (pif.hdr. getZxid() != lastQueued + 1) {
+ LOG.warn("Got zxid 0x"
+ + Long.toHexString(pif.hdr.getZxid())
+ + " expected 0x"
+ + Long.toHexString(lastQueued + 1));
+ }
+ lastQueued = pif.hdr.getZxid();
+ packetsNotCommitted.add(pif);
+ break;
+ case Leader.COMMIT:
+ pif = packetsNotCommitted.peekFirst();
+ if (pif.hdr.getZxid() != qp.getZxid()) {
+ LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
+ } else {
+ zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+ packetsNotCommitted.remove();
+ }
+ break;
+ case Leader.INFORM:
+ TxnHeader hdr = new TxnHeader();
+ ia = BinaryInputArchive
+ .getArchive(new ByteArrayInputStream(qp.getData()));
+ Record txn = SerializeUtils.deserializeTxn(ia, hdr);
+ zk.getZKDatabase().processTxn(hdr, txn);
+ break;
+ case Leader.UPTODATE:
+ zk.takeSnapshot();
+ self.cnxnFactory.setZooKeeperServer(zk);
+ break outerLoop;
+ }
+ }
}
ack.setZxid(newLeaderZxid & ~0xffffffffL);
writePacket(ack, true);
sock.setSoTimeout(self.tickTime * self.syncLimit);
zk.startup();
+ //We have to have a commit processor to do this
+ for(PacketInFlight p: packetsNotCommitted) {
+ ((FollowerZooKeeperServer)zk).logRequest(p.hdr, p.rec);
+ }
}
protected void revalidate(QuorumPacket qp) throws IOException {
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sun Jan 23 05:29:40 2011
@@ -29,6 +29,8 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
@@ -254,79 +256,72 @@ public class LearnerHandler extends Thre
long peerLastZxid = qp.getZxid();
/* the default to send to the follower */
int packetToSend = Leader.SNAP;
- boolean logTxns = true;
long zxidToSend = 0;
-
+ long leaderLastZxid = 0;
/** the packets that the follower needs to get updates from **/
long updates = peerLastZxid;
/* we are sending the diff check if we have proposals in memory to be able to
* send a diff to the
*/
- LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
- synchronized(proposals) {
+ ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
+ ReadLock rl = lock.readLock();
+ try {
+ rl.lock();
+ final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
+ final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
+ LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
if (proposals.size() != 0) {
- if ((leader.zk.getZKDatabase().getmaxCommittedLog() >= peerLastZxid)
- && (leader.zk.getZKDatabase().getminCommittedLog() <= peerLastZxid)) {
+ if ((maxCommittedLog >= peerLastZxid)
+ && (minCommittedLog <= peerLastZxid)) {
packetToSend = Leader.DIFF;
- zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog();
+ zxidToSend = maxCommittedLog;
for (Proposal propose: proposals) {
if (propose.packet.getZxid() > peerLastZxid) {
queuePacket(propose.packet);
QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
null, null);
queuePacket(qcommit);
-
}
}
+ } else if (peerLastZxid > maxCommittedLog) {
+ packetToSend = Leader.TRUNC;
+ zxidToSend = maxCommittedLog;
+ updates = zxidToSend;
}
+ } else {
+ // just let the state transfer happen
+ }
+
+ leaderLastZxid = leader.startForwarding(this, updates);
+ if (peerLastZxid == leaderLastZxid) {
+ // We are in sync so we'll do an empty diff
+ packetToSend = Leader.DIFF;
+ zxidToSend = leaderLastZxid;
}
- else {
- logTxns = false;
- }
- }
-
- //check if we decided to send a diff or we need to send a truncate
- // we avoid using epochs for truncating because epochs make things
- // complicated. Two epochs might have the last 32 bits as same.
- // only if we know that there is a committed zxid in the queue that
- // is less than the one the peer has we send a trunc else to make
- // things simple we just send sanpshot.
- if (logTxns && (peerLastZxid > leader.zk.getZKDatabase().getmaxCommittedLog())) {
- // this is the only case that we are sure that
- // we can ask the peer to truncate the log
- packetToSend = Leader.TRUNC;
- zxidToSend = leader.zk.getZKDatabase().getmaxCommittedLog();
- updates = zxidToSend;
- }
-
- /* see what other packets from the proposal
- * and tobeapplied queues need to be sent
- * and then decide if we can just send a DIFF
- * or we actually need to send the whole snapshot
- */
- long leaderLastZxid = leader.startForwarding(this, updates);
- // a special case when both the ids are the same
- if (peerLastZxid == leaderLastZxid) {
- packetToSend = Leader.DIFF;
- zxidToSend = leaderLastZxid;
+ } finally {
+ rl.unlock();
}
QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
leaderLastZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
bufferedOutput.flush();
-
-
+ //Need to set the zxidToSend to the latest zxid
+ if (packetToSend == Leader.SNAP) {
+ zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
+ }
oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
bufferedOutput.flush();
/* if we are not truncating or sending a diff just send a snapshot */
if (packetToSend == Leader.SNAP) {
- LOG.warn("Sending snapshot last zxid of peer is 0x"
+ LOG.info("Sending snapshot last zxid of peer is 0x"
+ Long.toHexString(peerLastZxid) + " "
+ " zxid of leader is 0x"
- + Long.toHexString(leaderLastZxid));
+ + Long.toHexString(leaderLastZxid)
+ + "sent zxid of db as 0x"
+ + Long.toHexString(zxidToSend));
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
@@ -524,6 +519,6 @@ public class LearnerHandler extends Thre
public boolean synced() {
return isAlive()
- && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
+ && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
}
}
Modified: zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java (original)
+++ zookeeper/branches/branch-3.3/src/java/main/org/apache/zookeeper/server/quorum/Observer.java Sun Jan 23 05:29:40 2011
@@ -111,8 +111,7 @@ public class Observer extends Learner{
LOG.warn("Ignoring commit");
break;
case Leader.UPTODATE:
- zk.takeSnapshot();
- self.cnxnFactory.setZooKeeperServer(zk);
+ LOG.error("Received an UPTODATE message after Observer started");
break;
case Leader.REVALIDATE:
revalidate(qp);
Added: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1062327&view=auto
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (added)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Sun Jan 23 05:29:40 2011
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.Test;
+
+
+public class FollowerResyncConcurrencyTest extends QuorumBase {
+    /**
+     * Number of asynchronous creates that have completed. Completion callbacks come from
+     * three different client handles (presumably each on its own event thread), so the
+     * count must be updated atomically; a volatile int++ can lose updates.
+     */
+    final AtomicInteger counter = new AtomicInteger(0);
+    /** Number of asynchronous creates that completed with a non-zero result code. */
+    final AtomicInteger errors = new AtomicInteger(0);
+
+    private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+    /**
+     * Builds the callback shared by every asynchronous create in these tests. It counts each
+     * completion, counts failures separately, and releases {@code sem} based on the
+     * completion count: exactly once when it equals {@code threshold} if
+     * {@code releaseOnlyAtThreshold} is true, otherwise on every completion beyond it.
+     */
+    private AsyncCallback.StringCallback countingCallback(final Semaphore sem,
+            final int threshold, final boolean releaseOnlyAtThreshold) {
+        return new AsyncCallback.StringCallback() {
+            @Override
+            public void processResult(int rc, String path, Object ctx, String name) {
+                int completed = counter.incrementAndGet();
+                if (rc != 0) {
+                    errors.incrementAndGet();
+                }
+                boolean release = releaseOnlyAtThreshold
+                        ? completed == threshold
+                        : completed > threshold;
+                if (release) {
+                    sem.release();
+                }
+            }
+        };
+    }
+
+    /**
+     * See ZOOKEEPER-962. This tests for one of the bugs hit while fixing this,
+     * setting the ZXID of the SNAP packet.
+     * Starts up 3 ZKs. Shut down F1, write a node, restart the one that was shut down.
+     * The non-leader ZKs are writing to cluster.
+     * Shut down F1 again.
+     * Restart after sessions are expired, expect to get a snap file.
+     * Shut down, run some transactions through.
+     * Restart to a diff while transactions are running in leader.
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws KeeperException
+     */
+    @Test
+    public void testResyncBySnapThenDiffAfterFollowerCrashes()
+            throws IOException, InterruptedException, KeeperException, Throwable {
+        final Semaphore sem = new Semaphore(0);
+        // 13000 "/mybar" + 1000 "/mytestfoo" + 260 "/newbaz" async creates are issued;
+        // the semaphore is released once, when the 14200th completes.
+        final AsyncCallback.StringCallback callback = countingCallback(sem, 14200, true);
+
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+        CountdownWatcher watcher1 = new CountdownWatcher();
+        CountdownWatcher watcher2 = new CountdownWatcher();
+        CountdownWatcher watcher3 = new CountdownWatcher();
+
+        int index = 1;
+        while (qu.getPeer(index).peer.leader == null) {
+            index++;
+        }
+
+        Leader leader = qu.getPeer(index).peer.leader;
+
+        assertNotNull(leader);
+        /*
+         * Reusing the index variable to select a follower to connect to
+         */
+        index = (index == 1) ? 2 : 1;
+        qu.shutdown(index);
+        final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000, watcher3);
+        watcher3.waitForConnected(CONNECTION_TIMEOUT);
+        zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+        qu.restart(index);
+        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
+
+        ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher2);
+
+        watcher1.waitForConnected(CONNECTION_TIMEOUT);
+        watcher2.waitForConnected(CONNECTION_TIMEOUT);
+
+        zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                for (int i = 0; i < 1000; i++) {
+                    zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, callback, null);
+                    if (i % 10 == 0) {
+                        try {
+                            Thread.sleep(100);
+                        } catch (InterruptedException e) {
+                            // Restore the interrupt flag and stop generating load.
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                    }
+                }
+            }
+        });
+
+        for (int i = 0; i < 13000; i++) {
+            zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, callback, null);
+
+            if (i == 5000) {
+                qu.shutdown(index);
+                LOG.info("Shutting down s1");
+            }
+            if (i == 12000) {
+                // Restart off of snap, then get some txns for a log, then shut down
+                qu.restart(index);
+                Thread.sleep(300);
+                qu.shutdown(index);
+                t.start();
+                Thread.sleep(300);
+                qu.restart(index);
+                LOG.info("Setting up server: " + index);
+            }
+            if ((i % 1000) == 0) {
+                Thread.sleep(1000);
+            }
+
+            if (i % 50 == 0) {
+                zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, callback, null);
+            }
+        }
+
+        // Wait until all updates return
+        if (!sem.tryAcquire(20000, TimeUnit.MILLISECONDS)) {
+            LOG.warn("Did not aquire semaphore fast enough");
+        }
+        t.join(10000);
+        Thread.sleep(1000);
+
+        verifyState(qu, index, leader);
+        LOG.info("Async creates that completed with an error: " + errors.get());
+
+        // Release client sessions and servers so they do not leak into later tests.
+        zk.close();
+        zk2.close();
+        zk3.close();
+        qu.shutdownAll();
+    }
+
+    /**
+     * This test:
+     * Starts up 3 ZKs. The non-leader ZKs are writing to cluster
+     * Shut down one of the non-leader ZKs.
+     * Restart after sessions have expired but <500 txns have taken place (get a diff)
+     * Shut down immediately after restarting, start running separate thread with other transactions
+     * Restart to a diff while transactions are running in leader
+     *
+     *
+     * Before fixes for ZOOKEEPER-962, restarting off of diff could get an inconsistent view of data missing transactions that
+     * completed during diff syncing. Follower would also be considered "restarted" before all forwarded transactions
+     * were completely processed, so restarting would cause a snap file with a too-high zxid to be written, and transactions
+     * would be missed
+     *
+     * This test should pretty reliably catch the failure of restarting the server before all diff messages have been processed,
+     * however, due to the transient nature of the system it may not catch failures due to concurrent processing of transactions
+     * during the leader's diff forwarding.
+     *
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws KeeperException
+     * @throws Throwable
+     */
+    @Test
+    public void testResyncByDiffAfterFollowerCrashes()
+            throws IOException, InterruptedException, KeeperException, Throwable {
+        final Semaphore sem = new Semaphore(0);
+        // 5000 "/mybar" + 400 "/mytestfoo" + 2000 "/newbaz" async creates are issued;
+        // every completion past the 7300th releases a permit.
+        final AsyncCallback.StringCallback callback = countingCallback(sem, 7300, false);
+
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.startAll();
+        CountdownWatcher watcher1 = new CountdownWatcher();
+        CountdownWatcher watcher2 = new CountdownWatcher();
+        CountdownWatcher watcher3 = new CountdownWatcher();
+
+        int index = 1;
+        while (qu.getPeer(index).peer.leader == null) {
+            index++;
+        }
+
+        Leader leader = qu.getPeer(index).peer.leader;
+
+        assertNotNull(leader);
+
+        /*
+         * Reusing the index variable to select a follower to connect to
+         */
+        index = (index == 1) ? 2 : 1;
+
+        ZooKeeper zk = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher1);
+
+        ZooKeeper zk2 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(index).peer.getClientPort(), 1000, watcher2);
+        final ZooKeeper zk3 = new DisconnectableZooKeeper("127.0.0.1:" + qu.getPeer(3).peer.getClientPort(), 1000, watcher3);
+        watcher1.waitForConnected(CONNECTION_TIMEOUT);
+        watcher2.waitForConnected(CONNECTION_TIMEOUT);
+        watcher3.waitForConnected(CONNECTION_TIMEOUT);
+        zk.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+
+        final AtomicBoolean runNow = new AtomicBoolean(false);
+        Thread t = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                int inSyncCounter = 0;
+                while (inSyncCounter < 400) {
+                    if (runNow.get()) {
+                        zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, callback, null);
+                        try {
+                            Thread.sleep(10);
+                        } catch (InterruptedException e) {
+                            // Restore the interrupt flag and stop generating load.
+                            Thread.currentThread().interrupt();
+                            return;
+                        }
+                        inSyncCounter++;
+                    } else {
+                        Thread.yield();
+                    }
+                }
+            }
+        });
+
+        t.start();
+        for (int i = 0; i < 5000; i++) {
+            zk2.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, callback, null);
+
+            if (i == 1000) {
+                qu.shutdown(index);
+                Thread.sleep(1100);
+                LOG.info("Shutting down s1");
+            }
+            if (i == 1100 || i == 1150 || i == 1200) {
+                Thread.sleep(1000);
+            }
+
+            if (i == 1200) {
+                qu.startThenShutdown(index);
+                runNow.set(true);
+                qu.restart(index);
+                LOG.info("Setting up server: " + index);
+            }
+
+            if (i >= 1000 && i % 2 == 0) {
+                zk3.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, callback, null);
+            }
+            if (i == 1050 || i == 1100 || i == 1150) {
+                Thread.sleep(1000);
+            }
+        }
+
+        // Wait until all updates return
+        if (!sem.tryAcquire(15000, TimeUnit.MILLISECONDS)) {
+            LOG.warn("Did not aquire semaphore fast enough");
+        }
+        t.join(10000);
+        Thread.sleep(1000);
+
+        // Verify that server is following and has the same epoch as the leader
+        verifyState(qu, index, leader);
+        LOG.info("Async creates that completed with an error: " + errors.get());
+
+        // Release client sessions and servers so they do not leak into later tests.
+        zk.close();
+        zk2.close();
+        zk3.close();
+        qu.shutdownAll();
+    }
+
+    /**
+     * Asserts that the restarted peer is following in the leader's epoch and that its
+     * sessions and per-session ephemeral counts match the other servers.
+     *
+     * @param qu the quorum under test
+     * @param index the restarted peer (the tests only restart peer 1 or 2)
+     * @param leader the leader elected when the test started
+     */
+    private void verifyState(QuorumUtil qu, int index, Leader leader) {
+        assertTrue("Not following", qu.getPeer(index).peer.follower != null);
+        long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
+        long epochL = (leader.getEpoch() >> 32L);
+        assertTrue("Zxid: " + qu.getPeer(index).peer.getActiveServer().getZKDatabase().getDataTreeLastProcessedZxid() +
+                " Current epoch: " + epochF, epochF == epochL);
+        // Compare against the other member of the {1, 2} pair. That peer is the leader only
+        // when the leader was 1 or 2; if peer 3 leads, it is a follower like the restarted one.
+        int otherIndex = (index == 1) ? 2 : 1;
+        Collection<Long> sessionsRestarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase().getSessions();
+        Collection<Long> sessionsNotRestarted = qu.getPeer(otherIndex).peer.getActiveServer().getZKDatabase().getSessions();
+
+        for (Long l : sessionsRestarted) {
+            assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
+        }
+        assertEquals("Should have same number of sessions", sessionsNotRestarted.size(), sessionsRestarted.size());
+        ZKDatabase restarted = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
+        ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
+        ZKDatabase other = qu.getPeer(otherIndex).peer.getActiveServer().getZKDatabase();
+        for (Long l : sessionsRestarted) {
+            HashSet<?> ephemerals = restarted.getEphemerals(l);
+            HashSet<?> cleanEphemerals = clean.getEphemerals(l);
+            for (Object o : cleanEphemerals) {
+                if (!ephemerals.contains(o)) {
+                    LOG.info("Restarted follower doesn't contain ephemeral " + o);
+                }
+            }
+            HashSet<?> otherEphemerals = other.getEphemerals(l);
+            for (Object o : otherEphemerals) {
+                if (!cleanEphemerals.contains(o)) {
+                    LOG.info("Follower doesn't contain ephemeral from leader " + o);
+                }
+            }
+            assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());
+            assertEquals("Leader should equal follower", otherEphemerals.size(), cleanEphemerals.size());
+        }
+    }
+}
Modified: zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1062327&r1=1062326&r2=1062327&view=diff
==============================================================================
--- zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original)
+++ zookeeper/branches/branch-3.3/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Sun Jan 23 05:29:40 2011
@@ -61,9 +61,9 @@ public class QuorumUtil {
private final Map<Integer, PeerStruct> peers = new HashMap<Integer, PeerStruct>();
- private final int N;
+ public final int N;
- private final int ALL;
+ public final int ALL;
private String hostPort;
@@ -123,6 +123,7 @@ public class QuorumUtil {
}
public void startAll() throws IOException {
+ shutdownAll();
for (int i = 1; i <= ALL; ++i) {
start(i);
LOG.info("Started QuorumPeer " + i);
@@ -182,7 +183,26 @@ public class QuorumUtil {
ps.id, tickTime, initLimit, syncLimit);
Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
+ ps.peer.start();
+ }
+
+    /**
+     * Starts peer {@code id} and waits until ClientBase.waitForServerUp reports its client
+     * port up, failing the test with an assertion error if that does not happen within
+     * ClientBase.CONNECTION_TIMEOUT.
+     */
+    public void restart(int id) throws IOException {
+        start(id);
+        String address = "127.0.0.1:" + getPeer(id).clientPort;
+        boolean serverUp = ClientBase.waitForServerUp(address, ClientBase.CONNECTION_TIMEOUT);
+        Assert.assertTrue("Waiting for server up", serverUp);
+    }
+
+ /**
+ * Creates a fresh QuorumPeer for {@code id}, starts it, waits until
+ * ClientBase.waitForServerUp reports its client port up (failing the test otherwise),
+ * then shuts it down again.
+ * NOTE(review): the peer construction below mirrors the tail of start(id); consider
+ * delegating to start(id) once it is confirmed that start(id) does nothing else.
+ */
+ public void startThenShutdown(int id) throws IOException {
+ PeerStruct ps = getPeer(id);
+ LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
+ ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
+ ps.id, tickTime, initLimit, syncLimit);
+ Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
+
ps.peer.start();
+ // block until the freshly started peer serves clients before stopping it
+ Assert.assertTrue("Waiting for server up", ClientBase.waitForServerUp("127.0.0.1:"
+ + getPeer(id).clientPort, ClientBase.CONNECTION_TIMEOUT));
+ shutdown(id);
}
public void shutdownAll() {