You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by th...@apache.org on 2013/06/28 23:22:36 UTC
svn commit: r1497929 [1/2] - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/persistence/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/ s...
Author: thawan
Date: Fri Jun 28 21:22:35 2013
New Revision: 1497929
URL: http://svn.apache.org/r1497929
Log:
ZOOKEEPER-1413. Use on-disk transaction log for learner sync up (thawan)
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/GetProposalFromTxnTest.java (with props)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/ivy.xml
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LoadFromLogTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Jun 28 21:22:35 2013
@@ -536,7 +536,8 @@ IMPROVEMENTS:
ZOOKEEPER-1719. zkCli.sh, zkServer.sh and zkEnv.sh regression caused by ZOOKEEPER-1663
(Marshall McMullen via camille)
-
+
+ ZOOKEEPER-1413. Use on-disk transaction log for learner sync up (thawan)
Release 3.4.0 -
Modified: zookeeper/trunk/ivy.xml
URL: http://svn.apache.org/viewvc/zookeeper/trunk/ivy.xml?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/ivy.xml (original)
+++ zookeeper/trunk/ivy.xml Fri Jun 28 21:22:35 2013
@@ -39,8 +39,8 @@
</publications>
<dependencies>
- <dependency org="org.slf4j" name="slf4j-api" rev="1.6.2"/>
- <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.6.2" transitive="false"/>
+ <dependency org="org.slf4j" name="slf4j-api" rev="1.7.5"/>
+ <dependency org="org.slf4j" name="slf4j-log4j12" rev="1.7.5" transitive="false"/>
<dependency org="commons-cli" name="commons-cli" rev="1.2" />
<!-- transitive false turns off dependency checking, log4j deps seem borked -->
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java?rev=1497929&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java Fri Jun 28 21:22:35 2013
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class provides an iterator interface to access Proposal deserialized
+ * from on-disk txnlog. The iterator deserializes one proposal at a time
+ * to reduce memory footprint. Note that the request part of the proposal
+ * is not initialized and set to null since we don't need it during
+ * follower sync-up.
+ *
+ */
+public class TxnLogProposalIterator implements Iterator<Proposal> {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TxnLogProposalIterator.class);
+
+ public static final TxnLogProposalIterator EMPTY_ITERATOR = new TxnLogProposalIterator();
+
+ private boolean hasNext = false;
+
+ private TxnIterator itr;
+
+ @Override
+ public boolean hasNext() {
+ return hasNext;
+ }
+
+ /**
+ * Proposal returned by this iterator has request part set to null, since
+ * it is not used for follower sync-up.
+ */
+ @Override
+ public Proposal next() {
+
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ Proposal p = new Proposal();
+ try {
+ TxnHeader hdr = itr.getHeader();
+ Record txn = itr.getTxn();
+ hdr.serialize(boa, "hdr");
+ if (txn != null) {
+ txn.serialize(boa, "txn");
+ }
+ baos.close();
+
+ QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, itr.getHeader()
+ .getZxid(), baos.toByteArray(), null);
+ p.packet = pp;
+ p.request = null;
+
+ // This is the only place that can throw IO exception
+ hasNext = itr.next();
+
+ } catch (IOException e) {
+ LOG.error("Unable to read txnlog from disk", e);
+ hasNext = false;
+ }
+
+ return p;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ private TxnLogProposalIterator() {
+ }
+
+ public TxnLogProposalIterator(TxnIterator itr) {
+ if (itr != null) {
+ this.itr = itr;
+ hasNext = (itr.getHeader() != null);
+ }
+ }
+
+}
Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TxnLogProposalIterator.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Fri Jun 28 21:22:35 2013
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Collection;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -34,8 +35,6 @@ import org.apache.jute.BinaryOutputArchi
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Watcher;
@@ -45,12 +44,15 @@ import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPacket;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This class maintains the in memory database of zookeeper
@@ -70,6 +72,13 @@ public class ZKDatabase {
protected ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
protected FileTxnSnapLog snapLog;
protected long minCommittedLog, maxCommittedLog;
+
+ /**
+ * Default value is to use snapshot if txnlog size exceeds 1/3 the size of snapshot
+ */
+ public static final String SNAPSHOT_SIZE_FACTOR = "zookeeper.snapshotSizeFactor";
+ private double snapshotSizeFactor = 0.33;
+
public static final int commitLogCount = 500;
protected static int commitLogBuffer = 700;
protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();
@@ -256,9 +265,66 @@ public class ZKDatabase {
wl.unlock();
}
}
+
+ /**
+ * @return ratio of txnlog size to snapshot size above which the leader
+ * prefers a snapshot when syncing a learner; a negative value disables
+ * txnlog-based sync
+ */
+ public double getSnapshotSizeFactor() {
+ final double factor = snapshotSizeFactor;
+ return factor;
+ }
+ public long calculateTxnLogSizeLimit() {
+ long snapSize = 0;
+ try {
+ snapSize = snapLog.findMostRecentSnapshot().length();
+ } catch (IOException e) {
+ LOG.error("Unable to get size of most recent snapshot");
+ }
+ return (long) (snapSize * snapshotSizeFactor);
+ }
/**
+ * Get proposals from txnlog. Only packet part of proposal is populated.
+ *
+ * @param startZxid the starting zxid of the proposal
+ * @param sizeLimit maximum on-disk size of txnlog to fetch
+ * 0 is unlimited, negative value means disable.
+ * @return list of proposal (request part of each proposal is null)
+ */
+ public Iterator<Proposal> getProposalsFromTxnLog(long startZxid,
+ long sizeLimit) {
+ if (sizeLimit < 0) {
+ LOG.debug("Negative size limit - retrieving proposal via txnlog is disabled");
+ return TxnLogProposalIterator.EMPTY_ITERATOR;
+ }
+
+ TxnIterator itr = null;
+ try {
+
+ itr = snapLog.readTxnLog(startZxid, false);
+
+ // If we cannot guarantee that this is strictly the starting txn
+ // after a given zxid, we should fail.
+ if ((itr.getHeader() != null)
+ && (itr.getHeader().getZxid() > startZxid)) {
+ LOG.warn("Unable to find proposals from txnlog for zxid: "
+ + startZxid);
+ // The iterator holds open txnlog files and is not handed to
+ // the caller, so release it here
+ closeTxnIterator(itr);
+ return TxnLogProposalIterator.EMPTY_ITERATOR;
+ }
+
+ if (sizeLimit > 0) {
+ long txnSize = itr.getStorageSize();
+ if (txnSize > sizeLimit) {
+ LOG.info("Txnlog size: " + txnSize + " exceeds sizeLimit: "
+ + sizeLimit);
+ closeTxnIterator(itr);
+ return TxnLogProposalIterator.EMPTY_ITERATOR;
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to read txnlog from disk", e);
+ closeTxnIterator(itr);
+ return TxnLogProposalIterator.EMPTY_ITERATOR;
+ }
+ return new TxnLogProposalIterator(itr);
+ }
+
+ /**
+ * Close a txnlog iterator that will not be returned to a caller,
+ * logging (rather than propagating) any failure. A null iterator is
+ * ignored.
+ */
+ private void closeTxnIterator(TxnIterator itr) {
+ if (itr == null) {
+ return;
+ }
+ try {
+ itr.close();
+ } catch (IOException e) {
+ LOG.warn("Error closing txnlog iterator", e);
+ }
+ }
+
+ /**
* remove a cnxn from the datatree
* @param cnxn the cnxn to remove from the datatree
*/
@@ -503,4 +569,11 @@ public class ZKDatabase {
}
}
+ /**
+ * Used for unit testing, so we can turn this feature on/off.
+ * @param snapshotSizeFactor ratio of txnlog size to snapshot size used to
+ * bound txnlog-based learner sync; set to a negative value to turn
+ * txnlog-based sync off.
+ */
+ public void setSnapshotSizeFactor(double snapshotSizeFactor) {
+ this.snapshotSizeFactor = snapshotSizeFactor;
+ }
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java Fri Jun 28 21:22:35 2013
@@ -40,6 +40,7 @@ import org.apache.jute.BinaryOutputArchi
import org.apache.jute.InputArchive;
import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
+import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnHeader;
import org.slf4j.Logger;
@@ -338,7 +339,20 @@ public class FileTxnLog implements TxnLo
* logs
*/
public TxnIterator read(long zxid) throws IOException {
- return new FileTxnIterator(logDir, zxid);
+ return read(zxid, true);
+ }
+
+ /**
+ * start reading all the transactions from the given zxid.
+ *
+ * @param zxid the zxid to start reading transactions from
+ * @param fastForward true if the iterator should be fast forwarded to point
+ * to the txn of a given zxid, else the iterator will point to the
+ * starting txn of a txnlog that may contain txn of a given zxid
+ * @return returns an iterator to iterate through the transaction logs
+ */
+ public TxnIterator read(long zxid, boolean fastForward) throws IOException {
+ return new FileTxnIterator(logDir, zxid, fastForward);
}
/**
@@ -496,12 +510,34 @@ public class FileTxnLog implements TxnLo
* create an iterator over a transaction database directory
* @param logDir the transaction database directory
* @param zxid the zxid to start reading from
+ * @param fastForward true if the iterator should be fast forwarded to
+ * point to the txn of a given zxid, else the iterator will
+ * point to the starting txn of a txnlog that may contain txn of
+ * a given zxid
+ * @throws IOException
+ */
+ public FileTxnIterator(File logDir, long zxid, boolean fastForward)
+ throws IOException {
+ this.logDir = logDir;
+ this.zxid = zxid;
+ init();
+
+ if (!fastForward || hdr == null) {
+ // Either the caller wants the first txn of the containing log,
+ // or there is nothing to read at all
+ return;
+ }
+
+ // Advance until the current txn reaches the requested zxid,
+ // stopping early once the logs are exhausted
+ boolean more = true;
+ while (more && hdr.getZxid() < zxid) {
+ more = next();
+ }
+ }
+
+ /**
+ * create an iterator over a transaction database directory
+ * @param logDir the transaction database directory
+ * @param zxid the zxid to start reading from
* @throws IOException
*/
public FileTxnIterator(File logDir, long zxid) throws IOException {
- this.logDir = logDir;
- this.zxid = zxid;
- init();
+ this(logDir, zxid, true);
}
/**
@@ -525,10 +561,17 @@ public class FileTxnLog implements TxnLo
goToNextLog();
if (!next())
return;
- while (hdr.getZxid() < zxid) {
- if (!next())
- return;
+ }
+
+ /**
+ * Return total storage size of txnlog that will return by this iterator.
+ */
+ public long getStorageSize() {
+ long sum = 0;
+ for (File f : storedFiles) {
+ sum += f.length();
}
+ return sum;
}
/**
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/FileTxnSnapLog.java Fri Jun 28 21:22:35 2013
@@ -191,6 +191,33 @@ public class FileTxnSnapLog {
}
/**
+ * Get TxnIterator for iterating through txnlog starting at a given zxid
+ *
+ * @param zxid starting zxid
+ * @return TxnIterator
+ * @throws IOException
+ */
+ public TxnIterator readTxnLog(long zxid) throws IOException {
+ // Default: position the iterator exactly at the requested zxid
+ final boolean fastForward = true;
+ return readTxnLog(zxid, fastForward);
+ }
+
+ /**
+ * Open an iterator over the on-disk transaction log, beginning at a
+ * given zxid.
+ *
+ * @param zxid the zxid to start reading from
+ * @param fastForward if true, the iterator is advanced so that it points
+ * at the txn with the given zxid; if false, it points at the first txn
+ * of the txnlog that may contain that zxid
+ * @return an iterator over the transaction log
+ * @throws IOException if the transaction log cannot be read
+ */
+ public TxnIterator readTxnLog(long zxid, boolean fastForward)
+ throws IOException {
+ return new FileTxnLog(dataDir).read(zxid, fastForward);
+ }
+
+ /**
* process the transaction on the datatree
* @param hdr the hdr of the transaction
* @param dt the datatree to apply transaction to
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/persistence/TxnLog.java Fri Jun 28 21:22:35 2013
@@ -116,6 +116,13 @@ public interface TxnLog {
* @throws IOException
*/
void close() throws IOException;
+
+ /**
+ * Get an estimate of the storage space used to store the transaction
+ * records that will be returned by this iterator.
+ * @return estimated storage size; callers compare it against a byte
+ * limit derived from the snapshot size
+ * @throws IOException
+ */
+ long getStorageSize() throws IOException;
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Fri Jun 28 21:22:35 2013
@@ -27,7 +27,8 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
-import java.util.List;
+import java.util.Iterator;
+import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
@@ -37,8 +38,8 @@ import org.apache.jute.BinaryOutputArchi
import org.apache.jute.Record;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooTrace;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
@@ -96,6 +97,28 @@ public class LearnerHandler extends Thre
private BinaryOutputArchive oa;
private BufferedOutputStream bufferedOutput;
+
+ /**
+ * Keep track of whether we have started send packets thread
+ */
+ private volatile boolean sendingThreadStarted = false;
+
+ /**
+ * For testing purpose, force leader to use snapshot to sync with followers
+ */
+ public static final String FORCE_SNAP_SYNC = "zookeeper.forceSnapshotSync";
+ private boolean forceSnapSync = false;
+
+ /**
+ * Keep track of whether we need to queue TRUNC or DIFF into packet queue
+ * that we are going to blast it to the learner
+ */
+ private boolean needOpPacket = true;
+
+ /**
+ * Last zxid sent to the learner as part of synchronization
+ */
+ private long leaderLastZxid;
LearnerHandler(Socket sock, Leader leader) throws IOException {
super("LearnerHandler-" + sock.getRemoteSocketAddress());
@@ -315,155 +338,48 @@ public class LearnerHandler extends Thre
leader.waitForEpochAck(this.getSid(), ss);
}
peerLastZxid = ss.getLastZxid();
-
- /* the default to send to the follower */
- int packetToSend = Leader.SNAP;
- long zxidToSend = 0;
- long leaderLastZxid = 0;
- /** the packets that the follower needs to get updates from **/
- long updates = peerLastZxid;
-
- /* we are sending the diff check if we have proposals in memory to be able to
- * send a diff to the
- */
- ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
- ReadLock rl = lock.readLock();
- try {
- rl.lock();
- final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
- final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
- LOG.info("Synchronizing with Follower sid: " + sid
- +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
- +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
- +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));
-
- List<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();
-
- if (proposals.size() != 0) {
- LOG.debug("proposal size is {}", proposals.size());
- if ((maxCommittedLog >= peerLastZxid)
- && (minCommittedLog <= peerLastZxid)) {
- LOG.debug("Sending proposals to follower");
-
- // as we look through proposals, this variable keeps track of previous
- // proposal Id.
- long prevProposalZxid = minCommittedLog;
-
- // Keep track of whether we are about to send the first packet.
- // Before sending the first packet, we have to tell the learner
- // whether to expect a trunc or a diff
- boolean firstPacket=true;
-
- // If we are here, we can use committedLog to sync with
- // follower. Then we only need to decide whether to
- // send trunc or not
- packetToSend = Leader.DIFF;
- zxidToSend = maxCommittedLog;
-
- for (Proposal propose: proposals) {
- // skip the proposals the peer already has
- if (propose.packet.getZxid() <= peerLastZxid) {
- prevProposalZxid = propose.packet.getZxid();
- continue;
- } else {
- // If we are sending the first packet, figure out whether to trunc
- // in case the follower has some proposals that the leader doesn't
- if (firstPacket) {
- firstPacket = false;
- // Does the peer have some proposals that the leader hasn't seen yet
- if (prevProposalZxid < peerLastZxid) {
- // send a trunc message before sending the diff
- packetToSend = Leader.TRUNC;
- zxidToSend = prevProposalZxid;
- updates = zxidToSend;
- }
- }
- queuePacket(propose.packet);
- QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
- null, null);
- queuePacket(qcommit);
- }
- }
- } else if (peerLastZxid > maxCommittedLog) {
- LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
- Long.toHexString(maxCommittedLog),
- Long.toHexString(updates));
-
- packetToSend = Leader.TRUNC;
- zxidToSend = maxCommittedLog;
- updates = zxidToSend;
- } else {
- LOG.warn("Unhandled proposal scenario");
- }
- } else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
- // The leader may recently take a snapshot, so the committedLog
- // is empty. We don't need to send snapshot if the follow
- // is already sync with in-memory db.
- LOG.debug("committedLog is empty but leader and follower "
- + "are in sync, zxid=0x{}",
- Long.toHexString(peerLastZxid));
- packetToSend = Leader.DIFF;
- zxidToSend = peerLastZxid;
- } else {
- // just let the state transfer happen
- LOG.debug("proposals is empty");
- }
- LOG.info("Sending " + Leader.getPacketType(packetToSend));
- leaderLastZxid = leader.startForwarding(this, updates);
-
- } finally {
- rl.unlock();
- }
-
+
+ // Take any necessary action if we need to send TRUNC or DIFF
+ // startForwarding() will be called in all cases
+ boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
+
LOG.debug("Sending NEWLEADER message to " + sid);
// the version of this quorumVerifier will be set by leader.lead() in case
// the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
// we got here, so the version was set
-
- if (getVersion() < 0x10000) {
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- newLeaderZxid, null, null);
+ if (getVersion() < 0x10000) {
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
+ .toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
- //Need to set the zxidToSend to the latest zxid
- if (packetToSend == Leader.SNAP) {
- zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
- }
- oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
- bufferedOutput.flush();
/* if we are not truncating or sending a diff just send a snapshot */
- if (packetToSend == Leader.SNAP) {
+ if (needSnap) {
+ long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
+ oa.writeRecord(new QuorumPacket(Leader.SNAP, zxidToSend, null, null), "packet");
+ bufferedOutput.flush();
+
LOG.info("Sending snapshot last zxid of peer is 0x"
+ Long.toHexString(peerLastZxid) + " "
- + " zxid of leader is 0x"
- + Long.toHexString(leaderLastZxid)
+ + "zxid of leader is 0x"
+ + Long.toHexString(leaderLastZxid) + " "
+ "sent zxid of db as 0x"
+ Long.toHexString(zxidToSend));
// Dump data to peer
leader.zk.getZKDatabase().serializeSnapshot(oa);
oa.writeString("BenWasHere", "signature");
+ bufferedOutput.flush();
}
- bufferedOutput.flush();
-
- // Start sending packets
- new Thread() {
- public void run() {
- Thread.currentThread().setName(
- "Sender-" + sock.getRemoteSocketAddress());
- try {
- sendPackets();
- } catch (InterruptedException e) {
- LOG.warn("Unexpected interruption",e);
- }
- }
- }.start();
+ // Start thread that blast packets in the queue to learner
+ startSendingPackets();
+
/*
* Have to wait for the first ACK, wait until
* the leader is ready, and only then we can
@@ -607,6 +523,273 @@ public class LearnerHandler extends Thre
}
}
+ /**
+ * Start the thread that forwards every packet in the queue to the learner.
+ * The thread is started at most once; later calls only log an error.
+ */
+ protected void startSendingPackets() {
+ if (!sendingThreadStarted) {
+ // Start sending packets
+ new Thread() {
+ @Override
+ public void run() {
+ Thread.currentThread().setName(
+ "Sender-" + sock.getRemoteSocketAddress());
+ try {
+ sendPackets();
+ } catch (InterruptedException e) {
+ // Keep the stack trace and restore the interrupt status
+ LOG.warn("Unexpected interruption", e);
+ Thread.currentThread().interrupt();
+ }
+ }
+ }.start();
+ sendingThreadStarted = true;
+ } else {
+ LOG.error("Attempting to start sending thread after it already started");
+ }
+ }
+
+ /**
+ * Determine if we need to sync with follower using DIFF/TRUNC/SNAP
+ * and setup follower to receive packets from commit processor
+ *
+ * @param peerLastZxid last zxid reported by the learner
+ * @param db the leader's database (source of committedLog and txnlog)
+ * @param leader the leader, used to start forwarding proposals to the learner
+ * @return true if snapshot transfer is needed.
+ */
+ public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
+ /*
+ * When leader election is completed, the leader will set its
+ * lastProcessedZxid to be (epoch << 32). There will be no txn associated
+ * with this zxid.
+ *
+ * The learner will set its lastProcessedZxid to the same value if
+ * it get DIFF or SNAP from the leader. If the same learner come
+ * back to sync with leader using this zxid, we will never find this
+ * zxid in our history. In this case, we will ignore TRUNC logic and
+ * always send DIFF if we have old enough history
+ */
+ boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
+ // Keep track of the latest zxid which already queued
+ long currentZxid = peerLastZxid;
+ boolean needSnap = true;
+ boolean txnLogSyncEnabled = (db.getSnapshotSizeFactor() >= 0);
+ ReentrantReadWriteLock lock = db.getLogLock();
+ ReadLock rl = lock.readLock();
+ try {
+ rl.lock();
+ long maxCommittedLog = db.getmaxCommittedLog();
+ long minCommittedLog = db.getminCommittedLog();
+ long lastProcessedZxid = db.getDataTreeLastProcessedZxid();
+
+ LOG.info("Synchronizing with Follower sid: {} maxCommittedLog=0x{}"
+ + " minCommittedLog=0x{} lastProcessedZxid=0x{}"
+ + " peerLastZxid=0x{}", getSid(),
+ Long.toHexString(maxCommittedLog),
+ Long.toHexString(minCommittedLog),
+ Long.toHexString(lastProcessedZxid),
+ Long.toHexString(peerLastZxid));
+
+ if (db.getCommittedLog().isEmpty()) {
+ /*
+ * It is possible that commitedLog is empty. In that case
+ * setting these value to the latest txn in leader db
+ * will reduce the case that we need to handle
+ *
+ * Here is how each case handle by the if block below
+ * 1. lastProcessZxid == peerZxid -> Handle by (2)
+ * 2. lastProcessZxid < peerZxid -> Handle by (3)
+ * 3. lastProcessZxid > peerZxid -> Handle by (5)
+ */
+ minCommittedLog = lastProcessedZxid;
+ maxCommittedLog = lastProcessedZxid;
+ }
+
+ /*
+ * Here are the cases that we want to handle
+ *
+ * 1. Force sending snapshot (for testing purpose)
+ * 2. Peer and leader is already sync, send empty diff
+ * 3. Follower has txn that we haven't seen. This may be old leader
+ * so we need to send TRUNC. However, if peer has newEpochZxid,
+ * we cannot send TRUNC since the follower has no txnlog
+ * 4. Follower is within committedLog range or already in-sync.
+ * We may need to send DIFF or TRUNC depending on follower's zxid
+ * We always send empty DIFF if follower is already in-sync
+ * 5. Follower missed the committedLog. We will try to use on-disk
+ * txnlog + committedLog to sync with follower. If that fails,
+ * we will send snapshot
+ */
+
+ if (forceSnapSync) {
+ // Force leader to use snapshot to sync with follower
+ LOG.warn("Forcing snapshot sync - should not see this in production");
+ } else if (lastProcessedZxid == peerLastZxid) {
+ // Follower is already sync with us, send empty diff
+ LOG.info("Sending DIFF zxid=0x" + Long.toHexString(peerLastZxid) +
+ " for peer sid: " + getSid());
+ queueOpPacket(Leader.DIFF, peerLastZxid);
+ needOpPacket = false;
+ needSnap = false;
+ } else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
+ // Newer than commitedLog, send trunc and done
+ LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
+ Long.toHexString(maxCommittedLog) +
+ " for peer sid:" + getSid());
+ queueOpPacket(Leader.TRUNC, maxCommittedLog);
+ currentZxid = maxCommittedLog;
+ needOpPacket = false;
+ needSnap = false;
+ } else if ((maxCommittedLog >= peerLastZxid)
+ && (minCommittedLog <= peerLastZxid)) {
+ // Follower is within commitLog range
+ LOG.info("Using committedLog for peer sid: " + getSid());
+ Iterator<Proposal> itr = db.getCommittedLog().iterator();
+ currentZxid = queueCommittedProposals(itr, peerLastZxid,
+ null, maxCommittedLog);
+ needSnap = false;
+ } else if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
+ // Use txnlog and committedLog to sync
+
+ // Calculate sizeLimit that we allow to retrieve txnlog from disk
+ long sizeLimit = db.calculateTxnLogSizeLimit();
+ // This method can return empty iterator if the requested zxid
+ // is older than on-disk txnlog
+ Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
+ peerLastZxid, sizeLimit);
+ if (txnLogItr.hasNext()) {
+ LOG.info("Use txnlog and committedLog for peer sid: " + getSid());
+ currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
+ minCommittedLog, maxCommittedLog);
+
+ if (currentZxid < minCommittedLog) {
+ // The txnlog did not reach the start of committedLog: it
+ // ended early, or queueCommittedProposals refused a
+ // cross-epoch TRUNC and returned without queuing anything.
+ // Continuing would leave a hole in the learner's history
+ // (or queue a TRUNC to an invalid zxid), so drop whatever
+ // was queued and fall back to sending a snapshot.
+ LOG.info("Detected gap between end of txnlog: 0x"
+ + Long.toHexString(currentZxid)
+ + " and start of committedLog: 0x"
+ + Long.toHexString(minCommittedLog)
+ + " for peer sid: " + getSid());
+ currentZxid = peerLastZxid;
+ queuedPackets.clear();
+ needOpPacket = true;
+ } else {
+ LOG.debug("Queueing committedLog 0x" + Long.toHexString(currentZxid));
+ Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
+ currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
+ null, maxCommittedLog);
+ needSnap = false;
+ }
+ }
+ } else {
+ LOG.warn("Unhandled scenario for peer sid: " + getSid());
+ }
+ LOG.debug("Start forwarding 0x" + Long.toHexString(currentZxid) +
+ " for peer sid: " + getSid());
+ leaderLastZxid = leader.startForwarding(this, currentZxid);
+ } finally {
+ rl.unlock();
+ }
+
+ if (needOpPacket && !needSnap) {
+ // This should never happen, but we should fall back to sending
+ // snapshot just in case.
+ LOG.error("Unhandled scenario for peer sid: " + getSid() +
+ " fall back to use snapshot");
+ needSnap = true;
+ }
+
+ return needSnap;
+ }
+
+ /**
+ * Queue committed proposals into packet queue. The range of packets which
+ * is going to be queued are (peerLastZxid, maxZxid]
+ *
+ * Before the first proposal is queued, a DIFF or TRUNC op packet is queued
+ * if needOpPacket is still set. Every queued proposal is followed by a
+ * COMMIT packet.
+ *
+ * NOTE(review): if the cross-epoch TRUNC check below fires, this method
+ * returns early without queuing anything and leaves needOpPacket set;
+ * callers must handle that (e.g. by falling back to a snapshot).
+ *
+ * @param itr iterator point to the proposals
+ * @param peerLastZxid last zxid seen by the follower
+ * @param maxZxid max zxid of the proposal to queue, null if no limit
+ * @param lastCommitedZxid when sending diff, we need to send lastCommitedZxid
+ * on the leader to follow Zab 1.0 protocol. It is unboxed when a DIFF is
+ * queued, so it is expected to be non-null.
+ * @return last zxid of the queued proposal, or peerLastZxid if nothing
+ * was queued
+ */
+ protected long queueCommittedProposals(Iterator<Proposal> itr,
+ long peerLastZxid, Long maxZxid, Long lastCommitedZxid) {
+ boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
+ long queuedZxid = peerLastZxid;
+ // as we look through proposals, this variable keeps track of previous
+ // proposal Id.
+ // NOTE(review): it stays -1 if the first proposal examined is already
+ // newer than peerLastZxid, in which case the TRUNC below would carry -1;
+ // callers are expected to pass an iterator that starts at or before
+ // peerLastZxid - confirm at call sites.
+ long prevProposalZxid = -1;
+ while (itr.hasNext()) {
+ Proposal propose = itr.next();
+
+ long packetZxid = propose.packet.getZxid();
+ // abort if we hit the limit
+ if ((maxZxid != null) && (packetZxid > maxZxid)) {
+ break;
+ }
+
+ // skip the proposals the peer already has
+ if (packetZxid < peerLastZxid) {
+ prevProposalZxid = packetZxid;
+ continue;
+ }
+
+ // If we are sending the first packet, figure out whether to trunc
+ // or diff
+ if (needOpPacket) {
+
+ // Send diff when we see the follower's zxid in our history
+ if (packetZxid == peerLastZxid) {
+ LOG.info("Sending DIFF zxid=0x" +
+ Long.toHexString(lastCommitedZxid) +
+ " for peer sid: " + getSid());
+ queueOpPacket(Leader.DIFF, lastCommitedZxid);
+ needOpPacket = false;
+ // the peer already has this proposal; don't re-send it
+ continue;
+ }
+
+ if (isPeerNewEpochZxid) {
+ // Send diff and fall through if zxid is of a new-epoch
+ LOG.info("Sending DIFF zxid=0x" +
+ Long.toHexString(lastCommitedZxid) +
+ " for peer sid: " + getSid());
+ queueOpPacket(Leader.DIFF, lastCommitedZxid);
+ needOpPacket = false;
+ } else if (packetZxid > peerLastZxid ) {
+ // Peer has some proposals that the leader hasn't seen yet;
+ // it may have been a leader itself
+ if (ZxidUtils.getEpochFromZxid(packetZxid) !=
+ ZxidUtils.getEpochFromZxid(peerLastZxid)) {
+ // We cannot send TRUNC that cross epoch boundary.
+ // The learner will crash if it is asked to do so.
+ // We will send snapshot in those cases.
+ LOG.warn("Cannot send TRUNC to peer sid: " + getSid() +
+ " peer zxid is from different epoch" );
+ return queuedZxid;
+ }
+
+ LOG.info("Sending TRUNC zxid=0x" +
+ Long.toHexString(prevProposalZxid) +
+ " for peer sid: " + getSid());
+ queueOpPacket(Leader.TRUNC, prevProposalZxid);
+ needOpPacket = false;
+ }
+ }
+
+ if (packetZxid <= queuedZxid) {
+ // We can get here, if we don't have op packet to queue
+ // or there is a duplicate txn in a given iterator
+ continue;
+ }
+
+ // Since this is already a committed proposal, we need to follow
+ // it by a commit packet
+ queuePacket(propose.packet);
+ queueOpPacket(Leader.COMMIT, packetZxid);
+ queuedZxid = packetZxid;
+
+ }
+
+ if (needOpPacket && isPeerNewEpochZxid) {
+ // We will send DIFF for this kind of zxid in any case. This if-block
+ // catches the case where our history is older than the learner's and
+ // there is no new txn since then. So we need an empty diff
+ LOG.info("Sending DIFF zxid=0x" +
+ Long.toHexString(lastCommitedZxid) +
+ " for peer sid: " + getSid());
+ queueOpPacket(Leader.DIFF, lastCommitedZxid);
+ needOpPacket = false;
+ }
+
+ return queuedZxid;
+ }
+
public void shutdown() {
// Send the packet of death
try {
@@ -633,6 +816,11 @@ public class LearnerHandler extends Thre
* ping calls from the leader to the peers
*/
public void ping() {
+ // If learner hasn't sync properly yet, don't send ping packet
+ // otherwise, the learner will crash
+ if (!sendingThreadStarted) {
+ return;
+ }
long id;
synchronized(leader) {
id = leader.lastProposed;
@@ -642,6 +830,16 @@ public class LearnerHandler extends Thre
queuePacket(ping);
}
+    /**
+     * Queue a bare operation packet (such as DIFF, TRUNC or COMMIT) that
+     * carries only a type and a zxid; the remaining two QuorumPacket
+     * fields are left null.
+     *
+     * @param type Leader packet type constant
+     * @param zxid zxid to place in the packet
+     */
+    private void queueOpPacket(int type, long zxid) {
+        queuePacket(new QuorumPacket(type, zxid, null, null));
+    }
+
void queuePacket(QuorumPacket p) {
    // Append to the same queue that getQueuedPackets() exposes
    this.queuedPackets.add(p);
}
@@ -650,4 +848,19 @@ public class LearnerHandler extends Thre
return isAlive()
&& leader.self.tick <= tickOfNextAckDeadline;
}
+
+    /**
+     * For testing: expose the queue that queuePacket() appends to.
+     * The live queue is returned, not a copy, so callers may inspect
+     * or clear it.
+     *
+     * @return the internal packet queue
+     */
+    public Queue<QuorumPacket> getQueuedPackets() {
+        return this.queuedPackets;
+    }
+
+    /**
+     * For testing: reset whether an op packet (DIFF/TRUNC) still has to be
+     * queued before any proposal. Despite the method name, this assigns
+     * the {@code needOpPacket} flag.
+     *
+     * @param value new value for {@code needOpPacket}
+     */
+    public void setFirstPacket(boolean value) {
+        this.needOpPacket = value;
+    }
}
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java?rev=1497929&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java Fri Jun 28 21:22:35 2013
@@ -0,0 +1,563 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for {@code LearnerHandler.syncFollower()} that drive it with a
+ * mocked {@link Leader} and an in-memory {@link ZKDatabase}, then inspect the
+ * packets it queued and the zxid it passed to {@code startForwarding()}.
+ */
+public class LearnerHandlerTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(LearnerHandlerTest.class);
+
+    /**
+     * LearnerHandler that never starts a real sending thread; it only
+     * records that {@code startSendingPackets()} was invoked.
+     */
+    static class MockLearnerHandler extends LearnerHandler {
+        boolean threadStarted = false;
+
+        MockLearnerHandler(Socket sock, Leader leader) throws IOException {
+            super(sock, leader);
+        }
+
+        @Override
+        protected void startSendingPackets() {
+            threadStarted = true;
+        }
+    }
+
+    /**
+     * ZKDatabase whose last processed zxid, committed log and on-disk txn
+     * log are plain fields that each test populates directly.
+     */
+    static class MockZKDatabase extends ZKDatabase {
+        long lastProcessedZxid;
+        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+        LinkedList<Proposal> committedLog = new LinkedList<Leader.Proposal>();
+        LinkedList<Proposal> txnLog = new LinkedList<Leader.Proposal>();
+
+        public MockZKDatabase(FileTxnSnapLog snapLog) {
+            super(snapLog);
+        }
+
+        @Override
+        public long getDataTreeLastProcessedZxid() {
+            return lastProcessedZxid;
+        }
+
+        /** @return zxid of the last committed proposal, or 0 if there is none */
+        @Override
+        public long getmaxCommittedLog() {
+            if (!committedLog.isEmpty()) {
+                return committedLog.getLast().packet.getZxid();
+            }
+            return 0;
+        }
+
+        /** @return zxid of the first committed proposal, or 0 if there is none */
+        @Override
+        public long getminCommittedLog() {
+            if (!committedLog.isEmpty()) {
+                return committedLog.getFirst().packet.getZxid();
+            }
+            return 0;
+        }
+
+        @Override
+        public LinkedList<Proposal> getCommittedLog() {
+            return committedLog;
+        }
+
+        @Override
+        public ReentrantReadWriteLock getLogLock() {
+            return lock;
+        }
+
+        /**
+         * Mimic the on-disk txn log: return the whole log when it starts at
+         * or before {@code peerZxid}; otherwise, including when the log is
+         * empty, return an empty iterator. {@code limit} is ignored.
+         */
+        @Override
+        public Iterator<Proposal> getProposalsFromTxnLog(long peerZxid,
+                long limit) {
+            // Guard the empty case: peekFirst() would return null here
+            if (!txnLog.isEmpty()
+                    && peerZxid >= txnLog.peekFirst().packet.getZxid()) {
+                return txnLog.iterator();
+            }
+            return new LinkedList<Proposal>().iterator();
+        }
+
+        @Override
+        public long calculateTxnLogSizeLimit() {
+            return 1;
+        }
+    }
+
+    private MockLearnerHandler learnerHandler;
+    private Socket sock;
+
+    // Member variables for mocking Leader
+    private Leader leader;
+    // Zxid most recently passed to leader.startForwarding()
+    private long currentZxid;
+
+    // Member variables for mocking ZkDatabase
+    private MockZKDatabase db;
+
+    @Before
+    public void setUp() throws Exception {
+        // Intercept startForwarding() and remember the zxid it was given
+        leader = mock(Leader.class);
+        when(
+                leader.startForwarding(Matchers.any(LearnerHandler.class),
+                        Matchers.anyLong())).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) {
+                currentZxid = (Long) invocation.getArguments()[1];
+                // Match startForwarding()'s long return type
+                return 0L;
+            }
+        });
+
+        sock = mock(Socket.class);
+
+        db = new MockZKDatabase(null);
+        learnerHandler = new MockLearnerHandler(sock, leader);
+    }
+
+    /** Build a PROPOSAL packet wrapper carrying the given zxid. */
+    Proposal createProposal(long zxid) {
+        Proposal p = new Proposal();
+        p.packet = new QuorumPacket();
+        p.packet.setZxid(zxid);
+        p.packet.setType(Leader.PROPOSAL);
+        return p;
+    }
+
+    /**
+     * Validate that the PROPOSAL packets in the queue carry exactly the
+     * given zxids, in the given order. Other packet types (DIFF, TRUNC,
+     * COMMIT) are skipped.
+     *
+     * @param zxids expected proposal zxids, in queue order
+     */
+    public void queuedPacketMatches(long[] zxids) {
+        int index = 0;
+        for (QuorumPacket qp : learnerHandler.getQueuedPackets()) {
+            if (qp.getType() == Leader.PROPOSAL) {
+                assertTrue("Unexpected extra proposal 0x"
+                        + Long.toHexString(qp.getZxid()), index < zxids.length);
+                assertZxidEquals(zxids[index++], qp.getZxid());
+            }
+        }
+        // Fewer queued proposals than expected must fail too
+        assertEquals("Number of queued proposals", zxids.length, index);
+    }
+
+    /** Clear the handler's queue and flags so the next scenario starts clean. */
+    void reset() {
+        learnerHandler.getQueuedPackets().clear();
+        learnerHandler.threadStarted = false;
+        learnerHandler.setFirstPacket(true);
+    }
+
+    /**
+     * Check if op packet (first packet in the queue) match the expected value
+     * @param type - type of packet
+     * @param zxid - zxid in the op packet
+     * @param currentZxid - last packet queued by syncFollower,
+     *                      before invoking startForwarding()
+     */
+    public void assertOpType(int type, long zxid, long currentZxid) {
+        Queue<QuorumPacket> packets = learnerHandler.getQueuedPackets();
+        assertTrue(packets.size() > 0);
+        assertEquals(type, packets.peek().getType());
+        assertZxidEquals(zxid, packets.peek().getZxid());
+        assertZxidEquals(currentZxid, this.currentZxid);
+    }
+
+    /** assertEquals for zxids that reports both values in hex. */
+    void assertZxidEquals(long expected, long value) {
+        assertEquals("Expected 0x" + Long.toHexString(expected) + " but was 0x"
+                + Long.toHexString(value), expected, value);
+    }
+
+    /**
+     * Test cases when leader has empty committedLog
+     */
+    @Test
+    public void testEmptyCommittedLog() throws Exception {
+        long peerZxid;
+
+        // Peer has newer zxid
+        peerZxid = 3;
+        db.lastProcessedZxid = 1;
+        db.committedLog.clear();
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC and forward any packet starting lastProcessedZxid
+        assertOpType(Leader.TRUNC, db.lastProcessedZxid, db.lastProcessedZxid);
+        reset();
+
+        // Peer is already sync
+        peerZxid = 1;
+        db.lastProcessedZxid = 1;
+        db.committedLog.clear();
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF and forward any packet starting lastProcessedZxid
+        assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
+        assertEquals(1, learnerHandler.getQueuedPackets().size());
+        reset();
+
+        // Peer has 0 zxid (new machine turn up), txnlog
+        // is disabled
+        peerZxid = 0;
+        db.setSnapshotSizeFactor(-1);
+        db.lastProcessedZxid = 1;
+        db.committedLog.clear();
+        // We send SNAP
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+
+    }
+
+    /**
+     * Test cases when leader has committedLog
+     */
+    @Test
+    public void testCommittedLog() throws Exception {
+        long peerZxid;
+
+        // Commit proposal may lag behind data tree, but it shouldn't affect
+        // us in any case
+        db.lastProcessedZxid = 6;
+        db.committedLog.add(createProposal(2));
+        db.committedLog.add(createProposal(3));
+        db.committedLog.add(createProposal(5));
+
+        // Peer has zxid that we have never seen
+        peerZxid = 4;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC to 3 and forward any packet starting 5
+        assertOpType(Leader.TRUNC, 3, 5);
+        // TRUNC + 1 proposal + 1 commit
+        assertEquals(3, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 5 });
+        reset();
+
+        // Peer is within committedLog range
+        peerZxid = 2;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF and forward any packet starting at maxCommittedLog
+        assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
+                db.getmaxCommittedLog());
+        // DIFF + 2 proposals + 2 commits
+        assertEquals(5, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 3, 5 });
+        reset();
+
+        // Peer miss the committedLog and txnlog is disabled
+        peerZxid = 1;
+        db.setSnapshotSizeFactor(-1);
+        // We send SNAP
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+    }
+
+    /**
+     * Test cases when txnlog is enabled
+     */
+    @Test
+    public void testTxnLog() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(2));
+        db.txnLog.add(createProposal(3));
+        db.txnLog.add(createProposal(5));
+        db.txnLog.add(createProposal(6));
+        db.txnLog.add(createProposal(7));
+        db.txnLog.add(createProposal(8));
+        db.txnLog.add(createProposal(9));
+
+        db.lastProcessedZxid = 9;
+        db.committedLog.add(createProposal(6));
+        db.committedLog.add(createProposal(7));
+        db.committedLog.add(createProposal(8));
+
+        // Peer has zxid that we have never seen
+        peerZxid = 4;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
+        assertOpType(Leader.TRUNC, 3, db.getmaxCommittedLog());
+        // TRUNC + 4 proposals + 4 commits
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 5, 6, 7, 8 });
+        reset();
+
+        // Peer zxid is in txnlog range
+        peerZxid = 3;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF and forward any packet starting at maxCommittedLog
+        assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
+                db.getmaxCommittedLog());
+        // DIFF + 4 proposals + 4 commits
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 5, 6, 7, 8 });
+        reset();
+
+    }
+
+    /**
+     * Test cases when txnlog is enabled and committedLog is empty
+     */
+    @Test
+    public void testTxnLogOnly() throws Exception {
+        long peerZxid;
+
+        // CommittedLog is empty, we will use txnlog up to lastProcessZxid
+        db.lastProcessedZxid = 7;
+        db.txnLog.add(createProposal(2));
+        db.txnLog.add(createProposal(3));
+        db.txnLog.add(createProposal(5));
+        db.txnLog.add(createProposal(6));
+        db.txnLog.add(createProposal(7));
+        db.txnLog.add(createProposal(8));
+
+        // Peer has zxid that we have never seen
+        peerZxid = 4;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC to 3 and forward any packet starting at
+        // lastProcessedZxid
+        assertOpType(Leader.TRUNC, 3, db.lastProcessedZxid);
+        // TRUNC + 3 proposals + 3 commits
+        assertEquals(7, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 5, 6, 7 });
+        reset();
+
+        // Peer has zxid in txnlog range
+        peerZxid = 2;
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF and forward any packet starting at lastProcessedZxid
+        assertOpType(Leader.DIFF, db.lastProcessedZxid, db.lastProcessedZxid);
+        // DIFF + 4 proposals + 4 commits
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { 3, 5, 6, 7 });
+        reset();
+
+        // Peer miss the txnlog
+        peerZxid = 1;
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send snap
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+    }
+
+    /** Shorthand for {@link ZxidUtils#makeZxid(long, long)}. */
+    long getZxid(long epoch, long counter){
+        return ZxidUtils.makeZxid(epoch, counter);
+    }
+
+    /**
+     * Test cases with zxids that are negative long. The epoch has its top
+     * bit set, so every zxid built from it has the sign bit set.
+     */
+    @Test
+    public void testTxnLogWithNegativeZxid() throws Exception {
+        // An epoch such as 0xf would give positive zxids (0xf00000002, ...)
+        final long epoch = 0xffffffffL;
+        assertTrue("zxids in this test must be negative",
+                getZxid(epoch, 2) < 0);
+
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(epoch, 2)));
+        db.txnLog.add(createProposal(getZxid(epoch, 3)));
+        db.txnLog.add(createProposal(getZxid(epoch, 5)));
+        db.txnLog.add(createProposal(getZxid(epoch, 6)));
+        db.txnLog.add(createProposal(getZxid(epoch, 7)));
+        db.txnLog.add(createProposal(getZxid(epoch, 8)));
+        db.txnLog.add(createProposal(getZxid(epoch, 9)));
+
+        db.lastProcessedZxid = getZxid(epoch, 9);
+        db.committedLog.add(createProposal(getZxid(epoch, 6)));
+        db.committedLog.add(createProposal(getZxid(epoch, 7)));
+        db.committedLog.add(createProposal(getZxid(epoch, 8)));
+
+        // Peer has zxid that we have never seen
+        peerZxid = getZxid(epoch, 4);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send TRUNC to 3 and forward any packet starting at maxCommittedLog
+        assertOpType(Leader.TRUNC, getZxid(epoch, 3), db.getmaxCommittedLog());
+        // TRUNC + 4 proposals + 4 commits
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(epoch, 5),
+                getZxid(epoch, 6), getZxid(epoch, 7), getZxid(epoch, 8) });
+        reset();
+
+        // Peer zxid is in txnlog range
+        peerZxid = getZxid(epoch, 3);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF and forward any packet starting at maxCommittedLog
+        assertOpType(Leader.DIFF, db.getmaxCommittedLog(),
+                db.getmaxCommittedLog());
+        // DIFF + 4 proposals + 4 commits
+        assertEquals(9, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(epoch, 5),
+                getZxid(epoch, 6), getZxid(epoch, 7), getZxid(epoch, 8) });
+        reset();
+    }
+
+    /**
+     * Test cases when peer has new-epoch zxid
+     */
+    @Test
+    public void testNewEpochZxid() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(0, 1)));
+        db.txnLog.add(createProposal(getZxid(1, 1)));
+        db.txnLog.add(createProposal(getZxid(1, 2)));
+
+        // After leader election, lastProcessedZxid will point to new epoch
+        db.lastProcessedZxid = getZxid(2, 0);
+        db.committedLog.add(createProposal(getZxid(1, 1)));
+        db.committedLog.add(createProposal(getZxid(1, 2)));
+
+        // Peer has zxid of epoch 0
+        peerZxid = getZxid(0, 0);
+        // We should get snap, we can do better here, but the main logic is
+        // that we should never send diff if we have never seen any txn older
+        // than peer zxid
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+
+        // Peer has zxid of epoch 1
+        peerZxid = getZxid(1, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
+        assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
+        // DIFF + 2 proposals + 2 commits
+        assertEquals(5, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(1, 1), getZxid(1, 2)});
+        reset();
+
+        // Peer has zxid of epoch 2, so it is already sync
+        peerZxid = getZxid(2, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (2, 0) and forward any packet starting at (2, 0)
+        assertOpType(Leader.DIFF, getZxid(2, 0), getZxid(2, 0));
+        // DIFF only
+        assertEquals(1, learnerHandler.getQueuedPackets().size());
+        reset();
+
+    }
+
+    /**
+     * Test cases when learner has new-epoch zxid
+     * (zxid & 0xffffffffL) == 0;
+     */
+    @Test
+    public void testNewEpochZxidWithTxnlogOnly() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(1, 1)));
+        db.txnLog.add(createProposal(getZxid(2, 1)));
+        db.txnLog.add(createProposal(getZxid(2, 2)));
+        db.txnLog.add(createProposal(getZxid(4, 1)));
+
+        // After leader election, lastProcessedZxid will point to new epoch
+        db.lastProcessedZxid = getZxid(6, 0);
+
+        // Peer has zxid of epoch 3
+        peerZxid = getZxid(3, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (6,0) and forward any packet starting at (4,1)
+        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
+        // DIFF + 1 proposal + 1 commit
+        assertEquals(3, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(4, 1)});
+        reset();
+
+        // Peer has zxid of epoch 4
+        peerZxid = getZxid(4, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (6,0) and forward any packet starting at (4,1)
+        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(4, 1));
+        // DIFF + 1 proposal + 1 commit
+        assertEquals(3, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(4, 1)});
+        reset();
+
+        // Peer has zxid of epoch 5
+        peerZxid = getZxid(5, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (6,0) and forward any packet starting at (5,0)
+        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(5, 0));
+        // DIFF only
+        assertEquals(1, learnerHandler.getQueuedPackets().size());
+        reset();
+
+        // Peer has zxid of epoch 6
+        peerZxid = getZxid(6, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (6,0) and forward any packet starting at (6, 0)
+        assertOpType(Leader.DIFF, getZxid(6, 0), getZxid(6, 0));
+        // DIFF only
+        assertEquals(1, learnerHandler.getQueuedPackets().size());
+        reset();
+    }
+
+    /**
+     * Test cases when there is a duplicate txn in the committedLog. This
+     * should never happen unless there is a bug in initialization code
+     * but the learner should never see duplicate packets
+     */
+    @Test
+    public void testDuplicatedTxn() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(0, 1)));
+        db.txnLog.add(createProposal(getZxid(1, 1)));
+        db.txnLog.add(createProposal(getZxid(1, 2)));
+        db.txnLog.add(createProposal(getZxid(1, 1)));
+        db.txnLog.add(createProposal(getZxid(1, 2)));
+
+        // After leader election, lastProcessedZxid will point to new epoch
+        db.lastProcessedZxid = getZxid(2, 0);
+        db.committedLog.add(createProposal(getZxid(1, 1)));
+        db.committedLog.add(createProposal(getZxid(1, 2)));
+        db.committedLog.add(createProposal(getZxid(1, 1)));
+        db.committedLog.add(createProposal(getZxid(1, 2)));
+
+        // Peer has zxid of epoch 1
+        peerZxid = getZxid(1, 0);
+        assertFalse(learnerHandler.syncFollower(peerZxid, db, leader));
+        // We send DIFF to (1, 2) and forward any packet starting at (1, 2)
+        assertOpType(Leader.DIFF, getZxid(1, 2), getZxid(1, 2));
+        // DIFF + 2 proposals + 2 commits
+        assertEquals(5, learnerHandler.getQueuedPackets().size());
+        queuedPacketMatches(new long[] { getZxid(1, 1), getZxid(1, 2)});
+        reset();
+
+    }
+
+    /**
+     * Test cases when we have to TRUNC learner, but it may cross epoch boundary
+     * so we need to send snap instead
+     */
+    @Test
+    public void testCrossEpochTrunc() throws Exception {
+        long peerZxid;
+        db.txnLog.add(createProposal(getZxid(1, 1)));
+        db.txnLog.add(createProposal(getZxid(2, 1)));
+        db.txnLog.add(createProposal(getZxid(2, 2)));
+        db.txnLog.add(createProposal(getZxid(4, 1)));
+
+        // After leader election, lastProcessedZxid will point to new epoch
+        db.lastProcessedZxid = getZxid(6, 0);
+
+        // Peer has zxid (3, 1)
+        peerZxid = getZxid(3, 1);
+        assertTrue(learnerHandler.syncFollower(peerZxid, db, leader));
+        assertEquals(0, learnerHandler.getQueuedPackets().size());
+        reset();
+    }
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/LearnerHandlerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java?rev=1497929&r1=1497928&r2=1497929&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FollowerResyncConcurrencyTest.java Fri Jun 28 21:22:35 2013
@@ -33,8 +33,8 @@ import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.log4j.Logger;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -47,15 +47,37 @@ import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class FollowerResyncConcurrencyTest extends ZKTestCase {
- private static final Logger LOG = Logger.getLogger(FollowerResyncConcurrencyTest.class);
+ private static final Logger LOG = LoggerFactory.getLogger(FollowerResyncConcurrencyTest.class);
public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
- private volatile int counter = 0;
+ /** Number of async create() callbacks that have completed so far */
+ private AtomicInteger counter = new AtomicInteger(0);
+ /** Number of completed async create() callbacks whose rc was non-zero */
+ private AtomicInteger errors = new AtomicInteger(0);
+ /**
+ * Number of async operations issued but not yet called back. State
+ * verification must not start until this drops back to 0.
+ */
+ private AtomicInteger pending = new AtomicInteger(0);
+
+    @Before
+    public void setUp() throws Exception {
+        // Every test starts with fresh async-operation bookkeeping
+        counter.set(0);
+        errors.set(0);
+        pending.set(0);
+    }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("Error count {}" , errors.get());
+ }
/**
* See ZOOKEEPER-1319 - verify that a lagging follwer resyncs correctly
@@ -149,7 +171,28 @@ public class FollowerResyncConcurrencyTe
*/
@Test
public void testResyncBySnapThenDiffAfterFollowerCrashes()
- throws IOException, InterruptedException, KeeperException, Throwable
+ throws IOException, InterruptedException, KeeperException, Throwable
+ {
+ // false: the leader's snapshot size factor is set to -1, which disables
+ // resyncing the follower from the txnlog
+ followerResyncCrashTest(false);
+ }
+
+    /**
+     * Variant of testResyncBySnapThenDiffAfterFollowerCrashes() in which the
+     * leader's snapshot size factor is raised high enough that the restarted
+     * follower is resynced from the on-disk txnlog.
+     *
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws KeeperException
+     * @throws Throwable
+     */
+    @Test
+    public void testResyncByTxnlogThenDiffAfterFollowerCrashes()
+            throws IOException, InterruptedException, KeeperException, Throwable {
+        final boolean useTxnLogResync = true;
+        followerResyncCrashTest(useTxnLogResync);
+    }
+
+ public void followerResyncCrashTest(boolean useTxnLogResync)
+ throws IOException, InterruptedException, KeeperException, Throwable
{
final Semaphore sem = new Semaphore(0);
@@ -166,6 +209,18 @@ public class FollowerResyncConcurrencyTe
Leader leader = qu.getPeer(index).peer.leader;
assertNotNull(leader);
+
+ if (useTxnLogResync) {
+ // Set the factor to high value so that this test case always
+ // resync using txnlog
+ qu.getPeer(index).peer.getActiveServer().getZKDatabase()
+ .setSnapshotSizeFactor(1000);
+ } else {
+ // Disable sending DIFF using txnlog, so that this test still
+ // testing the ZOOKEEPER-962 bug
+ qu.getPeer(index).peer.getActiveServer().getZKDatabase()
+ .setSnapshotSizeFactor(-1);
+ }
/* Reusing the index variable to select a follower to connect to */
index = (index == 1) ? 2 : 1;
@@ -190,20 +245,28 @@ public class FollowerResyncConcurrencyTe
LOG.info("zk2 has session id 0x" + Long.toHexString(zk2.getSessionId()));
zk1.create("/first", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ // Prepare a thread that will create znodes.
Thread mytestfooThread = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0; i < 3000; i++) {
+ // Here we create 3000 znodes
zk3.create("/mytestfoo", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
- counter++;
- if(counter == 16200){
+ pending.decrementAndGet();
+ counter.incrementAndGet();
+ if (rc != 0) {
+ errors.incrementAndGet();
+ }
+ if(counter.get() == 16200){
sem.release();
}
}
}, null);
+ pending.incrementAndGet();
if(i%10==0){
try {
Thread.sleep(100);
@@ -216,29 +279,43 @@ public class FollowerResyncConcurrencyTe
}
});
+ // Here we start populating the server and shutdown the follower after
+ // initial data is written.
for(int i = 0; i < 13000; i++) {
+ // Here we create 13000 znodes
zk3.create("/mybar", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
- counter++;
- if(counter == 16200){
+ pending.decrementAndGet();
+ counter.incrementAndGet();
+ if (rc != 0) {
+ errors.incrementAndGet();
+ }
+ if(counter.get() == 16200){
sem.release();
}
}
}, null);
+ pending.incrementAndGet();
if(i == 5000){
qu.shutdown(index);
LOG.info("Shutting down s1");
}
if(i == 12000){
- //Restart off of snap, then get some txns for a log, then shut down
+ // Start the prepared thread so that it is writing znodes while
+ // the follower is restarting. On the first restart, the follow
+ // should use txnlog to catchup. For subsequent restart, the
+ // follower should use a diff to catchup.
mytestfooThread.start();
+ LOG.info("Restarting follower " + index);
qu.restart(index);
- Thread.sleep(300);
- qu.shutdown(index);
Thread.sleep(300);
+ LOG.info("Shutdown follower " + index);
+ qu.shutdown(index);
+ Thread.sleep(300);
+ LOG.info("Restarting follower " + index);
qu.restart(index);
LOG.info("Setting up server: " + index);
}
@@ -250,12 +327,17 @@ public class FollowerResyncConcurrencyTe
zk2.create("/newbaz", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, new AsyncCallback.StringCallback() {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
- counter++;
- if(counter == 16200){
+ pending.decrementAndGet();
+ counter.incrementAndGet();
+ if (rc != 0) {
+ errors.incrementAndGet();
+ }
+ if(counter.get() == 16200){
sem.release();
}
}
}, null);
+ pending.incrementAndGet();
}
}
@@ -267,7 +349,8 @@ public class FollowerResyncConcurrencyTe
if (mytestfooThread.isAlive()) {
LOG.error("mytestfooThread is still alive");
}
- Thread.sleep(1000);
+ assertTrue(waitForPendingRequests(60));
+ assertTrue(waitForSync(qu, index, 10));
verifyState(qu, index, leader);
@@ -353,13 +436,17 @@ public class FollowerResyncConcurrencyTe
@Override
public void processResult(int rc, String path, Object ctx, String name) {
- counter++;
- if(counter > 7300){
+ pending.decrementAndGet();
+ counter.incrementAndGet();
+ if (rc != 0) {
+ errors.incrementAndGet();;
+ }
+ if(counter.get() > 7300){
sem.release();
}
}
}, null);
-
+ pending.incrementAndGet();
try {
Thread.sleep(10);
} catch (Exception e) {
@@ -379,13 +466,17 @@ public class FollowerResyncConcurrencyTe
@Override
public void processResult(int rc, String path, Object ctx, String name) {
- counter++;
- if(counter > 7300){
+ pending.decrementAndGet();
+ counter.incrementAndGet();
+ if (rc != 0) {
+ errors.incrementAndGet();;
+ }
+ if(counter.get() > 7300){
sem.release();
}
}
}, null);
-
+ pending.incrementAndGet();
if(i == 1000){
qu.shutdown(index);
Thread.sleep(1100);
@@ -407,12 +498,17 @@ public class FollowerResyncConcurrencyTe
@Override
public void processResult(int rc, String path, Object ctx, String name) {
- counter++;
- if(counter > 7300){
+ pending.decrementAndGet();
+ counter.incrementAndGet();
+ if (rc != 0) {
+ errors.incrementAndGet();
+ }
+ if(counter.get() > 7300){
sem.release();
}
}
}, null);
+ pending.incrementAndGet();
}
if(i == 1050 || i == 1100 || i == 1150) {
Thread.sleep(1000);
@@ -428,7 +524,8 @@ public class FollowerResyncConcurrencyTe
LOG.error("mytestfooThread is still alive");
}
- Thread.sleep(1000);
+ assertTrue(waitForPendingRequests(60));
+ assertTrue(waitForSync(qu, index, 10));
// Verify that server is following and has the same epoch as the leader
verifyState(qu, index, leader);
@@ -450,6 +547,49 @@ public class FollowerResyncConcurrencyTe
watcher.waitForConnected(CONNECTION_TIMEOUT);
return zk;
}
+
+    /**
+     * Poll once per second until every issued async operation has called
+     * back, so that state verification can begin.
+     *
+     * @param timeout maximum number of one-second polls
+     * @return true if no operations were pending at some poll, false on timeout
+     */
+    private boolean waitForPendingRequests(int timeout) throws InterruptedException {
+        LOG.info("Wait for pending requests: " + pending.get());
+        int remaining = timeout;
+        while (remaining-- > 0) {
+            Thread.sleep(1000);
+            if (pending.get() == 0) {
+                return true;
+            }
+        }
+        LOG.info("Timeout waiting for pending requests: " + pending.get());
+        return false;
+    }
+
+    /**
+     * Wait until the leader, the untouched follower (peer 3) and the
+     * restarted follower all report the same last processed zxid.
+     *
+     * @param qu quorum under test
+     * @param index index of the restarted follower (1 or 2)
+     * @param timeout maximum wait in seconds; zxids are compared once per second
+     * @return true if all three zxids matched before the timeout
+     */
+    private boolean waitForSync(QuorumUtil qu, int index, int timeout) throws InterruptedException{
+        LOG.info("Wait for server to sync");
+        // The leader is whichever of peers 1 and 2 is not the restarted follower
+        int leaderIndex = (index == 1) ? 2 : 1;
+        ZKDatabase restartedDb = qu.getPeer(index).peer.getActiveServer().getZKDatabase();
+        ZKDatabase cleanDb = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
+        ZKDatabase leadDb = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
+        long leadZxid = 0;
+        long cleanZxid = 0;
+        long restartedZxid = 0;
+        for (int i = 0; i < timeout; ++i) {
+            leadZxid = leadDb.getDataTreeLastProcessedZxid();
+            cleanZxid = cleanDb.getDataTreeLastProcessedZxid();
+            restartedZxid = restartedDb.getDataTreeLastProcessedZxid();
+            if (leadZxid == cleanZxid && leadZxid == restartedZxid) {
+                return true;
+            }
+            // Sleeping after the final comparison would only delay the failure
+            if (i + 1 < timeout) {
+                Thread.sleep(1000);
+            }
+        }
+        LOG.info("Timeout waiting for zxid to sync: leader 0x" + Long.toHexString(leadZxid)
+                + " clean 0x" + Long.toHexString(cleanZxid)
+                + " restarted 0x" + Long.toHexString(restartedZxid));
+        return false;
+    }
private static TestableZooKeeper createTestableClient(String hp)
throws IOException, TimeoutException, InterruptedException
@@ -470,6 +610,7 @@ public class FollowerResyncConcurrencyTe
}
private void verifyState(QuorumUtil qu, int index, Leader leader) {
+ LOG.info("Verifying state");
assertTrue("Not following", qu.getPeer(index).peer.follower != null);
long epochF = (qu.getPeer(index).peer.getActiveServer().getZxid() >> 32L);
long epochL = (leader.getEpoch() >> 32L);
@@ -487,18 +628,33 @@ public class FollowerResyncConcurrencyTe
ZKDatabase clean = qu.getPeer(3).peer.getActiveServer().getZKDatabase();
ZKDatabase lead = qu.getPeer(leaderIndex).peer.getActiveServer().getZKDatabase();
for(Long l : sessionsRestarted) {
+ LOG.info("Validating ephemeral for session id 0x" + Long.toHexString(l));
assertTrue("Should have same set of sessions in both servers, did not expect: " + l, sessionsNotRestarted.contains(l));
Set<String> ephemerals = restarted.getEphemerals(l);
Set<String> cleanEphemerals = clean.getEphemerals(l);
- for(Object o : cleanEphemerals) {
+ for(String o : cleanEphemerals) {
if(!ephemerals.contains(o)) {
- LOG.info("Restarted follower doesn't contain ephemeral " + o);
+ LOG.info("Restarted follower doesn't contain ephemeral {} zxid 0x{}",
+ o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid()));
+ }
+ }
+ for(String o : ephemerals) {
+ if(!cleanEphemerals.contains(o)) {
+ LOG.info("Restarted follower has extra ephemeral {} zxid 0x{}",
+ o, Long.toHexString(restarted.getDataTree().getNode(o).stat.getMzxid()));
}
}
Set<String> leadEphemerals = lead.getEphemerals(l);
- for(Object o : leadEphemerals) {
+ for(String o : leadEphemerals) {
if(!cleanEphemerals.contains(o)) {
- LOG.info("Follower doesn't contain ephemeral from leader " + o);
+ LOG.info("Follower doesn't contain ephemeral from leader {} zxid 0x{}",
+ o, Long.toHexString(lead.getDataTree().getNode(o).stat.getMzxid()));
+ }
+ }
+ for(String o : cleanEphemerals) {
+ if(!leadEphemerals.contains(o)) {
+ LOG.info("Leader doesn't contain ephemeral from follower {} zxid 0x{}",
+ o, Long.toHexString(clean.getDataTree().getNode(o).stat.getMzxid()));
}
}
assertEquals("Should have same number of ephemerals in both followers", ephemerals.size(), cleanEphemerals.size());
@@ -528,6 +684,7 @@ public class FollowerResyncConcurrencyTe
long lzxid = zk.testableLastZxid();
assertTrue("lzxid:" + lzxid + " > 0", lzxid > 0);
zk.close();
+ qu.shutdownAll();
}
private class MyWatcher extends CountdownWatcher {
@@ -584,6 +741,7 @@ public class FollowerResyncConcurrencyTe
zk1.close();
zk2.close();
+ qu.shutdownAll();
}
}