You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by an...@apache.org on 2019/07/12 15:01:51 UTC
[zookeeper] branch master updated: ZOOKEEPER-3398:
Learner.connectToLeader() may take too long to time-out
This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 43ce772 ZOOKEEPER-3398: Learner.connectToLeader() may take too long to time-out
43ce772 is described below
commit 43ce772db000721546fcd13dd8523002dfa97741
Author: Vladimir Ivic <vl...@me.com>
AuthorDate: Fri Jul 12 17:01:44 2019 +0200
ZOOKEEPER-3398: Learner.connectToLeader() may take too long to time-out
After leader election happens, the followers will connect to the leader which is facilitated by the Learner.connectToLeader() method.
Learner.connectToLeader() is relying on the initLimit configuration value to time-out in case the network connection is unreliable. This config may have a high value that could leave the ensemble retrying and waiting in the state of not having quorum for too long. The follower will retry up to 5 times.
This patch introduces a new configuration directive, `connectToLearnerMasterLimit`, that allows ZooKeeper to use a different time-out value when connecting to the leader; it can then be set to a lower value than `initLimit`.
Test plan:
- ant clean
- ant test-core-java
NOTE: Lots of whitespace changes, hope it helps.
Author: Vladimir Ivic <vl...@me.com>
Reviewers: eolivelli@apache.org, hanm@apache.org, andor@apache.org
Closes #953 from vladimirivic/ZOOKEEPER-3398 and squashes the following commits:
da4ecd055 [Vladimir Ivic] Removed redundant test, changing LearnerTest.connectToLearnerMasterLimitTest() params and assertions
6c413311c [Vladimir Ivic] Updating the tests with the new timeout parameter
5a89cbd7e [Vladimir Ivic] Rewriting timeout logic inside Learner.connectToLeader
99c065616 [Vladimir Ivic] Adding config connectToLearnerMasterLimit to prevent long connect timeout
---
.../src/main/resources/markdown/zookeeperAdmin.md | 6 ++++
.../apache/zookeeper/test/system/BaseSysTest.java | 3 +-
.../zookeeper/test/system/QuorumPeerInstance.java | 3 +-
.../apache/zookeeper/server/quorum/Learner.java | 38 +++++++++++++---------
.../apache/zookeeper/server/quorum/QuorumPeer.java | 36 +++++++++++++++-----
.../zookeeper/server/quorum/QuorumPeerConfig.java | 4 +++
.../zookeeper/server/quorum/QuorumPeerMain.java | 1 +
.../zookeeper/server/quorum/CnxManagerTest.java | 18 +++++-----
.../quorum/FLEBackwardElectionRoundTest.java | 6 ++--
.../server/quorum/FLELostMessageTest.java | 4 +--
.../server/quorum/FLEOutOfElectionTest.java | 2 +-
.../zookeeper/server/quorum/LearnerTest.java | 22 +++++++++++++
.../zookeeper/server/quorum/QuorumPeerTest.java | 5 +--
.../server/quorum/QuorumPeerTestBase.java | 2 ++
.../quorum/ReconfigDuringLeaderSyncTest.java | 7 ++--
.../org/apache/zookeeper/test/FLENewEpochTest.java | 4 +--
.../apache/zookeeper/test/FLEPredicateTest.java | 2 +-
.../org/apache/zookeeper/test/FLERestartTest.java | 4 +--
.../java/org/apache/zookeeper/test/FLETest.java | 10 +++---
.../apache/zookeeper/test/FLEZeroWeightTest.java | 2 +-
.../zookeeper/test/HierarchicalQuorumTest.java | 11 ++++---
.../java/org/apache/zookeeper/test/QuorumBase.java | 22 +++++++------
.../java/org/apache/zookeeper/test/QuorumUtil.java | 9 +++--
.../org/apache/zookeeper/test/TruncateTest.java | 7 ++--
24 files changed, 151 insertions(+), 77 deletions(-)
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index b0b07dc..bfff07b 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -913,6 +913,12 @@ of servers -- that is, when deploying clusters of servers.
connect and sync to a leader. Increased this value as needed, if
the amount of data managed by ZooKeeper is large.
+* *connectToLearnerMasterLimit* :
+ (Java system property: zookeeper.**connectToLearnerMasterLimit**)
+ Amount of time, in ticks (see [tickTime](#id_tickTime)), to allow followers to
+ connect to the leader after leader election. Defaults to the value of initLimit.
+ Use when initLimit is high so connecting to learner master doesn't result in higher timeout.
+
* *leaderServes* :
(Java system property: zookeeper.**leaderServes**)
Leader accepts client connections. Default value is "yes".
diff --git a/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java b/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java
index 73669b0..8856282 100644
--- a/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java
+++ b/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/BaseSysTest.java
@@ -176,11 +176,12 @@ public class BaseSysTest {
final static int tickTime = 2000;
final static int initLimit = 3;
final static int syncLimit = 3;
+ final static int connectToLearnerMasterLimit = 3;
public void startServer(int index) throws IOException {
int port = fakeBasePort+10+index;
if (fakeMachines) {
- qps[index] = new QuorumPeer(peers, qpsDirs[index], qpsDirs[index], port, 3, index+1, tickTime, initLimit, syncLimit);
+ qps[index] = new QuorumPeer(peers, qpsDirs[index], qpsDirs[index], port, 3, index+1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
qps[index].start();
} else {
try {
diff --git a/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java b/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java
index cd66b43..782deaf 100644
--- a/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java
+++ b/zookeeper-it/src/test/java/org/apache/zookeeper/test/system/QuorumPeerInstance.java
@@ -43,6 +43,7 @@ class QuorumPeerInstance implements Instance {
private static final int syncLimit = 3;
private static final int initLimit = 3;
+ private static final int connectToLearnerMasterLimit = 3;
private static final int tickTime = 2000;
String serverHostPort;
int serverId;
@@ -191,7 +192,7 @@ class QuorumPeerInstance implements Instance {
return;
}
System.err.println("SnapDir = " + snapDir + " LogDir = " + logDir);
- peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 3, serverId, tickTime, initLimit, syncLimit);
+ peer = new QuorumPeer(peers, snapDir, logDir, clientAddr.getPort(), 3, serverId, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
peer.start();
for(int i = 0; i < 5; i++) {
Thread.sleep(500);
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
index 51979aa..168c44b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
@@ -241,7 +241,7 @@ public class Learner {
throws IOException {
sock.connect(addr, timeout);
}
-
+
/**
* Establish a connection with the LearnerMaster found by findLearnerMaster.
* Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
@@ -256,41 +256,49 @@ public class Learner {
throws IOException, InterruptedException, X509Exception {
this.sock = createSocket();
- int initLimitTime = self.tickTime * self.initLimit;
- int remainingInitLimitTime;
+ // leader connection timeout defaults to tickTime * initLimit
+ int connectTimeout = self.tickTime * self.initLimit;
+
+ // but if connectToLearnerMasterLimit is specified, use that value to calculate
+ // timeout instead of using the initLimit value
+ if (self.connectToLearnerMasterLimit > 0) {
+ connectTimeout = self.tickTime * self.connectToLearnerMasterLimit;
+ }
+
+ int remainingTimeout;
long startNanoTime = nanoTime();
for (int tries = 0; tries < 5; tries++) {
try {
// recalculate the init limit time because retries sleep for 1000 milliseconds
- remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
- if (remainingInitLimitTime <= 0) {
- LOG.error("initLimit exceeded on retries.");
- throw new IOException("initLimit exceeded on retries.");
+ remainingTimeout = connectTimeout - (int)((nanoTime() - startNanoTime) / 1000000);
+ if (remainingTimeout <= 0) {
+ LOG.error("connectToLeader exceeded on retries.");
+ throw new IOException("connectToLeader exceeded on retries.");
}
-
- sockConnect(sock, addr, Math.min(self.tickTime * self.syncLimit, remainingInitLimitTime));
+
+ sockConnect(sock, addr, Math.min(connectTimeout, remainingTimeout));
if (self.isSslQuorum()) {
((SSLSocket) sock).startHandshake();
}
sock.setTcpNoDelay(nodelay);
break;
} catch (IOException e) {
- remainingInitLimitTime = initLimitTime - (int)((nanoTime() - startNanoTime) / 1000000);
+ remainingTimeout = connectTimeout - (int)((nanoTime() - startNanoTime) / 1000000);
- if (remainingInitLimitTime <= 1000) {
- LOG.error("Unexpected exception, initLimit exceeded. tries=" + tries +
- ", remaining init limit=" + remainingInitLimitTime +
+ if (remainingTimeout <= 1000) {
+ LOG.error("Unexpected exception, connectToLeader exceeded. tries=" + tries +
+ ", remaining init limit=" + remainingTimeout +
", connecting to " + addr,e);
throw e;
} else if (tries >= 4) {
LOG.error("Unexpected exception, retries exceeded. tries=" + tries +
- ", remaining init limit=" + remainingInitLimitTime +
+ ", remaining init limit=" + remainingTimeout +
", connecting to " + addr,e);
throw e;
} else {
LOG.warn("Unexpected exception, tries=" + tries +
- ", remaining init limit=" + remainingInitLimitTime +
+ ", remaining init limit=" + remainingTimeout +
", connecting to " + addr,e);
this.sock = createSocket();
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
index f3217af..062f259 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java
@@ -587,6 +587,11 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
protected volatile int syncLimit;
/**
+ * The number of ticks that can pass before retrying to connect to learner master
+ */
+ protected volatile int connectToLearnerMasterLimit;
+
+ /**
* Enables/Disables sync request processor. This option is enabled
* by default and is to be used with observers.
*/
@@ -899,16 +904,16 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
File dataLogDir, int electionType,
- long myid, int tickTime, int initLimit, int syncLimit,
+ long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit,
ServerCnxnFactory cnxnFactory) throws IOException {
this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
- initLimit, syncLimit, false, cnxnFactory,
+ initLimit, syncLimit, connectToLearnerMasterLimit, false, cnxnFactory,
new QuorumMaj(quorumPeers));
}
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
File dataLogDir, int electionType,
- long myid, int tickTime, int initLimit, int syncLimit,
+ long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit,
boolean quorumListenOnAllIPs,
ServerCnxnFactory cnxnFactory,
QuorumVerifier quorumConfig) throws IOException {
@@ -919,6 +924,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
this.tickTime = tickTime;
this.initLimit = initLimit;
this.syncLimit = syncLimit;
+ this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
this.quorumListenOnAllIPs = quorumListenOnAllIPs;
this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
this.zkDb = new ZKDatabase(this.logFactory);
@@ -1049,7 +1055,7 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
}
return count;
}
-
+
/**
* This constructor is only used by the existing unit test code.
@@ -1057,10 +1063,10 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
*/
public QuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
File logDir, int clientPort, int electionAlg,
- long myid, int tickTime, int initLimit, int syncLimit)
+ long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit)
throws IOException
{
- this(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, false,
+ this(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false,
ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
new QuorumMaj(quorumPeers));
}
@@ -1071,12 +1077,12 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
*/
public QuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
File logDir, int clientPort, int electionAlg,
- long myid, int tickTime, int initLimit, int syncLimit,
+ long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit,
QuorumVerifier quorumConfig)
throws IOException
{
this(quorumPeers, snapDir, logDir, electionAlg,
- myid,tickTime, initLimit,syncLimit, false,
+ myid,tickTime, initLimit,syncLimit, connectToLearnerMasterLimit, false,
ServerCnxnFactory.createFactory(getClientAddress(quorumPeers, myid, clientPort), -1),
quorumConfig);
}
@@ -1785,6 +1791,20 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
this.syncLimit = syncLimit;
}
+ /**
+ * Get the connectToLearnerMasterLimit
+ */
+ public int getConnectToLearnerMasterLimit() {
+ return connectToLearnerMasterLimit;
+ }
+
+ /**
+ * Set the connectToLearnerMasterLimit
+ */
+ public void setConnectToLearnerMasterLimit(int connectToLearnerMasterLimit) {
+ LOG.info("connectToLearnerMasterLimit set to " + connectToLearnerMasterLimit);
+ this.connectToLearnerMasterLimit = connectToLearnerMasterLimit;
+ }
/**
* The syncEnabled can also be set via a system property.
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
index b0d2800..b1bce11 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
@@ -94,6 +94,7 @@ public class QuorumPeerConfig {
protected int initLimit;
protected int syncLimit;
+ protected int connectToLearnerMasterLimit;
protected int electionAlg = 3;
protected int electionPort = 2182;
protected boolean quorumListenOnAllIPs = false;
@@ -306,6 +307,8 @@ public class QuorumPeerConfig {
initLimit = Integer.parseInt(value);
} else if (key.equals("syncLimit")) {
syncLimit = Integer.parseInt(value);
+ } else if (key.equals("connectToLearnerMasterLimit")) {
+ connectToLearnerMasterLimit = Integer.parseInt(value);
} else if (key.equals("electionAlg")) {
electionAlg = Integer.parseInt(value);
if (electionAlg != 1 && electionAlg != 2 && electionAlg != 3) {
@@ -834,6 +837,7 @@ public class QuorumPeerConfig {
public int getInitLimit() { return initLimit; }
public int getSyncLimit() { return syncLimit; }
+ public int getConnectToLearnerMasterLimit() { return connectToLearnerMasterLimit; }
public int getElectionAlg() { return electionAlg; }
public int getElectionPort() { return electionPort; }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
index 79293ea..3be0449 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
@@ -189,6 +189,7 @@ public class QuorumPeerMain {
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
+ quorumPeer.setConnectToLearnerMasterLimit(config.getConnectToLearnerMasterLimit());
quorumPeer.setObserverMasterPort(config.getObserverMasterPort());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.setClientPortListenBacklog(config.getClientPortListenBacklog());
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
index d3a631b..878e41b 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/CnxManagerTest.java
@@ -118,7 +118,7 @@ public class CnxManagerTest extends ZKTestCase {
public void run(){
try {
- QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 1000, 2, 2, 2);
QuorumCnxManager cnxManager = peer.createCnxnManager();
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -162,7 +162,7 @@ public class CnxManagerTest extends ZKTestCase {
thread.start();
- QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
QuorumCnxManager cnxManager = peer.createCnxnManager();
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -209,7 +209,7 @@ public class CnxManagerTest extends ZKTestCase {
new InetSocketAddress(deadAddress, PortAssignment.unique())));
peerTmpdir[2] = ClientBase.createTmpDir();
- QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
QuorumCnxManager cnxManager = peer.createCnxnManager();
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -237,7 +237,7 @@ public class CnxManagerTest extends ZKTestCase {
*/
@Test
public void testCnxManagerSpinLock() throws Exception {
- QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 1000, 2, 2, 2);
QuorumCnxManager cnxManager = peer.createCnxnManager();
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -302,7 +302,7 @@ public class CnxManagerTest extends ZKTestCase {
// the connecting peer (id = 2) is a 3.4.6 observer
peers.get(2L).type = LearnerType.OBSERVER;
QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1],
- peerClientPort[1], 3, 1, 1000, 2, 2);
+ peerClientPort[1], 3, 1, 1000, 2, 2, 2);
QuorumCnxManager cnxManager = peer.createCnxnManager();
QuorumCnxManager.Listener listener = cnxManager.listener;
if (listener != null) {
@@ -349,7 +349,7 @@ public class CnxManagerTest extends ZKTestCase {
*/
@Test
public void testSocketTimeout() throws Exception {
- QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2, 2);
QuorumCnxManager cnxManager = peer.createCnxnManager();
QuorumCnxManager.Listener listener = cnxManager.listener;
if(listener != null){
@@ -434,7 +434,7 @@ public class CnxManagerTest extends ZKTestCase {
};
QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0],
- peerClientPort[0], 3, 0, 2000, 2, 2) {
+ peerClientPort[0], 3, 0, 2000, 2, 2, 2) {
@Override
public QuorumX509Util createX509Util() {
return mockedX509Util;
@@ -457,7 +457,7 @@ public class CnxManagerTest extends ZKTestCase {
for (int sid = 0; sid < 3; sid++) {
QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid],
peerTmpdir[sid], peerClientPort[sid], 3, sid, 1000, 2,
- 2);
+ 2, 2);
LOG.info("Starting peer {}", peer.getId());
peer.start();
peerList.add(sid, peer);
@@ -477,7 +477,7 @@ public class CnxManagerTest extends ZKTestCase {
// Restart halted node and verify count
peer = new QuorumPeer(peers, peerTmpdir[myid],
peerTmpdir[myid], peerClientPort[myid], 3, myid,
- 1000, 2, 2);
+ 1000, 2, 2, 2);
LOG.info("Round {}, restarting peer ",
new Object[] { i, peer.getId() });
peer.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
index 2ccbc77..09adbc4 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java
@@ -106,7 +106,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
/*
* Start server 0
*/
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
peer.startLeaderElection();
FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0);
thread.start();
@@ -114,7 +114,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
/*
* Start mock server 1
*/
- QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+ QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2, 2);
cnxManagers[0] = mockPeer.createCnxnManager();
cnxManagers[0].listener.start();
@@ -123,7 +123,7 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
/*
* Start mock server 2
*/
- mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
+ mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2, 2);
cnxManagers[1] = mockPeer.createCnxnManager();
cnxManagers[1].listener.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
index 6583f90..ad2b180 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLELostMessageTest.java
@@ -78,7 +78,7 @@ public class FLELostMessageTest extends ZKTestCase {
/*
* Start server 0
*/
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2, 2);
peer.startLeaderElection();
FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1);
thread.start();
@@ -94,7 +94,7 @@ public class FLELostMessageTest extends ZKTestCase {
}
void mockServer() throws InterruptedException, IOException {
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
cnxManager = peer.createCnxnManager();
cnxManager.listener.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java
index 455d04f..d856b79 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/FLEOutOfElectionTest.java
@@ -52,7 +52,7 @@ public class FLEOutOfElectionTest {
new InetSocketAddress("127.0.0.1", PortAssignment.unique())));
}
QuorumPeer peer = new QuorumPeer(peers, tmpdir, tmpdir,
- PortAssignment.unique(), 3, 3, 1000, 2, 2);
+ PortAssignment.unique(), 3, 3, 1000, 2, 2, 2);
fle = new FastLeaderElection(peer, peer.createCnxnManager());
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
index 85295f8..7eccf60 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerTest.java
@@ -140,6 +140,28 @@ public class LearnerTest extends ZKTestCase {
}
@Test
+ public void connectToLearnerMasterLimitTest() throws Exception {
+ TimeoutLearner learner = new TimeoutLearner();
+ learner.self = new QuorumPeer();
+ learner.self.setTickTime(2000);
+ learner.self.setInitLimit(2);
+ learner.self.setSyncLimit(2);
+ learner.self.setConnectToLearnerMasterLimit(5);
+
+ InetSocketAddress addr = new InetSocketAddress(1111);
+ learner.setTimeMultiplier((long)4000 * 1000000);
+ learner.setPassConnectAttempt(5);
+
+ try {
+ learner.connectToLeader(addr, "");
+ Assert.fail("should have thrown IOException!");
+ } catch (IOException e) {
+ Assert.assertTrue(learner.nanoTime() > 2000*5*1000000);
+ Assert.assertEquals(3, learner.getSockConnectAttempt());
+ }
+ }
+
+ @Test
public void syncTest() throws Exception {
File tmpFile = File.createTempFile("test", ".dir", testData);
tmpFile.delete();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java
index 43ed24b..d6b3774 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTest.java
@@ -39,6 +39,7 @@ public class QuorumPeerTest {
private int tickTime = 2000;
private int initLimit = 3;
private int syncLimit = 3;
+ private int connectToLearnerMasterLimit = 3;
/**
* Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2301
@@ -60,7 +61,7 @@ public class QuorumPeerTest {
* QuorumPeer constructor without QuorumVerifier
*/
QuorumPeer peer1 = new QuorumPeer(peersView, dataDir, dataDir, clientPort, electionAlg, myId, tickTime,
- initLimit, syncLimit);
+ initLimit, syncLimit, connectToLearnerMasterLimit);
String hostString1 = peer1.cnxnFactory.getLocalAddress().getHostString();
assertEquals(clientIP.getHostAddress(), hostString1);
@@ -77,7 +78,7 @@ public class QuorumPeerTest {
new InetSocketAddress(clientIP, PortAssignment.unique()),
new InetSocketAddress(clientIP, clientPort), LearnerType.PARTICIPANT));
QuorumPeer peer2 = new QuorumPeer(peersView, dataDir, dataDir, clientPort, electionAlg, myId, tickTime,
- initLimit, syncLimit);
+ initLimit, syncLimit, connectToLearnerMasterLimit);
String hostString2 = peer2.cnxnFactory.getLocalAddress().getHostString();
assertEquals(clientIP.getHostAddress(), hostString2);
// cleanup
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index 1a1c796..456a3ef 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -126,6 +126,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
fwriter.write("tickTime=" + tickTime + "\n");
fwriter.write("initLimit=10\n");
fwriter.write("syncLimit=5\n");
+ fwriter.write("connectToLearnerMasterLimit=5\n");
tmpDir = new File(baseDir, "data");
if (!tmpDir.mkdir()) {
@@ -229,6 +230,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
fwriter.write("tickTime=4000\n");
fwriter.write("initLimit=10\n");
fwriter.write("syncLimit=5\n");
+ fwriter.write("connectToLearnerMasterLimit=5\n");
if(configs != null){
fwriter.write(configs);
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
index f350abf..4d07ffa 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReconfigDuringLeaderSyncTest.java
@@ -208,9 +208,9 @@ public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
private boolean newLeaderMessage = false;
public CustomQuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort,
- int electionAlg, long myid, int tickTime, int initLimit, int syncLimit)
+ int electionAlg, long myid, int tickTime, int initLimit, int syncLimit, int connectToLearnerMasterLimit)
throws IOException {
- super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, false,
+ super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, false,
ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1), new QuorumMaj(quorumPeers));
}
@@ -256,7 +256,8 @@ public class ReconfigDuringLeaderSyncTest extends QuorumPeerTestBase {
throws IOException, AdminServerException {
quorumPeer = new CustomQuorumPeer(config.getQuorumVerifier().getAllMembers(), config.getDataDir(),
config.getDataLogDir(), config.getClientPortAddress().getPort(), config.getElectionAlg(),
- config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit());
+ config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit(),
+ config.getConnectToLearnerMasterLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
quorumPeer.start();
try {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
index 8bf365f..2766c29 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLENewEpochTest.java
@@ -165,7 +165,7 @@ public class FLENewEpochTest extends ZKTestCase {
}
for(int i = 1; i < count; i++) {
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2);
peer.startLeaderElection();
LEThread thread = new LEThread(peer, i);
thread.start();
@@ -174,7 +174,7 @@ public class FLENewEpochTest extends ZKTestCase {
if(!start0.tryAcquire(4000, java.util.concurrent.TimeUnit.MILLISECONDS))
Assert.fail("First leader election failed");
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2, 2);
peer.startLeaderElection();
LEThread thread = new LEThread(peer, 0);
thread.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
index bc43775..4e252de 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEPredicateTest.java
@@ -75,7 +75,7 @@ public class FLEPredicateTest extends ZKTestCase {
try{
File tmpDir = ClientBase.createTmpDir();
QuorumPeer peer = new QuorumPeer(peers, tmpDir, tmpDir,
- PortAssignment.unique(), 3, 0, 1000, 2, 2);
+ PortAssignment.unique(), 3, 0, 1000, 2, 2, 2);
MockFLE mock = new MockFLE(peer);
mock.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
index b77b93b..f930391 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLERestartTest.java
@@ -123,7 +123,7 @@ public class FLERestartTest extends ZKTestCase {
QuorumBase.shutdown(peer);
((FastLeaderElection) restartThreads.get(i).peer.getElectionAlg()).shutdown();
- peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+ peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2);
peer.startLeaderElection();
peerRound++;
} else {
@@ -171,7 +171,7 @@ public class FLERestartTest extends ZKTestCase {
}
for(int i = 0; i < count; i++) {
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2);
peer.startLeaderElection();
FLERestartThread thread = new FLERestartThread(peer, i);
thread.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
index 859c4a4..f6dc513 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLETest.java
@@ -324,7 +324,7 @@ public class FLETest extends ZKTestCase {
*/
for(int i = 0; i < count; i++) {
QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i],
- port[i], 3, i, 1000, 2, 2);
+ port[i], 3, i, 1000, 2, 2, 2);
peer.startLeaderElection();
LEThread thread = new LEThread(this, peer, i, rounds, quora);
thread.start();
@@ -429,7 +429,7 @@ public class FLETest extends ZKTestCase {
// start 2 peers and verify if they form the cluster
for (sid = 0; sid < 2; sid++) {
peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
- port[sid], 3, sid, 2000, 2, 2);
+ port[sid], 3, sid, 2000, 2, 2, 2);
LOG.info("Starting peer " + peer.getId());
peer.start();
peerList.add(sid, peer);
@@ -443,7 +443,7 @@ public class FLETest extends ZKTestCase {
!v1.isSuccess());
// Start 3rd peer and check if it goes in LEADING state
peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
- port[sid], 3, sid, 2000, 2, 2);
+ port[sid], 3, sid, 2000, 2, 2, 2);
LOG.info("Starting peer " + peer.getId());
peer.start();
peerList.add(sid, peer);
@@ -488,7 +488,7 @@ public class FLETest extends ZKTestCase {
// start 2 peers and verify if they form the cluster
for (sid = 0; sid < 2; sid++) {
peer = new QuorumPeer(peers, tmpdir[sid], tmpdir[sid],
- port[sid], 3, sid, 2000, 2, 2);
+ port[sid], 3, sid, 2000, 2, 2, 2);
LOG.info("Starting peer " + peer.getId());
peer.start();
peerList.add(sid, peer);
@@ -510,7 +510,7 @@ public class FLETest extends ZKTestCase {
peer.setCurrentVote(newVote);
// Start 3rd peer and check if it joins the quorum
peer = new QuorumPeer(peers, tmpdir[2], tmpdir[2],
- port[2], 3, 2, 2000, 2, 2);
+ port[2], 3, 2, 2000, 2, 2, 2);
LOG.info("Starting peer " + peer.getId());
peer.start();
peerList.add(sid, peer);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
index 351b008..65d436d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/FLEZeroWeightTest.java
@@ -154,7 +154,7 @@ public class FLEZeroWeightTest extends ZKTestCase {
for(int i = 0; i < count; i++) {
QuorumHierarchical hq = new QuorumHierarchical(qp);
- QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, hq);
+ QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, 2, hq);
peer.startLeaderElection();
LEThread thread = new LEThread(peer, i);
thread.start();
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java
index a1dc6c5..bc746e5 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/HierarchicalQuorumTest.java
@@ -145,6 +145,7 @@ public class HierarchicalQuorumTest extends ClientBase {
int tickTime = 2000;
int initLimit = 3;
int syncLimit = 3;
+ int connectToLearnerMasterLimit = 3;
HashMap<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
peers.put(Long.valueOf(1), new QuorumServer(1,
new InetSocketAddress("127.0.0.1", port1),
@@ -178,22 +179,22 @@ public class HierarchicalQuorumTest extends ClientBase {
qp.setProperty("server.5", "127.0.0.1:" + port5 + ":" + leport5 + ":observer" + ";" + clientport5);
}
QuorumHierarchical hq1 = new QuorumHierarchical(qp);
- s1 = new QuorumPeer(peers, s1dir, s1dir, clientport1, 3, 1, tickTime, initLimit, syncLimit, hq1);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, clientport1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq1);
Assert.assertEquals(clientport1, s1.getClientPort());
LOG.info("creating QuorumPeer 2 port " + clientport2);
QuorumHierarchical hq2 = new QuorumHierarchical(qp);
- s2 = new QuorumPeer(peers, s2dir, s2dir, clientport2, 3, 2, tickTime, initLimit, syncLimit, hq2);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, clientport2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq2);
Assert.assertEquals(clientport2, s2.getClientPort());
LOG.info("creating QuorumPeer 3 port " + clientport3);
QuorumHierarchical hq3 = new QuorumHierarchical(qp);
- s3 = new QuorumPeer(peers, s3dir, s3dir, clientport3, 3, 3, tickTime, initLimit, syncLimit, hq3);
+ s3 = new QuorumPeer(peers, s3dir, s3dir, clientport3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq3);
Assert.assertEquals(clientport3, s3.getClientPort());
LOG.info("creating QuorumPeer 4 port " + clientport4);
QuorumHierarchical hq4 = new QuorumHierarchical(qp);
- s4 = new QuorumPeer(peers, s4dir, s4dir, clientport4, 3, 4, tickTime, initLimit, syncLimit, hq4);
+ s4 = new QuorumPeer(peers, s4dir, s4dir, clientport4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq4);
if (withObservers) {
s4.setLearnerType(QuorumPeer.LearnerType.OBSERVER);
}
@@ -201,7 +202,7 @@ public class HierarchicalQuorumTest extends ClientBase {
LOG.info("creating QuorumPeer 5 port " + clientport5);
QuorumHierarchical hq5 = new QuorumHierarchical(qp);
- s5 = new QuorumPeer(peers, s5dir, s5dir, clientport5, 3, 5, tickTime, initLimit, syncLimit, hq5);
+ s5 = new QuorumPeer(peers, s5dir, s5dir, clientport5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit, hq5);
if (withObservers) {
s5.setLearnerType(QuorumPeer.LearnerType.OBSERVER);
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
index fcaa9b6..a2b0615 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumBase.java
@@ -137,6 +137,7 @@ public class QuorumBase extends ClientBase {
int tickTime = 2000;
int initLimit = 3;
int syncLimit = 3;
+ int connectToLearnerMasterLimit = 3;
Map<Long,QuorumServer> peers = new HashMap<Long,QuorumServer>();
peers.put(Long.valueOf(1), new QuorumServer(1,
new InetSocketAddress(LOCALADDR, port1),
@@ -170,19 +171,19 @@ public class QuorumBase extends ClientBase {
}
LOG.info("creating QuorumPeer 1 port " + portClient1);
- s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient1, s1.getClientPort());
LOG.info("creating QuorumPeer 2 port " + portClient2);
- s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient2, s2.getClientPort());
LOG.info("creating QuorumPeer 3 port " + portClient3);
- s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit);
+ s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient3, s3.getClientPort());
LOG.info("creating QuorumPeer 4 port " + portClient4);
- s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit);
+ s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient4, s4.getClientPort());
LOG.info("creating QuorumPeer 5 port " + portClient5);
- s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit);
+ s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient5, s5.getClientPort());
if (withObservers) {
@@ -299,6 +300,7 @@ public class QuorumBase extends ClientBase {
int tickTime = 2000;
int initLimit = 3;
int syncLimit = 3;
+ int connectToLearnerMasterLimit = 3;
if(peers == null){
peers = new HashMap<Long,QuorumServer>();
@@ -333,27 +335,27 @@ public class QuorumBase extends ClientBase {
switch(i){
case 1:
LOG.info("creating QuorumPeer 1 port " + portClient1);
- s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit);
+ s1 = new QuorumPeer(peers, s1dir, s1dir, portClient1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient1, s1.getClientPort());
break;
case 2:
LOG.info("creating QuorumPeer 2 port " + portClient2);
- s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit);
+ s2 = new QuorumPeer(peers, s2dir, s2dir, portClient2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient2, s2.getClientPort());
break;
case 3:
LOG.info("creating QuorumPeer 3 port " + portClient3);
- s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit);
+ s3 = new QuorumPeer(peers, s3dir, s3dir, portClient3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient3, s3.getClientPort());
break;
case 4:
LOG.info("creating QuorumPeer 4 port " + portClient4);
- s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit);
+ s4 = new QuorumPeer(peers, s4dir, s4dir, portClient4, 3, 4, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient4, s4.getClientPort());
break;
case 5:
LOG.info("creating QuorumPeer 5 port " + portClient5);
- s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit);
+ s5 = new QuorumPeer(peers, s5dir, s5dir, portClient5, 3, 5, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(portClient5, s5.getClientPort());
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
index 314171d..6d711fc 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumUtil.java
@@ -73,6 +73,8 @@ public class QuorumUtil {
private int initLimit;
private int syncLimit;
+
+ private int connectToLearnerMasterLimit;
private int electionAlg;
@@ -94,6 +96,7 @@ public class QuorumUtil {
tickTime = 2000;
initLimit = 3;
this.syncLimit = syncLimit;
+ connectToLearnerMasterLimit = 3;
electionAlg = 3;
hostPort = "";
@@ -115,7 +118,7 @@ public class QuorumUtil {
PeerStruct ps = peers.get(i);
LOG.info("Creating QuorumPeer " + i + "; public port " + ps.clientPort);
ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort,
- electionAlg, ps.id, tickTime, initLimit, syncLimit);
+ electionAlg, ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
}
} catch (Exception e) {
@@ -202,7 +205,7 @@ public class QuorumUtil {
PeerStruct ps = getPeer(id);
LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
- ps.id, tickTime, initLimit, syncLimit);
+ ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
if (localSessionEnabled) {
ps.peer.enableLocalSessions(true);
}
@@ -221,7 +224,7 @@ public class QuorumUtil {
PeerStruct ps = getPeer(id);
LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
- ps.id, tickTime, initLimit, syncLimit);
+ ps.id, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
if (localSessionEnabled) {
ps.peer.enableLocalSessions(true);
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
index d7d5ffa..2290a34 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/TruncateTest.java
@@ -185,6 +185,7 @@ public class TruncateTest extends ZKTestCase {
int tickTime = 2000;
int initLimit = 3;
int syncLimit = 3;
+ int connectToLearnerMasterLimit = 3;
int port1 = PortAssignment.unique();
int port2 = PortAssignment.unique();
@@ -205,9 +206,9 @@ public class TruncateTest extends ZKTestCase {
new InetSocketAddress("127.0.0.1", PortAssignment.unique()),
new InetSocketAddress("127.0.0.1", port3)));
- QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 3, 2, tickTime, initLimit, syncLimit);
+ QuorumPeer s2 = new QuorumPeer(peers, dataDir2, dataDir2, port2, 3, 2, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
s2.start();
- QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 3, 3, tickTime, initLimit, syncLimit);
+ QuorumPeer s3 = new QuorumPeer(peers, dataDir3, dataDir3, port3, 3, 3, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
s3.start();
zk = ClientBase.createZKClient("127.0.0.1:" + port2, 15000);
@@ -223,7 +224,7 @@ public class TruncateTest extends ZKTestCase {
} catch(KeeperException.NoNodeException e) {
// this is what we want
}
- QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 3, 1, tickTime, initLimit, syncLimit);
+ QuorumPeer s1 = new QuorumPeer(peers, dataDir1, dataDir1, port1, 3, 1, tickTime, initLimit, syncLimit, connectToLearnerMasterLimit);
s1.start();
ZooKeeper zk1 = ClientBase.createZKClient("127.0.0.1:" + port1, 15000);
zk1.getData("/9", false, new Stat());