Posted to commits@zookeeper.apache.org by fp...@apache.org on 2014/07/18 20:48:57 UTC
svn commit: r1611765 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/
Author: fpj
Date: Fri Jul 18 18:48:56 2014
New Revision: 1611765
URL: http://svn.apache.org/r1611765
Log:
ZOOKEEPER-1807. Observers spam each other creating connections to the election addr (Alex Shraer via fpj)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Fri Jul 18 18:48:56 2014
@@ -708,6 +708,9 @@ BUGFIXES:
ZOOKEEPER-1851. Follower and Observer Request Processors Do Not Forward
create2 Requests (Chris Chen via rakeshr)
+ ZOOKEEPER-1807. Observers spam each other creating connections to the
+ election addr (Alex Shraer via fpj)
+
IMPROVEMENTS:
ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Fri Jul 18 18:48:56 2014
@@ -315,7 +315,7 @@ public class FastLeaderElection implemen
* If it is from a non-voting server (such as an observer or
* a non-voting follower), respond right away.
*/
- if(!self.getVotingView().containsKey(response.sid)){
+ if(!self.getCurrentAndNextConfigVoters().contains(response.sid)) {
Vote current = self.getCurrentVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
@@ -658,7 +658,7 @@ public class FastLeaderElection implemen
* Send notifications to all peers upon a change in our vote
*/
private void sendNotifications() {
- for (long sid : self.getAllKnownServerIds()) {
+ for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
@@ -718,31 +718,36 @@ public class FastLeaderElection implemen
}
/**
- * Termination predicate. Given a set of votes, determines if
- * have sufficient to declare the end of the election round.
- *
- * @param votes Set of votes
- * @param vote Identifier of the vote received last
- */
- private boolean termPredicate(
- HashMap<Long, Vote> votes,
- Vote vote) {
-
- HashSet<Long> set = new HashSet<Long>();
+ * Termination predicate. Given a set of votes, determines if we have
+ * enough of them to declare the end of the election round.
+ *
+ * @param votes
+ * Set of votes
+ * @param vote
+ * Identifier of the vote received last
+ */
+ private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
+ SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
+ voteSet.addQuorumVerifier(self.getQuorumVerifier());
+ if (self.getLastSeenQuorumVerifier() != null
+ && self.getLastSeenQuorumVerifier().getVersion() > self
+ .getQuorumVerifier().getVersion()) {
+ voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
+ }
/*
- * First make the views consistent. Sometimes peers will have
- * different zxids for a server depending on timing.
+ * First make the views consistent. Sometimes peers will have different
+ * zxids for a server depending on timing.
*/
- for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
- if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey())
- && vote.equals(entry.getValue())){
- set.add(entry.getKey());
+ for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
+ if (vote.equals(entry.getValue())) {
+ voteSet.addAck(entry.getKey());
}
}
- return self.getQuorumVerifier().containsQuorum(set);
+ return voteSet.hasAllQuorums();
}
+
/**
* In the case there is a leader elected, and a quorum supporting
* this leader, we have to check if the leader has voted and acked
@@ -914,10 +919,10 @@ public class FastLeaderElection implemen
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
- }
- else if(self.getVotingView().containsKey(n.sid)) {
+ }
+ else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
/*
- * Only proceed if the vote comes from a replica in the
+ * Only proceed if the vote comes from a replica in the current or next
* voting view.
*/
switch (n.state) {
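The hunks above replace the flat HashSet quorum check in termPredicate() with a SyncedLearnerTracker that is seeded with the current QuorumVerifier and, when a newer configuration has been seen, with the last-seen verifier as well; the election round now ends only once every registered verifier sees a quorum of matching votes. A minimal sketch of that idea, using simplified stand-in types rather than ZooKeeper's actual SyncedLearnerTracker/QuorumVerifier API:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Stand-in for QuorumVerifier: a voting-member set plus a majority test.
class SimpleVerifier {
    final Set<Long> votingMembers;
    SimpleVerifier(Set<Long> votingMembers) { this.votingMembers = votingMembers; }
    boolean containsQuorum(Set<Long> acks) { return acks.size() > votingMembers.size() / 2; }
}

// Stand-in for SyncedLearnerTracker: collects acks per verifier and only
// reports success once *all* registered verifiers have a quorum.
class SimpleTracker {
    private final Map<SimpleVerifier, Set<Long>> acks = new HashMap<SimpleVerifier, Set<Long>>();

    void addQuorumVerifier(SimpleVerifier qv) {
        if (!acks.containsKey(qv)) acks.put(qv, new HashSet<Long>());
    }

    void addAck(long sid) {
        // An ack counts toward a verifier only if sid votes in that config.
        for (Map.Entry<SimpleVerifier, Set<Long>> e : acks.entrySet())
            if (e.getKey().votingMembers.contains(sid)) e.getValue().add(sid);
    }

    boolean hasAllQuorums() {
        for (Map.Entry<SimpleVerifier, Set<Long>> e : acks.entrySet())
            if (!e.getKey().containsQuorum(e.getValue())) return false;
        return true;
    }
}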
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Fri Jul 18 18:48:56 2014
@@ -1107,12 +1107,14 @@ public class QuorumPeer extends ZooKeepe
return getQuorumVerifier().getObservingMembers();
}
- public synchronized Set<Long> getAllKnownServerIds(){
- Set<Long> tmp = new HashSet<Long>(getQuorumVerifier().getAllMembers().keySet());
- if (getLastSeenQuorumVerifier()!=null) {
- tmp.addAll(getLastSeenQuorumVerifier().getAllMembers().keySet());
- }
- return tmp;
+ public synchronized Set<Long> getCurrentAndNextConfigVoters() {
+ Set<Long> voterIds = new HashSet<Long>(getQuorumVerifier()
+ .getVotingMembers().keySet());
+ if (getLastSeenQuorumVerifier() != null) {
+ voterIds.addAll(getLastSeenQuorumVerifier().getVotingMembers()
+ .keySet());
+ }
+ return voterIds;
}
/**
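The renamed helper narrows the notification target set from all known servers to voters only, taken from both the current and the last-seen configuration. Since observers never appear in getVotingMembers(), they drop out of FastLeaderElection's send loop entirely, which is what stops the connection spam described in ZOOKEEPER-1807. The union semantics, illustrated with plain sets and made-up server ids:

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class VoterUnionSketch {
    public static void main(String[] args) {
        // Hypothetical ids: voters of the current config and of a proposed one.
        Set<Long> currentVoters = new HashSet<Long>(Arrays.asList(0L, 1L, 2L));
        Set<Long> nextVoters = new HashSet<Long>(Arrays.asList(2L, 3L, 4L));
        // The same union getCurrentAndNextConfigVoters() computes; observer ids
        // never enter either set, so they are never dialed on the election port.
        Set<Long> targets = new HashSet<Long>(currentVoters);
        targets.addAll(nextVoters);
        System.out.println(targets); // ids 0..4, each exactly once
    }
}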
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Fri Jul 18 18:48:56 2014
@@ -55,21 +55,6 @@ public class QuorumPeerTestBase extends
}
}
- public static class MainThreadReconfigRecovery extends MainThread {
- final File nextDynamicConfigFile;
-
- public MainThreadReconfigRecovery(int myid, int clientPort,
- String currentQuorumCfgSection, String nextQuorumCfgSection)
- throws IOException {
- super(myid, clientPort, currentQuorumCfgSection);
- nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next");
- FileWriter fwriter = new FileWriter(nextDynamicConfigFile);
- fwriter.write(nextQuorumCfgSection + "\n");
- fwriter.flush();
- fwriter.close();
- }
- }
-
public static class MainThread implements Runnable {
final File confFile;
final File dynamicConfigFile;
@@ -141,7 +126,16 @@ public class QuorumPeerTestBase extends
fwriter.flush();
fwriter.close();
}
-
+
+ public void writeTempDynamicConfigFile(String nextQuorumCfgSection)
+ throws IOException {
+ File nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next");
+ FileWriter fwriter = new FileWriter(nextDynamicConfigFile);
+ fwriter.write(nextQuorumCfgSection + "\n");
+ fwriter.flush();
+ fwriter.close();
+ }
+
Thread currentThread;
synchronized public void start() {
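With the MainThreadReconfigRecovery subclass gone, tests stage an in-progress reconfiguration by calling the new helper on a plain MainThread before starting it, as the ReconfigRecoveryTest changes below do. A typical sequence (clientPort and the two cfg-section strings come from the surrounding test):

MainThread peer = new MainThread(0, clientPort, currentQuorumCfgSection);
// Writing zoo.dynamic.next before startup simulates a server that went down
// mid-reconfig and must recover the proposed config on boot.
peer.writeTempDynamicConfigFile(nextQuorumCfgSection);
peer.start();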
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java?rev=1611765&r1=1611764&r2=1611765&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java Fri Jul 18 18:48:56 2014
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server.quor
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
import java.util.ArrayList;
+import java.util.HashSet;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.ZooKeeper;
@@ -31,13 +32,13 @@ import org.junit.Test;
public class ReconfigRecoveryTest extends QuorumPeerTestBase {
/**
- * Reconfiguration recovery - test that a reconfiguration is completed
- * if leader has .next file during startup and new config is not running yet
+ * Reconfiguration recovery - test that a reconfiguration is completed if
+ * the leader has a .next file during startup and the new config is not running yet
*/
@Test
public void testNextConfigCompletion() throws Exception {
ClientBase.setupTestEnv();
-
+
// 2 servers in current config, 3 in next config
final int SERVER_COUNT = 3;
final int clientPorts[] = new int[SERVER_COUNT];
@@ -46,96 +47,84 @@ public class ReconfigRecoveryTest extend
ArrayList<String> allServers = new ArrayList<String>();
String currentQuorumCfgSection = null, nextQuorumCfgSection;
-
- for(int i = 0; i < SERVER_COUNT; i++) {
- clientPorts[i] = PortAssignment.unique();
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
- ":participant;localhost:" + clientPorts[i];
- allServers.add(server);
- sb.append(server +"\n");
- if (i == 1) currentQuorumCfgSection = sb.toString();
+
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ":participant;localhost:"
+ + clientPorts[i];
+ allServers.add(server);
+ sb.append(server + "\n");
+ if (i == 1)
+ currentQuorumCfgSection = sb.toString() + "version=100000000\n";
}
sb.append("version=200000000\n"); // version of current config is 100000000
nextQuorumCfgSection = sb.toString();
-
+
// Both servers 0 and 1 will have the .next config file, which means
// for them that a reconfiguration was in progress when they failed
// and the leader will complete it
MainThread mt[] = new MainThread[SERVER_COUNT];
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
for (int i = 0; i < SERVER_COUNT - 1; i++) {
- mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection);
+ // note that we should run the server, shut it down and only then
+ // simulate a reconfig in progress by writing the temp file, but here no
+ // other server is competing with them in FLE, so we can skip this step
+ // (server 2 is booted after FLE ends)
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
mt[i].start();
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+ ClientBase.CONNECTION_TIMEOUT, this);
}
-
- Assert.assertTrue("waiting for server 0 being up",
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0],
+
+ Assert.assertTrue("waiting for server 0 being up", ClientBase
+ .waitForServerUp("127.0.0.1:" + clientPorts[0],
CONNECTION_TIMEOUT));
- Assert.assertTrue("waiting for server 1 being up",
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1],
+ Assert.assertTrue("waiting for server 1 being up", ClientBase
+ .waitForServerUp("127.0.0.1:" + clientPorts[1],
CONNECTION_TIMEOUT));
-
- int leader = mt[0].main.quorumPeer.leader == null ? 1: 0;
-
+
+ int leader = mt[0].main.quorumPeer.leader == null ? 1 : 0;
+
// the new server's config is going to include itself and the current leader
sb = new StringBuilder();
sb.append(allServers.get(leader) + "\n");
sb.append(allServers.get(2) + "\n");
-
+
// suppose that this new server never heard about the reconfig proposal
String newServerInitialConfig = sb.toString();
mt[2] = new MainThread(2, clientPorts[2], newServerInitialConfig);
mt[2].start();
- zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
+ zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2],
+ ClientBase.CONNECTION_TIMEOUT, this);
- Assert.assertTrue("waiting for server 2 being up",
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
+ Assert.assertTrue("waiting for server 2 being up", ClientBase
+ .waitForServerUp("127.0.0.1:" + clientPorts[2],
CONNECTION_TIMEOUT));
-
- ReconfigTest.testServerHasConfig(zk[0], allServers, null);
- ReconfigTest.testServerHasConfig(zk[1], allServers, null);
- ReconfigTest.testServerHasConfig(zk[2], allServers, null);
-
- ReconfigTest.testNormalOperation(zk[0], zk[2]);
- ReconfigTest.testNormalOperation(zk[2], zk[1]);
-
- zk[2].close();
- mt[2].shutdown();
-
- //now suppose that the new server heard the reconfig request
- mt[2] = new MainThreadReconfigRecovery(2, clientPorts[2], newServerInitialConfig, nextQuorumCfgSection);
- mt[2].start();
- zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
- Assert.assertTrue("waiting for server 2 being up",
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
- CONNECTION_TIMEOUT));
-
ReconfigTest.testServerHasConfig(zk[0], allServers, null);
ReconfigTest.testServerHasConfig(zk[1], allServers, null);
ReconfigTest.testServerHasConfig(zk[2], allServers, null);
-
+
ReconfigTest.testNormalOperation(zk[0], zk[2]);
ReconfigTest.testNormalOperation(zk[2], zk[1]);
for (int i = 0; i < SERVER_COUNT; i++) {
- zk[i].close();
- }
- for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
+ zk[i].close();
}
}
/**
* Reconfiguration recovery - current config servers discover .next file,
- * but they're both observers and their ports change in next config. Suppose that next config wasn't activated yet.
- * Should complete reconfiguration.
+ * but they're both observers and their ports change in the next config.
+ * Suppose that the next config wasn't activated yet. Should complete reconfiguration.
*/
@Test
public void testCurrentServersAreObserversInNextConfig() throws Exception {
ClientBase.setupTestEnv();
-
+
// 2 servers in current config, 5 in next config
final int SERVER_COUNT = 5;
final int clientPorts[] = new int[SERVER_COUNT];
@@ -143,82 +132,118 @@ public class ReconfigRecoveryTest extend
StringBuilder sb = new StringBuilder();
String server;
- String currentQuorumCfgSection = null, nextQuorumCfgSection;
-
+ String currentQuorumCfg = null, currentQuorumCfgSection = null, nextQuorumCfgSection = null;
+
ArrayList<String> allServersCurrent = new ArrayList<String>();
ArrayList<String> allServersNext = new ArrayList<String>();
-
-
- for(int i = 0; i < 2; i++) {
- oldClientPorts[i] = PortAssignment.unique();
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
- ":participant;localhost:" + oldClientPorts[i];
- allServersCurrent.add(server);
- sb.append(server +"\n");
+
+ for (int i = 0; i < 2; i++) {
+ oldClientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ":participant;localhost:"
+ + oldClientPorts[i];
+ allServersCurrent.add(server);
+ sb.append(server + "\n");
}
-
+
+ currentQuorumCfg = sb.toString();
+ sb.append("version=100000000\n");
currentQuorumCfgSection = sb.toString();
+
sb = new StringBuilder();
String role;
- for (int i=0; i<SERVER_COUNT; i++) {
+ for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
- if (i < 2) role = "observer";
- else role = "participant";
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
- ":" + role + ";localhost:" + clientPorts[i];
+ if (i < 2) {
+ role = "observer";
+ } else {
+ role = "participant";
+ }
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ":" + role
+ + ";localhost:" + clientPorts[i];
allServersNext.add(server);
- sb.append(server +"\n");
-
+ sb.append(server + "\n");
}
sb.append("version=200000000\n"); // version of current config is 100000000
nextQuorumCfgSection = sb.toString();
-
+
MainThread mt[] = new MainThread[SERVER_COUNT];
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+ // run servers 0 and 1 normally
for (int i = 0; i < 2; i++) {
- mt[i] = new MainThreadReconfigRecovery(i, oldClientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+ mt[i] = new MainThread(i, oldClientPorts[i],
+ currentQuorumCfgSection);
mt[i].start();
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ zk[i] = new ZooKeeper("127.0.0.1:" + oldClientPorts[i],
+ ClientBase.CONNECTION_TIMEOUT, this);
+ }
+
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue("waiting for server " + i + " being up",
+ ClientBase.waitForServerUp(
+ "127.0.0.1:" + oldClientPorts[i],
+ CONNECTION_TIMEOUT * 2));
+ }
+
+ ReconfigTest.testNormalOperation(zk[0], zk[1]);
+
+ // shut them down and then simulate a reboot with a reconfig in progress
+ for (int i = 0; i < 2; i++) {
+ mt[i].shutdown();
+ zk[i].close();
+ }
+
+ for (int i = 0; i < 2; i++) {
+ Assert.assertTrue(
+ "waiting for server " + i + " being up",
+ ClientBase.waitForServerDown("127.0.0.1:"
+ + oldClientPorts[i], CONNECTION_TIMEOUT * 2));
+ }
+
+ for (int i = 0; i < 2; i++) {
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
+ mt[i].start();
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+ ClientBase.CONNECTION_TIMEOUT, this);
}
// new members are initialized with current config + the new server
for (int i = 2; i < SERVER_COUNT; i++) {
- mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection + allServersNext.get(i));
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfg
+ + allServersNext.get(i));
mt[i].start();
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+ ClientBase.CONNECTION_TIMEOUT, this);
}
- for (int i=0; i<SERVER_COUNT; i++) {
- Assert.assertTrue("waiting for server "+ i + " being up",
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
- CONNECTION_TIMEOUT * 2));
- ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ Assert.assertTrue("waiting for server " + i + " being up",
+ ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+ CONNECTION_TIMEOUT * 2));
+ ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
}
-
+
ReconfigTest.testNormalOperation(zk[0], zk[2]);
ReconfigTest.testNormalOperation(zk[4], zk[1]);
for (int i = 0; i < SERVER_COUNT; i++) {
zk[i].close();
- }
- for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].shutdown();
}
+ }
- }
-
-
-
/**
- * Reconfiguration recovery - test that if servers in old config have a .next file
- * but no quorum of new config is up then no progress should be possible (no progress will happen
- * to ensure safety as the new config might be actually up but partitioned from old config)
+ * Reconfiguration recovery - test that if servers in the old config have a
+ * .next file but no quorum of the new config is up, then no progress should
+ * be possible (no progress will happen, to ensure safety, as the new config
+ * might actually be up but partitioned from the old config)
*/
@Test
public void testNextConfigUnreachable() throws Exception {
ClientBase.setupTestEnv();
-
+
// 2 servers in current config, 5 in next config
final int SERVER_COUNT = 5;
final int clientPorts[] = new int[SERVER_COUNT];
@@ -226,57 +251,61 @@ public class ReconfigRecoveryTest extend
String server;
String currentQuorumCfgSection = null, nextQuorumCfgSection;
-
+
ArrayList<String> allServers = new ArrayList<String>();
- for(int i = 0; i < SERVER_COUNT; i++) {
- clientPorts[i] = PortAssignment.unique();
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
- ":participant;localhost:" + clientPorts[i];
- allServers.add(server);
- sb.append(server +"\n");
- if (i == 1) currentQuorumCfgSection = sb.toString();
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ":participant;localhost:"
+ + clientPorts[i];
+ allServers.add(server);
+ sb.append(server + "\n");
+ if (i == 1)
+ currentQuorumCfgSection = sb.toString() + "version=100000000\n";
}
- sb.append("version=200000000\n"); // version of current config is 100000000
+ sb.append("version=200000000\n"); // version of current config is 100000000
nextQuorumCfgSection = sb.toString();
-
- // lets start servers 2, 3, 4 with the new config
+
MainThread mt[] = new MainThread[SERVER_COUNT];
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
// Both servers 0 and 1 will have the .next config file, which means
// for them that a reconfiguration was in progress when they failed
- // and the leader will complete it.
for (int i = 0; i < 2; i++) {
- mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection);
+ // note that we should run the server, shut it down and only then
+ // simulate a reconfig in progress by writing the temp file, but here no
+ // other server is competing with them in FLE, so we can skip this step
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
mt[i].start();
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+ ClientBase.CONNECTION_TIMEOUT, this);
}
- Thread.sleep(CONNECTION_TIMEOUT*2);
-
- // make sure servers 0, 1 don't come online
+ Thread.sleep(CONNECTION_TIMEOUT * 2);
+
+ // make sure servers 0, 1 don't come online - this should be the case
+ // since they can't complete the reconfig
for (int i = 0; i < 2; i++) {
- Assert.assertFalse("server " + i + " is up but shouldn't be",
+ Assert.assertFalse("server " + i + " is up but shouldn't be",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
CONNECTION_TIMEOUT / 10));
}
-
+
for (int i = 0; i < 2; i++) {
zk[i].close();
- }
- for (int i = 0; i < 2; i++) {
mt[i].shutdown();
}
}
-
+
/**
- * Reconfiguration recovery - test that old config members will join the new config
- * if its already active, and not try to complete the reconfiguration
+ * Reconfiguration recovery - test that old config members will join the new
+ * config if it's already active, and not try to complete the reconfiguration
*/
@Test
public void testNextConfigAlreadyActive() throws Exception {
ClientBase.setupTestEnv();
-
+
// 2 servers in current config, 5 in next config
final int SERVER_COUNT = 5;
final int clientPorts[] = new int[SERVER_COUNT];
@@ -284,75 +313,274 @@ public class ReconfigRecoveryTest extend
String server;
String currentQuorumCfgSection = null, nextQuorumCfgSection;
-
+
ArrayList<String> allServers = new ArrayList<String>();
- for(int i = 0; i < SERVER_COUNT; i++) {
- clientPorts[i] = PortAssignment.unique();
- server = "server." + i + "=localhost:"+PortAssignment.unique()+":"+PortAssignment.unique() +
- ":participant;localhost:" + clientPorts[i];
- allServers.add(server);
- sb.append(server +"\n");
- if (i == 1) currentQuorumCfgSection = sb.toString();
+ for (int i = 0; i < SERVER_COUNT; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ server = "server." + i + "=localhost:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ":participant;localhost:"
+ + clientPorts[i];
+ allServers.add(server);
+ sb.append(server + "\n");
+ if (i == 1) currentQuorumCfgSection = sb.toString() + "version=100000000\n";
}
sb.append("version=200000000\n"); // version of current config is 100000000
nextQuorumCfgSection = sb.toString();
-
+
// lets start servers 2, 3, 4 with the new config
MainThread mt[] = new MainThread[SERVER_COUNT];
ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
for (int i = 2; i < SERVER_COUNT; i++) {
mt[i] = new MainThread(i, clientPorts[i], nextQuorumCfgSection);
mt[i].start();
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+ ClientBase.CONNECTION_TIMEOUT, this);
}
- for (int i = 2; i < SERVER_COUNT; i++) {
+ for (int i = 2; i < SERVER_COUNT; i++) {
Assert.assertTrue("waiting for server " + i + " being up",
- ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
- CONNECTION_TIMEOUT));
+ ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+ CONNECTION_TIMEOUT));
}
ReconfigTest.testNormalOperation(zk[2], zk[3]);
long epoch = mt[2].main.quorumPeer.getAcceptedEpoch();
-
+
// Both servers 0 and 1 will have the .next config file, which means
// for them that a reconfiguration was in progress when they failed
- // and the leader will complete it.
+ // and the leader will complete it.
for (int i = 0; i < 2; i++) {
- mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+ mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection);
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
mt[i].start();
- zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+ ClientBase.CONNECTION_TIMEOUT, this);
}
-
- // servers 0 and 1 should connect to all servers, including the one in their
- // .next file during startup, and will find the next config and join it
+ // servers 0 and 1 should connect to all servers, including the one in
+ // their .next file during startup, and will find the next config and join it
for (int i = 0; i < 2; i++) {
- Assert.assertTrue("waiting for server " + i + " being up",
+ Assert.assertTrue("waiting for server " + i + " being up",
ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
- CONNECTION_TIMEOUT*2));
+ CONNECTION_TIMEOUT * 2));
}
-
+
// make sure they joined the new config without any change to it
Assert.assertEquals(epoch, mt[0].main.quorumPeer.getAcceptedEpoch());
Assert.assertEquals(epoch, mt[1].main.quorumPeer.getAcceptedEpoch());
Assert.assertEquals(epoch, mt[2].main.quorumPeer.getAcceptedEpoch());
-
+
ReconfigTest.testServerHasConfig(zk[0], allServers, null);
ReconfigTest.testServerHasConfig(zk[1], allServers, null);
-
+
ReconfigTest.testNormalOperation(zk[0], zk[2]);
ReconfigTest.testNormalOperation(zk[4], zk[1]);
-
for (int i = 0; i < SERVER_COUNT; i++) {
zk[i].close();
+ mt[i].shutdown();
}
+ }
+
+ /**
+ * Tests conversion of an observer to a participant AFTER the new config was
+ * already committed. Old config: servers 0 (participant), 1 (participant),
+ * 2 (observer). New config: servers 2 (participant), 3 (participant). We
+ * start server 2 with the old config and server 3 with the new config. All
+ * other servers are down. In order to terminate FLE, server 3 must
+ * 'convince' server 2 to adopt the new config and turn into a participant.
+ */
+ @Test
+ public void testObserverConvertedToParticipantDuringFLE() throws Exception {
+ ClientBase.setupTestEnv();
+
+ final int SERVER_COUNT = 4;
+ int[][] ports = generatePorts(SERVER_COUNT);
+ String currentQuorumCfgSection, nextQuorumCfgSection;
+
+ // generate old config string
+ HashSet<Integer> observers = new HashSet<Integer>();
+ observers.add(2);
+ StringBuilder sb = generateConfig(3, ports, observers);
+ sb.append("version=100000000");
+ currentQuorumCfgSection = sb.toString();
+
+ // generate new config string
+ ArrayList<String> allServersNext = new ArrayList<String>();
+ sb = new StringBuilder();
+ for (int i = 2; i < SERVER_COUNT; i++) {
+ String server = "server." + i + "=localhost:" + ports[i][0] + ":"
+ + ports[i][1] + ":participant;localhost:" + ports[i][2];
+ allServersNext.add(server);
+ sb.append(server + "\n");
+ }
+ sb.append("version=200000000"); // version of current config is 100000000
+ nextQuorumCfgSection = sb.toString();
+
+ MainThread mt[] = new MainThread[SERVER_COUNT];
+ ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+
+ // start server 2 with old config, where it is an observer
+ mt[2] = new MainThread(2, ports[2][2], currentQuorumCfgSection);
+ mt[2].start();
+ zk[2] = new ZooKeeper("127.0.0.1:" + ports[2][2],
+ ClientBase.CONNECTION_TIMEOUT, this);
+
+ // start server 3 with new config
+ mt[3] = new MainThread(3, ports[3][2], nextQuorumCfgSection);
+ mt[3].start();
+ zk[3] = new ZooKeeper("127.0.0.1:" + ports[3][2],
+ ClientBase.CONNECTION_TIMEOUT, this);
+
+ for (int i = 2; i < SERVER_COUNT; i++) {
+ Assert.assertTrue("waiting for server " + i + " being up",
+ ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2],
+ CONNECTION_TIMEOUT * 2));
+ ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
+ }
+
+ Assert.assertEquals(nextQuorumCfgSection,
+ ReconfigTest.testServerHasConfig(zk[2], null, null));
+ Assert.assertEquals(nextQuorumCfgSection,
+ ReconfigTest.testServerHasConfig(zk[3], null, null));
+ ReconfigTest.testNormalOperation(zk[2], zk[2]);
+ ReconfigTest.testNormalOperation(zk[3], zk[2]);
+
+ for (int i = 2; i < SERVER_COUNT; i++) {
+ zk[i].close();
+ mt[i].shutdown();
+ }
+ }
+
+ /**
+ * Tests conversion of an observer to a participant during reconfig
+ * recovery, where the new config was not yet committed. Old config: servers
+ * 0 (participant), 1 (participant), 2 (observer). New config: servers 2
+ * (participant), 3 (participant). We start servers 0, 1, 2 with the old
+ * config and a .next file indicating a reconfig in progress. We start
+ * server 3 with the old config + itself in its config file. In this
+ * scenario server 2 can't be converted to a participant during reconfig,
+ * since we don't gossip about proposed configurations, only about committed
+ * ones. This tests that the new config can be completed, which requires
+ * server 2's ack for the NEWLEADER message, even though it's an observer.
+ */
+ @Test
+ public void testCurrentObserverIsParticipantInNewConfig() throws Exception {
+ ClientBase.setupTestEnv();
+
+ final int SERVER_COUNT = 4;
+ int[][] ports = generatePorts(SERVER_COUNT);
+ String currentQuorumCfg, currentQuorumCfgSection, nextQuorumCfgSection;
+
+ // generate old config string
+ HashSet<Integer> observers = new HashSet<Integer>();
+ observers.add(2);
+
+ StringBuilder sb = generateConfig(3, ports, observers);
+ currentQuorumCfg = sb.toString();
+ sb.append("version=100000000");
+ currentQuorumCfgSection = sb.toString();
+
+ // Run servers 0..2 for a while
+ MainThread mt[] = new MainThread[SERVER_COUNT];
+ ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+ for (int i = 0; i <= 2; i++) {
+ mt[i] = new MainThread(i, ports[i][2], currentQuorumCfgSection);
+ mt[i].start();
+ zk[i] = new ZooKeeper("127.0.0.1:" + ports[i][2],
+ ClientBase.CONNECTION_TIMEOUT, this);
+ }
+
+ ReconfigTest.testNormalOperation(zk[0], zk[2]);
+
+ for (int i = 0; i <= 2; i++) {
+ Assert.assertTrue("waiting for server " + i + " being up",
+ ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2],
+ CONNECTION_TIMEOUT * 2));
+ }
+
+ // shut servers 0..2 down
+ for (int i = 0; i <= 2; i++) {
+ mt[i].shutdown();
+ zk[i].close();
+ }
+
+ // generate new config string
+ ArrayList<String> allServersNext = new ArrayList<String>();
+ sb = new StringBuilder();
+ for (int i = 2; i < SERVER_COUNT; i++) {
+ String server = "server." + i + "=localhost:" + ports[i][0] + ":"
+ + ports[i][1] + ":participant;localhost:" + ports[i][2];
+ allServersNext.add(server);
+ sb.append(server + "\n");
+ }
+ sb.append("version=200000000"); // version of current config is 100000000
+ nextQuorumCfgSection = sb.toString();
+
+ // simulate reconfig in progress - servers 0..2 have a temp reconfig
+ // file when they boot
+ for (int i = 0; i <= 2; i++) {
+ mt[i].writeTempDynamicConfigFile(nextQuorumCfgSection);
+ mt[i].start();
+ zk[i] = new ZooKeeper("127.0.0.1:" + ports[i][2],
+ ClientBase.CONNECTION_TIMEOUT, this);
+ }
+ // new server 3 still has its invalid joiner config - everyone in the old
+ // config + itself
+ mt[3] = new MainThread(3, ports[3][2], currentQuorumCfg
+ + allServersNext.get(1));
+ mt[3].start();
+ zk[3] = new ZooKeeper("127.0.0.1:" + ports[3][2],
+ ClientBase.CONNECTION_TIMEOUT, this);
+
+ for (int i = 2; i < SERVER_COUNT; i++) {
+ Assert.assertTrue("waiting for server " + i + " being up",
+ ClientBase.waitForServerUp("127.0.0.1:" + ports[i][2],
+ CONNECTION_TIMEOUT * 2));
+ ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
+ }
+
+ ReconfigTest.testNormalOperation(zk[0], zk[2]);
+ ReconfigTest.testNormalOperation(zk[3], zk[1]);
+ Assert.assertEquals(nextQuorumCfgSection,
+ ReconfigTest.testServerHasConfig(zk[2], null, null));
+ Assert.assertEquals(nextQuorumCfgSection,
+ ReconfigTest.testServerHasConfig(zk[3], null, null));
+
for (int i = 0; i < SERVER_COUNT; i++) {
+ zk[i].close();
mt[i].shutdown();
}
}
-
-
-
-}
+
+ /*
+ * Generates 3 ports per server
+ */
+ private int[][] generatePorts(int numServers) {
+ int[][] ports = new int[numServers][];
+ for (int i = 0; i < numServers; i++) {
+ ports[i] = new int[3];
+ for (int j = 0; j < 3; j++) {
+ ports[i][j] = PortAssignment.unique();
+ }
+ }
+ return ports;
+ }
+
+ /*
+ * Creates a configuration string for servers 0..numServers-1. Ids in
+ * observerIds correspond to observers; other ids are participants.
+ */
+ private StringBuilder generateConfig(int numServers, int[][] ports,
+ HashSet<Integer> observerIds) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < numServers; i++) {
+ String server = "server." + i + "=localhost:" + ports[i][0] + ":"
+ + ports[i][1] + ":"
+ + (observerIds.contains(i) ? "observer" : "participant")
+ + ";localhost:" + ports[i][2];
+ sb.append(server + "\n");
+ }
+ return sb;
+ }
+}
\ No newline at end of file