You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@zookeeper.apache.org by br...@apache.org on 2013/03/07 07:01:50 UTC
svn commit: r1453693 [4/5] - in /zookeeper/trunk: ./ src/ src/c/
src/c/include/ src/c/src/ src/c/tests/ src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/cli/
src/java/main/org/apache/zookeeper/common/ src/java/main/org/apache/zook...
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Thu Mar 7 06:01:49 2013
@@ -176,7 +176,13 @@ public class QuorumCnxManager {
try {
// Sending id and challenge
dout = new DataOutputStream(sock.getOutputStream());
+ // represents protocol version (in other words - message type)
+ dout.writeLong(0xffff0000);
dout.writeLong(self.getId());
+ String addr = self.getElectionAddress().getHostName() + ":" + self.getElectionAddress().getPort();
+ byte[] addr_bytes = addr.getBytes();
+ dout.writeInt(addr_bytes.length);
+ dout.write(addr_bytes);
dout.flush();
} catch (IOException e) {
LOG.warn("Ignoring exception reading or writing challenge: ", e);
@@ -214,7 +220,6 @@ public class QuorumCnxManager {
}
return false;
}
-
/**
@@ -225,12 +230,33 @@ public class QuorumCnxManager {
*
*/
public boolean receiveConnection(Socket sock) {
- Long sid = null;
-
+ Long sid = null, protocolVersion = null;
+ InetSocketAddress electionAddr;
try {
- // Read server id
DataInputStream din = new DataInputStream(sock.getInputStream());
- sid = din.readLong();
+ protocolVersion = din.readLong();
+ if (protocolVersion >= 0) { // this is a server id and not a protocol version
+ sid = protocolVersion;
+ electionAddr = self.getVotingView().get(sid).electionAddr;
+ } else {
+ sid = din.readLong();
+ int num_remaining_bytes = din.readInt();
+ byte[] b = new byte[num_remaining_bytes];
+ int num_read = din.read(b);
+ if (num_read == num_remaining_bytes) {
+ if (protocolVersion == 0xffff0000) {
+ String addr = new String(b);
+ String[] host_port = addr.split(":");
+ electionAddr = new InetSocketAddress(host_port[0], Integer.parseInt(host_port[1]));
+ } else {
+ LOG.error("Got urecognized protocol version " + protocolVersion + " from " + sid);
+ electionAddr = null;
+ }
+ } else {
+ LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
+ electionAddr = null;
+ }
+ }
if (sid == QuorumPeer.OBSERVER_ID) {
/*
* Choose identifier at random. We need a value to identify
@@ -263,7 +289,7 @@ public class QuorumCnxManager {
*/
LOG.debug("Create new connection to server: " + sid);
closeSocket(sock);
- connectOne(sid);
+ connectOne(sid, electionAddr);
// Otherwise start worker threads to receive data.
} else {
@@ -329,47 +355,75 @@ public class QuorumCnxManager {
}
/**
+ * Try to establish a connection to server with id sid using its electionAddr.
+ *
+ * @param sid server id
+ * @return boolean success indication
+ */
+ synchronized boolean connectOne(long sid, InetSocketAddress electionAddr){
+ if (senderWorkerMap.get(sid) != null) {
+ LOG.debug("There is a connection already for server " + sid);
+ return true;
+ }
+ try {
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Opening channel to server " + sid);
+ }
+ Socket sock = new Socket();
+ setSockOpts(sock);
+ sock.connect(electionAddr, cnxTO);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connected to server " + sid);
+ }
+ initiateConnection(sock, sid);
+ return true;
+ } catch (UnresolvedAddressException e) {
+ // Sun doesn't include the address that causes this
+ // exception to be thrown, also UAE cannot be wrapped cleanly
+ // so we log the exception in order to capture this critical
+ // detail.
+ LOG.warn("Cannot open channel to " + sid
+ + " at election address " + electionAddr, e);
+ throw e;
+ } catch (IOException e) {
+ LOG.warn("Cannot open channel to " + sid
+ + " at election address " + electionAddr,
+ e);
+ return false;
+ }
+
+ }
+
+ /**
* Try to establish a connection to server with id sid.
*
* @param sid server id
*/
synchronized void connectOne(long sid){
- if (senderWorkerMap.get(sid) == null){
- InetSocketAddress electionAddr;
+ if (senderWorkerMap.get(sid) != null) {
+ LOG.debug("There is a connection already for server " + sid);
+ return;
+ }
+ synchronized(self) {
+ boolean knownId = false;
if (self.getView().containsKey(sid)) {
- electionAddr = self.getView().get(sid).electionAddr;
- } else {
+ knownId = true;
+ if (connectOne(sid, self.getView().get(sid).electionAddr))
+ return;
+ }
+ if (self.getLastSeenQuorumVerifier()!=null && self.getLastSeenQuorumVerifier().getAllMembers().containsKey(sid)
+ && (!knownId || (self.getLastSeenQuorumVerifier().getAllMembers().get(sid).electionAddr !=
+ self.getView().get(sid).electionAddr))) {
+ knownId = true;
+ if (connectOne(sid, self.getLastSeenQuorumVerifier().getAllMembers().get(sid).electionAddr))
+ return;
+ }
+ if (!knownId) {
LOG.warn("Invalid server id: " + sid);
return;
}
- try {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Opening channel to server " + sid);
- }
- Socket sock = new Socket();
- setSockOpts(sock);
- sock.connect(self.getView().get(sid).electionAddr, cnxTO);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connected to server " + sid);
- }
- initiateConnection(sock, sid);
- } catch (UnresolvedAddressException e) {
- // Sun doesn't include the address that causes this
- // exception to be thrown, also UAE cannot be wrapped cleanly
- // so we log the exception in order to capture this critical
- // detail.
- LOG.warn("Cannot open channel to " + sid
- + " at election address " + electionAddr, e);
- throw e;
- } catch (IOException e) {
- LOG.warn("Cannot open channel to " + sid
- + " at election address " + electionAddr,
- e);
- }
- } else {
- LOG.debug("There is a connection already for server " + sid);
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Mar 7 06:01:49 2013
@@ -23,6 +23,8 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
+import java.io.StringReader;
+import java.io.StringWriter;
import java.io.OutputStreamWriter;
import java.io.StringReader;
import java.io.StringWriter;
@@ -34,13 +36,20 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Properties;
+import java.util.Set;
+
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.common.AtomicFileOutputStream;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
+import org.apache.zookeeper.server.DataNode;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
@@ -85,7 +94,7 @@ public class QuorumPeer extends Thread i
QuorumBean jmxQuorumBean;
LocalPeerBean jmxLocalPeerBean;
LeaderElectionBean jmxLeaderElectionBean;
- QuorumCnxManager qcm;
+ QuorumCnxManager qcm = null;
/* ZKDatabase is a top level member of quorumpeer
* which will be used in all the zookeeperservers
@@ -291,7 +300,7 @@ public class QuorumPeer extends Thread i
}
/**
- * Sets the LearnerType both in the QuorumPeer and in the peerMap
+ * Sets the LearnerType
*/
public void setLearnerType(LearnerType p) {
learnerType = p;
@@ -332,7 +341,11 @@ public class QuorumPeer extends Thread i
* QuorumVerifier implementation; default (majority).
*/
- private QuorumVerifier quorumVerifier;
+ //last committed quorum verifier
+ public QuorumVerifier quorumVerifier;
+
+ //last proposed quorum verifier
+ public QuorumVerifier lastSeenQuorumVerifier = null;
/**
* My id
@@ -482,11 +495,21 @@ public class QuorumPeer extends Thread i
}
private ServerState state = ServerState.LOOKING;
+
+ private boolean reconfigFlag = false; // indicates that a reconfig just committed
public synchronized void setPeerState(ServerState newState){
state=newState;
}
-
+ public synchronized void reconfigFlagSet(){
+ reconfigFlag = true;
+ }
+ public synchronized void reconfigFlagClear(){
+ reconfigFlag = false;
+ }
+ public synchronized boolean isReconfigStateChange(){
+ return reconfigFlag;
+ }
public synchronized ServerState getPeerState(){
return state;
}
@@ -574,6 +597,9 @@ public class QuorumPeer extends Thread i
@Override
public synchronized void start() {
+ if (!getView().containsKey(myid)) {
+ throw new RuntimeException("My id " + myid + " not in the peer list");
+ }
loadDataBase();
cnxnFactory.start();
startLeaderElection();
@@ -630,22 +656,19 @@ public class QuorumPeer extends Thread i
responder.interrupt();
}
synchronized public void startLeaderElection() {
- try {
- currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
- } catch(IOException e) {
- RuntimeException re = new RuntimeException(e.getMessage());
- re.setStackTrace(e.getStackTrace());
- throw re;
- }
- for (QuorumServer p : getView().values()) {
- if (p.id == myid) {
- myQuorumAddr = p.addr;
- break;
- }
- }
- if (myQuorumAddr == null) {
- throw new RuntimeException("My id " + myid + " not in the peer list");
- }
+ try {
+ if (getPeerState() == ServerState.LOOKING) {
+ currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
+ }
+ } catch(IOException e) {
+ RuntimeException re = new RuntimeException(e.getMessage());
+ re.setStackTrace(e.getStackTrace());
+ throw re;
+ }
+
+ // if (!getView().containsKey(myid)) {
+ // throw new RuntimeException("My id " + myid + " not in the peer list");
+ //}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
@@ -795,6 +818,8 @@ public class QuorumPeer extends Thread i
return null;
}
+ boolean shuttingDownLE = false;
+
@Override
public void run() {
setName("QuorumPeer" + "[myid=" + getId() + "]" +
@@ -879,6 +904,11 @@ public class QuorumPeer extends Thread i
}
} else {
try {
+ reconfigFlagClear();
+ if (shuttingDownLE) {
+ shuttingDownLE = false;
+ startLeaderElection();
+ }
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
@@ -895,21 +925,21 @@ public class QuorumPeer extends Thread i
LOG.warn("Unexpected exception",e );
} finally {
observer.shutdown();
- setObserver(null);
- setPeerState(ServerState.LOOKING);
+ setObserver(null);
+ updateServerState();
}
break;
case FOLLOWING:
try {
- LOG.info("FOLLOWING");
+ LOG.info("FOLLOWING");
setFollower(makeFollower(logFactory));
follower.followLeader();
} catch (Exception e) {
- LOG.warn("Unexpected exception",e);
+ LOG.warn("Unexpected exception",e);
} finally {
- follower.shutdown();
- setFollower(null);
- setPeerState(ServerState.LOOKING);
+ follower.shutdown();
+ setFollower(null);
+ updateServerState();
}
break;
case LEADING:
@@ -925,10 +955,11 @@ public class QuorumPeer extends Thread i
leader.shutdown("Forcing shutdown");
setLeader(null);
}
- setPeerState(ServerState.LOOKING);
+ updateServerState();
}
break;
}
+ start_fle = System.currentTimeMillis();
}
} finally {
LOG.warn("QuorumPeer main thread exited");
@@ -942,6 +973,29 @@ public class QuorumPeer extends Thread i
}
}
+ private synchronized void updateServerState(){
+ if (!reconfigFlag) {
+ setPeerState(ServerState.LOOKING);
+ LOG.warn("PeerState set to LOOKING");
+ return;
+ }
+
+ if (getId() == getCurrentVote().getId()) {
+ setPeerState(ServerState.LEADING);
+ LOG.debug("PeerState set to LEADING");
+ } else if (getLearnerType() == LearnerType.PARTICIPANT) {
+ setPeerState(ServerState.FOLLOWING);
+ LOG.debug("PeerState set to FOLLOWING");
+ } else if (getLearnerType() == LearnerType.OBSERVER) {
+ setPeerState(ServerState.OBSERVING);
+ LOG.debug("PeerState set to OBSERVER");
+ } else { // currently shouldn't happen since there are only 2 learner types
+ setPeerState(ServerState.LOOKING);
+ LOG.debug("Shouldn't be here");
+ }
+ reconfigFlag = false;
+ }
+
public void shutdown() {
running = false;
if (leader != null) {
@@ -979,32 +1033,24 @@ public class QuorumPeer extends Thread i
* PeerType=PARTICIPANT.
*/
public Map<Long,QuorumPeer.QuorumServer> getVotingView() {
- Map<Long,QuorumPeer.QuorumServer> ret =
- new HashMap<Long, QuorumPeer.QuorumServer>();
- Map<Long,QuorumPeer.QuorumServer> view = getView();
- for (QuorumServer server : view.values()) {
- if (server.type == LearnerType.PARTICIPANT) {
- ret.put(server.id, server);
- }
- }
- return ret;
+ return getQuorumVerifier().getVotingMembers();
}
/**
* Returns only observers, no followers.
*/
public Map<Long,QuorumPeer.QuorumServer> getObservingView() {
- Map<Long,QuorumPeer.QuorumServer> ret =
- new HashMap<Long, QuorumPeer.QuorumServer>();
- Map<Long,QuorumPeer.QuorumServer> view = getView();
- for (QuorumServer server : view.values()) {
- if (server.type == LearnerType.OBSERVER) {
- ret.put(server.id, server);
- }
- }
- return ret;
+ return getQuorumVerifier().getObservingMembers();
}
+ public synchronized Set<Long> getAllKnownServerIds(){
+ Set<Long> tmp = new HashSet<Long>(getQuorumVerifier().getAllMembers().keySet());
+ if (getLastSeenQuorumVerifier()!=null) {
+ tmp.addAll(getLastSeenQuorumVerifier().getAllMembers().keySet());
+ }
+ return tmp;
+ }
+
/**
* Check if a node is in the current view. With static membership, the
* result of this check will never change; only when dynamic membership
@@ -1052,13 +1098,6 @@ public class QuorumPeer extends Thread i
/**
- * get the id of this quorum peer.
- */
- public long getMyid() {
- return myid;
- }
-
- /**
* set the id of this quorum peer.
*/
public void setMyid(long myid) {
@@ -1138,13 +1177,13 @@ public class QuorumPeer extends Thread i
props.load(new StringReader(s));
QuorumPeerConfig config = new QuorumPeerConfig();
- config.parseDynamicConfig(props, electionType);
+ config.parseDynamicConfig(props, electionType, false);
return config.getQuorumVerifier();
}
/**
- * Return QuorumVerifier object
+ * Return QuorumVerifier object for the last committed configuration
*/
public synchronized QuorumVerifier getQuorumVerifier(){
@@ -1152,15 +1191,58 @@ public class QuorumPeer extends Thread i
}
+ public synchronized QuorumVerifier getLastSeenQuorumVerifier(){
+ return lastSeenQuorumVerifier;
+ }
+
+ public synchronized void connectNewPeers(){
+ if (qcm!=null && getQuorumVerifier()!=null && getLastSeenQuorumVerifier()!=null) {
+ Map<Long, QuorumServer> committedView = getQuorumVerifier().getAllMembers();
+ for (Entry<Long, QuorumServer> e: getLastSeenQuorumVerifier().getAllMembers().entrySet()){
+ if (!committedView.containsKey(e.getKey()))
+ qcm.connectOne(e.getKey(), e.getValue().electionAddr);
+ }
+ }
+ }
+
+ public synchronized void restartLeaderElection(QuorumVerifier qvOLD, QuorumVerifier qvNEW){
+ if (qvOLD == null || !qvOLD.equals(qvNEW)) {
+ LOG.warn("Restarting Leader Election");
+ getElectionAlg().shutdown();
+ startLeaderElection();
+ }
+ }
+
+ public synchronized void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
+ if (lastSeenQuorumVerifier!=null && lastSeenQuorumVerifier.getVersion() >= qv.getVersion()) {
+ LOG.warn("setLastSeenQuorumVerifier called with stale config " + qv.getVersion() +
+ ". Current version: " + quorumVerifier.getVersion());
+
+ }
+ lastSeenQuorumVerifier = qv;
+ connectNewPeers();
+ if (writeToDisk) {
+ try {
+ QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename + ".next", null, false, qv);
+ } catch(IOException e){
+ LOG.error("Error closing file: ", e.getMessage());
+ }
+ }
+
+ }
+
public synchronized QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
- LOG.warn("setQuorumVerifier called with stale config " + qv.getVersion() +
+ // this is normal. For example - server found out about new config through FastLeaderElection gossiping
+ // and then got the same config in UPTODATE message so its already known
+ LOG.debug(getId() + " setQuorumVerifier called with known or old config " + qv.getVersion() +
". Current version: " + quorumVerifier.getVersion());
return quorumVerifier;
}
QuorumVerifier prevQV = quorumVerifier;
- quorumVerifier = qv;
-
+ quorumVerifier = qv;
+ if (lastSeenQuorumVerifier == null || (qv.getVersion() > lastSeenQuorumVerifier.getVersion()))
+ lastSeenQuorumVerifier = qv;
if (writeToDisk) {
try {
QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, configFilename, configBackwardCompatibility, qv);
@@ -1173,14 +1255,17 @@ public class QuorumPeer extends Thread i
}
}
- QuorumServer qs = qv.getAllMembers().get(getId());
- if (qs!=null){
- setQuorumAddress(qs.addr);
- setElectionAddress(qs.electionAddr);
- setClientAddress(qs.clientAddr);
- }
- return prevQV;
- }
+ if (qv.getVersion() == lastSeenQuorumVerifier.getVersion()){
+ QuorumPeerConfig.deleteFile(dynamicConfigFilename + ".next");
+ }
+ QuorumServer qs = qv.getAllMembers().get(getId());
+ if (qs!=null){
+ setQuorumAddress(qs.addr);
+ setElectionAddress(qs.electionAddr);
+ setClientAddress(qs.clientAddr);
+ }
+ return prevQV;
+ }
/**
* Get an instance of LeaderElection
*/
@@ -1229,9 +1314,6 @@ public class QuorumPeer extends Thread i
return cnxnFactory.getLocalPort();
}
- public void setClientPortAddress(InetSocketAddress addr) {
- }
-
public void setTxnFactory(FileTxnSnapLog factory) {
this.logFactory = factory;
}
@@ -1247,7 +1329,11 @@ public class QuorumPeer extends Thread i
public void setZKDatabase(ZKDatabase database) {
this.zkDb = database;
}
-
+
+ public synchronized void initConfigInZKDatabase() {
+ if (zkDb != null) zkDb.initConfigInZKDatabase(getQuorumVerifier());
+ }
+
public void setRunning(boolean running) {
this.running = running;
}
@@ -1340,4 +1426,98 @@ public class QuorumPeer extends Thread i
acceptedEpoch = e;
writeLongToFile(ACCEPTED_EPOCH_FILENAME, e);
}
+
+ public boolean processReconfig(QuorumVerifier qv, Long suggestedLeaderId, Long zxid, boolean restartLE){
+ InetSocketAddress oldClientAddr = getClientAddress();
+
+ // update last committed quorum verifier, write the new config to disk
+ // and restart leader election if config changed
+ QuorumVerifier prevQV = setQuorumVerifier(qv, true);
+
+ // There is no log record for the initial config, thus after syncing
+ // with leader
+ // /zookeeper/config is empty! it is also possible that last committed
+ // config is propagated during leader election
+ // without the propagation the corresponding log records.
+ // so we should explicitly do this (this is not necessary when we're
+ // already a Follower/Observer, only
+ // for Learner):
+ initConfigInZKDatabase();
+
+ if (prevQV.getVersion() < qv.getVersion()) {
+ if (restartLE) restartLeaderElection(prevQV, qv);
+
+ QuorumServer myNewQS = qv.getAllMembers().get(getId());
+ if (myNewQS != null && myNewQS.clientAddr != null
+ && !myNewQS.clientAddr.equals(oldClientAddr)) {
+ cnxnFactory.reconfigure(myNewQS.clientAddr);
+ }
+
+ boolean roleChange = updateLearnerType(qv);
+ boolean leaderChange = false;
+ if (suggestedLeaderId != null) {
+ // zxid should be non-null too
+ leaderChange = updateVote(suggestedLeaderId, zxid);
+ } else {
+ long currentLeaderId = getCurrentVote().getId();
+ InetSocketAddress currentLeaderAddr = prevQV.getVotingMembers()
+ .get(currentLeaderId).addr;
+ leaderChange = (!qv.getVotingMembers().containsKey(
+ currentLeaderId))
+ || (!qv.getVotingMembers().get(currentLeaderId).addr
+ .equals(currentLeaderAddr));
+ // we don't have a designated leader - need to go into leader
+ // election
+ reconfigFlagClear();
+ }
+
+ if (roleChange || leaderChange) {
+ return true;
+ }
+ }
+ return false;
+
+ }
+
+ private boolean updateLearnerType(QuorumVerifier newQV) {
+ //check if I'm an observer in new config
+ if (newQV.getObservingMembers().containsKey(getId())) {
+ if (getLearnerType()!=LearnerType.OBSERVER){
+ setLearnerType(LearnerType.OBSERVER);
+ LOG.info("Becoming an observer");
+ reconfigFlagSet();
+ return true;
+ } else {
+ return false;
+ }
+ } else if (newQV.getVotingMembers().containsKey(getId())) {
+ if (getLearnerType()!=LearnerType.PARTICIPANT){
+ setLearnerType(LearnerType.PARTICIPANT);
+ LOG.info("Becoming a voting participant");
+ reconfigFlagSet();
+ return true;
+ } else {
+ return false;
+ }
+ }
+ // I'm not in the view
+ if (getLearnerType()!=LearnerType.PARTICIPANT){
+ setLearnerType(LearnerType.PARTICIPANT);
+ LOG.info("Becoming a non-voting participant");
+ reconfigFlagSet();
+ return true;
+ }
+ return false;
+ }
+
+ private boolean updateVote(long designatedLeader, long zxid){
+ Vote currentVote = getCurrentVote();
+ if (currentVote!=null && designatedLeader != currentVote.getId()) {
+ setCurrentVote(new Vote(designatedLeader, zxid));
+ reconfigFlagSet();
+ LOG.warn("Suggested leader: " + designatedLeader);
+ return true;
+ }
+ return false;
+ }
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Thu Mar 7 06:01:49 2013
@@ -70,7 +70,7 @@ public class QuorumPeerConfig {
protected long serverId;
- protected QuorumVerifier quorumVerifier = null;
+ protected QuorumVerifier quorumVerifier = null, lastSeenQuorumVerifier = null;
protected int snapRetainCount = 3;
protected int purgeInterval = 0;
@@ -121,7 +121,7 @@ public class QuorumPeerConfig {
if (dynamicConfigFileStr == null) {
configBackwardCompatibilityMode = true;
configFileStr = path;
- parseDynamicConfig(cfg, electionAlg);
+ parseDynamicConfig(cfg, electionAlg, true);
checkValidity();
}
@@ -140,14 +140,37 @@ public class QuorumPeerConfig {
} finally {
inConfig.close();
}
- parseDynamicConfig(dynamicCfg, electionAlg);
+ parseDynamicConfig(dynamicCfg, electionAlg, true);
checkValidity();
} catch (IOException e) {
throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
- }
+ }
+ File nextDynamicConfigFile = new File(dynamicConfigFileStr + ".next");
+ if (nextDynamicConfigFile.exists()) {
+ try {
+ Properties dynamicConfigNextCfg = new Properties();
+ FileInputStream inConfigNext = new FileInputStream(nextDynamicConfigFile);
+ try {
+ dynamicConfigNextCfg.load(inConfigNext);
+ } finally {
+ inConfigNext.close();
+ }
+ boolean isHierarchical = false;
+ for (Entry<Object, Object> entry : dynamicConfigNextCfg.entrySet()) {
+ String key = entry.getKey().toString().trim();
+ if (key.startsWith("group") || key.startsWith("weight")) {
+ isHierarchical = true;
+ break;
+ }
+ }
+ lastSeenQuorumVerifier = createQuorumVerifier(dynamicConfigNextCfg, isHierarchical);
+ } catch (IOException e) {
+ LOG.warn("NextQuorumVerifier is initiated to null");
+ }
+ }
}
}
@@ -258,7 +281,7 @@ public class QuorumPeerConfig {
boolean configBackwardCompatibilityMode, QuorumVerifier qv) throws IOException {
FileOutputStream outConfig = null;
try {
- byte b[] = qv.toByteArray();
+ byte b[] = qv.toString().getBytes();
if (configBackwardCompatibilityMode) {
dynamicConfigFilename = configFileStr + ".dynamic";
}
@@ -353,7 +376,7 @@ public class QuorumPeerConfig {
* @throws IOException
* @throws ConfigException
*/
- public void parseDynamicConfig(Properties dynamicConfigProp, int eAlg)
+ public void parseDynamicConfig(Properties dynamicConfigProp, int eAlg, boolean warnings)
throws IOException, ConfigException {
boolean isHierarchical = false;
for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
@@ -387,13 +410,15 @@ public class QuorumPeerConfig {
// b/w compatibility reasons we need to keep this here.
LOG.error("Invalid configuration, only one server specified (ignoring)");
//servers.clear();
- } else if (numParticipators > 1) {
- if (numParticipators == 2) {
- LOG.warn("No server failure will be tolerated. " +
- "You need at least 3 servers.");
- } else if (numParticipators % 2 == 0) {
- LOG.warn("Non-optimial configuration, consider an odd number of servers.");
- }
+ } else if (numParticipators > 1) {
+ if (warnings) {
+ if (numParticipators == 2) {
+ LOG.warn("No server failure will be tolerated. " +
+ "You need at least 3 servers.");
+ } else if (numParticipators % 2 == 0) {
+ LOG.warn("Non-optimial configuration, consider an odd number of servers.");
+ }
+ }
/*
* If using FLE, then every server requires a separate election
* port.
@@ -489,6 +514,10 @@ public class QuorumPeerConfig {
public QuorumVerifier getQuorumVerifier() {
return quorumVerifier;
}
+
+ public QuorumVerifier getLastSeenQuorumVerifier() {
+ return lastSeenQuorumVerifier;
+ }
public Map<Long,QuorumServer> getServers() {
// returns all configuration servers -- participants and observers
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Thu Mar 7 06:01:49 2013
@@ -151,6 +151,10 @@ public class QuorumPeerMain {
quorumPeer.setConfigBackwardCompatibility(config.getConfigBackwardCompatibility());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
+ if (config.getLastSeenQuorumVerifier()!=null) {
+ quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
+ }
+ quorumPeer.initConfigInZKDatabase();
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setLearnerType(config.getPeerType());
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java Thu Mar 7 06:01:49 2013
@@ -28,7 +28,7 @@ import org.apache.zookeeper.server.persi
* a quorum.
*/
public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
- protected final QuorumPeer self;
+ public final QuorumPeer self;
protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout,
@@ -54,5 +54,7 @@ public abstract class QuorumZooKeeperSer
pwriter.println(self.getQuorumAddress().getPort());
pwriter.print("peerType=");
pwriter.println(self.getLearnerType().ordinal());
+ pwriter.println("membership: ");
+ pwriter.print(new String(self.getQuorumVerifier().toString().getBytes()));
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java Thu Mar 7 06:01:49 2013
@@ -80,6 +80,7 @@ public class ReadOnlyRequestProcessor ex
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
+ case OpCode.reconfig:
case OpCode.setACL:
case OpCode.multi:
case OpCode.check:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java Thu Mar 7 06:01:49 2013
@@ -241,7 +241,7 @@ public class QuorumHierarchical implemen
public Map<Long, QuorumServer> getAllMembers() {
return allMembers;
}
- public byte[] toByteArray(){
+ public String toString(){
StringWriter sw = new StringWriter();
for (QuorumServer member: getAllMembers().values()){
@@ -286,7 +286,7 @@ public class QuorumHierarchical implemen
sw.append("version=" + Long.toHexString(version));
- return sw.toString().getBytes();
+ return sw.toString();
}
/**
@@ -371,4 +371,9 @@ public class QuorumHierarchical implemen
public long getVersion() {
return version;
}
+
+ public void setVersion(long ver) {
+ version = ver;
+ }
+
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java Thu Mar 7 06:01:49 2013
@@ -109,7 +109,7 @@ public class QuorumMaj implements Quorum
return (long) 1;
}
- public byte[] toByteArray() {
+ public String toString() {
StringBuilder sw = new StringBuilder();
for (QuorumServer member : getAllMembers().values()) {
@@ -123,8 +123,8 @@ public class QuorumMaj implements Quorum
String hexVersion = Long.toHexString(version);
sw.append("version=");
sw.append(hexVersion);
- return sw.toString().getBytes();
- }
+ return sw.toString();
+ }
/**
* Verifies if a set is a majority. Assumes that ackSet contains acks only
@@ -149,5 +149,8 @@ public class QuorumMaj implements Quorum
public long getVersion() {
return version;
}
-
+
+ public void setVersion(long ver) {
+ version = ver;
+ }
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java Thu Mar 7 06:01:49 2013
@@ -34,9 +34,10 @@ public interface QuorumVerifier {
long getWeight(long id);
boolean containsQuorum(Set<Long> set);
long getVersion();
+ void setVersion(long ver);
Map<Long, QuorumServer> getAllMembers();
Map<Long, QuorumServer> getVotingMembers();
Map<Long, QuorumServer> getObservingMembers();
boolean equals(Object o);
- byte[] toByteArray();
+ String toString();
}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ConfigUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ConfigUtils.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ConfigUtils.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/ConfigUtils.java Thu Mar 7 06:01:49 2013
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.util;
+
+import java.io.IOException;
+import java.io.StringReader;
+import java.util.Properties;
+import java.util.Map.Entry;
+
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+
+
+public class ConfigUtils {
+    /**
+     * Builds the client-facing summary of a dynamic configuration.
+     *
+     * @param configData dynamic configuration text in {@link Properties}
+     *        format: {@code server.N=...} lines and optionally a
+     *        {@code version=...} line
+     * @return the value of the {@code version} key (empty if absent),
+     *         followed by a single space and a comma-separated list of
+     *         {@code host:port} client addresses, one for each server line
+     *         that parses and declares a client address; {@code ""} if
+     *         {@code configData} cannot be loaded
+     */
+    static public String getClientConfigStr(String configData) {
+        Properties props = new Properties();
+        try {
+            props.load(new StringReader(configData));
+        } catch (IOException e) {
+            e.printStackTrace();
+            return "";
+        }
+        StringBuilder sb = new StringBuilder();
+        String version = "";
+        for (Entry<Object, Object> entry : props.entrySet()) {
+            String key = entry.getKey().toString().trim();
+            String value = entry.getValue().toString().trim();
+            if (key.equals("version")) {
+                version = value;
+            }
+            if (!key.startsWith("server.")) {
+                continue;
+            }
+            QuorumPeer.QuorumServer qs;
+            try {
+                qs = new QuorumPeer.QuorumServer(-1, value);
+            } catch (ConfigException e) {
+                e.printStackTrace();
+                continue;
+            }
+            // Guard against a missing client address (e.g. a server line
+            // without a ";clientAddress" section) instead of throwing a
+            // NullPointerException; such servers are simply not listed.
+            if (qs.clientAddr == null) {
+                continue;
+            }
+            // Every appended entry is non-empty, so length tells us
+            // whether a separator is needed.
+            if (sb.length() > 0) {
+                sb.append(",");
+            }
+            sb.append(qs.clientAddr.getHostName())
+                    .append(":")
+                    .append(qs.clientAddr.getPort());
+        }
+        return version + " " + sb.toString();
+    }
+}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/util/SerializeUtils.java Thu Mar 7 06:01:49 2013
@@ -71,6 +71,7 @@ public class SerializeUtils {
case OpCode.delete:
txn = new DeleteTxn();
break;
+ case OpCode.reconfig:
case OpCode.setData:
txn = new SetDataTxn();
break;
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Thu Mar 7 06:01:49 2013
@@ -53,17 +53,40 @@ public class QuorumPeerTestBase extends
}
}
}
-
+
+    /**
+     * A {@link MainThread} that also writes {@code zoo.dynamic.next} into the
+     * server's temporary directory, so the server starts up believing a
+     * reconfiguration to {@code nextQuorumCfgSection} was in progress.
+     */
+    public static class MainThreadReconfigRecovery extends MainThread {
+        final File nextDynamicConfigFile;
+
+        /**
+         * @param myid server id
+         * @param clientPort client port used for the current config
+         * @param currentQuorumCfgSection server lines of the current config
+         * @param nextQuorumCfgSection contents written to
+         *        {@code zoo.dynamic.next}
+         * @throws IOException if the temporary files cannot be written
+         */
+        public MainThreadReconfigRecovery(int myid, int clientPort,
+                String currentQuorumCfgSection, String nextQuorumCfgSection)
+                throws IOException {
+            super(myid, clientPort, currentQuorumCfgSection);
+            nextDynamicConfigFile = new File(tmpDir, "zoo.dynamic.next");
+            FileWriter fwriter = new FileWriter(nextDynamicConfigFile);
+            // Close the writer even if write/flush fails, so the file handle
+            // is not leaked.
+            try {
+                fwriter.write(nextQuorumCfgSection + "\n");
+                fwriter.flush();
+            } finally {
+                fwriter.close();
+            }
+        }
+    }
+
public static class MainThread implements Runnable {
final File confFile;
final File dynamicConfigFile;
+ final File tmpDir;
+
volatile TestQPMain main;
public MainThread(int myid, int clientPort, String quorumCfgSection)
throws IOException {
- File tmpDir = ClientBase.createTmpDir();
+ tmpDir = ClientBase.createTmpDir();
LOG.info("id = " + myid + " tmpDir = " + tmpDir + " clientPort = "
+ clientPort);
+
+ File dataDir = new File(tmpDir, "data");
+ if (!dataDir.mkdir()) {
+ throw new IOException("Unable to mkdir " + dataDir);
+ }
+
confFile = new File(tmpDir, "zoo.cfg");
dynamicConfigFile = new File(tmpDir, "zoo.dynamic");
@@ -72,11 +95,6 @@ public class QuorumPeerTestBase extends
fwriter.write("initLimit=10\n");
fwriter.write("syncLimit=5\n");
- File dataDir = new File(tmpDir, "data");
- if (!dataDir.mkdir()) {
- throw new IOException("Unable to mkdir " + dataDir);
- }
-
// Convert windows path to UNIX to avoid problems with "\"
String dir = dataDir.toString();
String dynamicConfigFilename = dynamicConfigFile.toString();
@@ -106,7 +124,7 @@ public class QuorumPeerTestBase extends
fwriter.flush();
fwriter.close();
}
-
+
Thread currentThread;
synchronized public void start() {
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/ReconfigRecoveryTest.java Thu Mar 7 06:01:49 2013
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import java.util.ArrayList;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.ReconfigTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ReconfigRecoveryTest extends QuorumPeerTestBase {
+    /**
+     * Closes every non-null client, then shuts down every non-null server.
+     * Called from finally blocks so that a failing assertion does not leave
+     * quorum peers and client sessions running into subsequent tests.
+     */
+    private static void shutdownAll(ZooKeeper[] zk, MainThread[] mt)
+            throws Exception {
+        for (ZooKeeper client : zk) {
+            if (client != null) {
+                client.close();
+            }
+        }
+        for (MainThread peer : mt) {
+            if (peer != null) {
+                peer.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Reconfiguration recovery - test that a reconfiguration is completed
+     * if leader has .next file during startup and new config is not running yet
+     */
+    @Test
+    public void testNextConfigCompletion() throws Exception {
+        ClientBase.setupTestEnv();
+
+        // 2 servers in current config, 3 in next config
+        final int SERVER_COUNT = 3;
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        String server;
+        ArrayList<String> allServers = new ArrayList<String>();
+
+        String currentQuorumCfgSection = null, nextQuorumCfgSection;
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=localhost:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                    + ":participant;localhost:" + clientPorts[i];
+            allServers.add(server);
+            sb.append(server + "\n");
+            if (i == 1) currentQuorumCfgSection = sb.toString();
+        }
+        sb.append("version=1\n"); // version of current config is 0
+        nextQuorumCfgSection = sb.toString();
+
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        try {
+            // Both servers 0 and 1 will have the .next config file, which means
+            // for them that a reconfiguration was in progress when they failed
+            // and the leader will complete it
+            for (int i = 0; i < SERVER_COUNT - 1; i++) {
+                mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            }
+
+            Assert.assertTrue("waiting for server 0 being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[0],
+                            CONNECTION_TIMEOUT));
+            Assert.assertTrue("waiting for server 1 being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[1],
+                            CONNECTION_TIMEOUT));
+
+            int leader = mt[0].main.quorumPeer.leader == null ? 1 : 0;
+
+            // the new server's config is going to include itself and the current leader
+            sb = new StringBuilder();
+            sb.append(allServers.get(leader) + "\n");
+            sb.append(allServers.get(2) + "\n");
+
+            // suppose that this new server never heard about the reconfig proposal
+            String newServerInitialConfig = sb.toString();
+            mt[2] = new MainThread(2, clientPorts[2], newServerInitialConfig);
+            mt[2].start();
+            zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
+
+            Assert.assertTrue("waiting for server 2 being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
+                            CONNECTION_TIMEOUT));
+
+            ReconfigTest.testServerHasConfig(zk[0], allServers, null);
+            ReconfigTest.testServerHasConfig(zk[1], allServers, null);
+            ReconfigTest.testServerHasConfig(zk[2], allServers, null);
+
+            ReconfigTest.testNormalOperation(zk[0], zk[2]);
+            ReconfigTest.testNormalOperation(zk[2], zk[1]);
+
+            // Clear the slots after stopping server 2 so the finally block
+            // does not try to stop it a second time if the restart fails.
+            zk[2].close();
+            zk[2] = null;
+            mt[2].shutdown();
+            mt[2] = null;
+
+            // now suppose that the new server heard the reconfig request
+            mt[2] = new MainThreadReconfigRecovery(2, clientPorts[2], newServerInitialConfig, nextQuorumCfgSection);
+            mt[2].start();
+            zk[2] = new ZooKeeper("127.0.0.1:" + clientPorts[2], ClientBase.CONNECTION_TIMEOUT, this);
+
+            Assert.assertTrue("waiting for server 2 being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[2],
+                            CONNECTION_TIMEOUT));
+
+            ReconfigTest.testServerHasConfig(zk[0], allServers, null);
+            ReconfigTest.testServerHasConfig(zk[1], allServers, null);
+            ReconfigTest.testServerHasConfig(zk[2], allServers, null);
+
+            ReconfigTest.testNormalOperation(zk[0], zk[2]);
+            ReconfigTest.testNormalOperation(zk[2], zk[1]);
+        } finally {
+            shutdownAll(zk, mt);
+        }
+    }
+
+    /**
+     * Reconfiguration recovery - current config servers discover .next file,
+     * but they're both observers and their ports change in next config. Suppose that next config wasn't activated yet.
+     * Should complete reconfiguration.
+     */
+    @Test
+    public void testCurrentServersAreObserversInNextConfig() throws Exception {
+        ClientBase.setupTestEnv();
+
+        // 2 servers in current config, 5 in next config
+        final int SERVER_COUNT = 5;
+        final int clientPorts[] = new int[SERVER_COUNT];
+        final int oldClientPorts[] = new int[2];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        String currentQuorumCfgSection = null, nextQuorumCfgSection;
+
+        ArrayList<String> allServersCurrent = new ArrayList<String>();
+        ArrayList<String> allServersNext = new ArrayList<String>();
+
+        for (int i = 0; i < 2; i++) {
+            oldClientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=localhost:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                    + ":participant;localhost:" + oldClientPorts[i];
+            allServersCurrent.add(server);
+            sb.append(server + "\n");
+        }
+
+        currentQuorumCfgSection = sb.toString();
+        sb = new StringBuilder();
+        String role;
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            if (i < 2) role = "observer";
+            else role = "participant";
+            server = "server." + i + "=localhost:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                    + ":" + role + ";localhost:" + clientPorts[i];
+            allServersNext.add(server);
+            sb.append(server + "\n");
+        }
+        sb.append("version=1\n"); // version of current config is 0
+        nextQuorumCfgSection = sb.toString();
+
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        try {
+            // Servers 0 and 1 start on their old client ports, but their
+            // clients target the next-config ports: the test below expects
+            // them to be serving there once the reconfiguration completes.
+            for (int i = 0; i < 2; i++) {
+                mt[i] = new MainThreadReconfigRecovery(i, oldClientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            }
+
+            // new members are initialized with current config + the new server
+            for (int i = 2; i < SERVER_COUNT; i++) {
+                mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection + allServersNext.get(i));
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            }
+
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                Assert.assertTrue("waiting for server " + i + " being up",
+                        ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                                CONNECTION_TIMEOUT * 2));
+                ReconfigTest.testServerHasConfig(zk[i], allServersNext, null);
+            }
+
+            ReconfigTest.testNormalOperation(zk[0], zk[2]);
+            ReconfigTest.testNormalOperation(zk[4], zk[1]);
+        } finally {
+            shutdownAll(zk, mt);
+        }
+    }
+
+    /**
+     * Reconfiguration recovery - test that if servers in old config have a .next file
+     * but no quorum of new config is up then no progress should be possible (no progress will happen
+     * to ensure safety as the new config might be actually up but partitioned from old config)
+     */
+    @Test
+    public void testNextConfigUnreachable() throws Exception {
+        ClientBase.setupTestEnv();
+
+        // 2 servers in current config, 5 in next config
+        final int SERVER_COUNT = 5;
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        String currentQuorumCfgSection = null, nextQuorumCfgSection;
+
+        ArrayList<String> allServers = new ArrayList<String>();
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=localhost:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                    + ":participant;localhost:" + clientPorts[i];
+            allServers.add(server);
+            sb.append(server + "\n");
+            if (i == 1) currentQuorumCfgSection = sb.toString();
+        }
+        sb.append("version=1\n"); // version of current config is 0
+        nextQuorumCfgSection = sb.toString();
+
+        // Servers 2, 3 and 4 of the next config are never started.
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        try {
+            // Both servers 0 and 1 will have the .next config file, which means
+            // for them that a reconfiguration was in progress when they failed.
+            // Since no quorum of the next config is reachable, they must not
+            // be able to complete it.
+            for (int i = 0; i < 2; i++) {
+                mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            }
+
+            Thread.sleep(CONNECTION_TIMEOUT * 2);
+
+            // make sure servers 0, 1 don't come online
+            for (int i = 0; i < 2; i++) {
+                Assert.assertFalse("server " + i + " is up but shouldn't be",
+                        ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                                CONNECTION_TIMEOUT / 10));
+            }
+        } finally {
+            shutdownAll(zk, mt);
+        }
+    }
+
+    /**
+     * Reconfiguration recovery - test that old config members will join the new config
+     * if its already active, and not try to complete the reconfiguration
+     */
+    @Test
+    public void testNextConfigAlreadyActive() throws Exception {
+        ClientBase.setupTestEnv();
+
+        // 2 servers in current config, 5 in next config
+        final int SERVER_COUNT = 5;
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        String currentQuorumCfgSection = null, nextQuorumCfgSection;
+
+        ArrayList<String> allServers = new ArrayList<String>();
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=localhost:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                    + ":participant;localhost:" + clientPorts[i];
+            allServers.add(server);
+            sb.append(server + "\n");
+            if (i == 1) currentQuorumCfgSection = sb.toString();
+        }
+        sb.append("version=1\n"); // version of current config is 0
+        nextQuorumCfgSection = sb.toString();
+
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+        ZooKeeper zk[] = new ZooKeeper[SERVER_COUNT];
+        try {
+            // let's start servers 2, 3, 4 with the new config
+            for (int i = 2; i < SERVER_COUNT; i++) {
+                mt[i] = new MainThread(i, clientPorts[i], nextQuorumCfgSection);
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            }
+            for (int i = 2; i < SERVER_COUNT; i++) {
+                Assert.assertTrue("waiting for server " + i + " being up",
+                        ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                                CONNECTION_TIMEOUT));
+            }
+
+            ReconfigTest.testNormalOperation(zk[2], zk[3]);
+
+            long epoch = mt[2].main.quorumPeer.getAcceptedEpoch();
+
+            // Both servers 0 and 1 will have the .next config file, which means
+            // for them that a reconfiguration was in progress when they failed.
+            // Because the next config is already active, they should simply
+            // join it rather than complete the reconfiguration themselves.
+            for (int i = 0; i < 2; i++) {
+                mt[i] = new MainThreadReconfigRecovery(i, clientPorts[i], currentQuorumCfgSection, nextQuorumCfgSection);
+                mt[i].start();
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+            }
+
+            // servers 0 and 1 should connect to all servers, including the one in their
+            // .next file during startup, and will find the next config and join it
+            for (int i = 0; i < 2; i++) {
+                Assert.assertTrue("waiting for server " + i + " being up",
+                        ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i],
+                                CONNECTION_TIMEOUT * 2));
+            }
+
+            // make sure they joined the new config without any change to it
+            Assert.assertEquals(epoch, mt[0].main.quorumPeer.getAcceptedEpoch());
+            Assert.assertEquals(epoch, mt[1].main.quorumPeer.getAcceptedEpoch());
+            Assert.assertEquals(epoch, mt[2].main.quorumPeer.getAcceptedEpoch());
+
+            ReconfigTest.testServerHasConfig(zk[0], allServers, null);
+            ReconfigTest.testServerHasConfig(zk[1], allServers, null);
+
+            ReconfigTest.testNormalOperation(zk[0], zk[2]);
+            ReconfigTest.testNormalOperation(zk[4], zk[1]);
+        } finally {
+            shutdownAll(zk, mt);
+        }
+    }
+}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Thu Mar 7 06:01:49 2013
@@ -62,6 +62,7 @@ import org.apache.zookeeper.txn.SetDataT
import org.apache.zookeeper.txn.TxnHeader;
import org.junit.Assert;
import org.junit.Test;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -300,6 +301,9 @@ public class Zab1_0Test {
public int getNumAliveConnections() {
return 0;
}
+ @Override
+ public void reconfigure(InetSocketAddress addr) {
+ // Intentionally empty: this test stub ignores reconfiguration requests.
+ }
}
static Socket[] getSocketPair() throws IOException {
ServerSocket ss = new ServerSocket();
@@ -496,41 +500,41 @@ public class Zab1_0Test {
@Test
public void testUnnecessarySnap() throws Exception {
testPopulatedLeaderConversation(new PopulatedLeaderConversation() {
- @Override
- public void converseWithLeader(InputArchive ia, OutputArchive oa,
+ @Override
+ public void converseWithLeader(InputArchive ia, OutputArchive oa,
Leader l, long zxid) throws Exception {
-
- Assert.assertEquals(1, l.self.getAcceptedEpoch());
- Assert.assertEquals(1, l.self.getCurrentEpoch());
-
- /* we test a normal run. everything should work out well. */
- LearnerInfo li = new LearnerInfo(1, 0x10000);
- byte liBytes[] = new byte[12];
- ByteBufferOutputStream.record2ByteBuffer(li,
- ByteBuffer.wrap(liBytes));
- QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1,
- liBytes, null);
- oa.writeRecord(qp, null);
-
- readPacketSkippingPing(ia, qp);
- Assert.assertEquals(Leader.LEADERINFO, qp.getType());
- Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
- Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
- 0x10000);
- Assert.assertEquals(2, l.self.getAcceptedEpoch());
- Assert.assertEquals(1, l.self.getCurrentEpoch());
-
- byte epochBytes[] = new byte[4];
- final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
- wrappedEpochBytes.putInt(1);
- qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
- oa.writeRecord(qp, null);
-
- readPacketSkippingPing(ia, qp);
- Assert.assertEquals(Leader.DIFF, qp.getType());
-
- }
- }, 2);
+
+ Assert.assertEquals(1, l.self.getAcceptedEpoch());
+ Assert.assertEquals(1, l.self.getCurrentEpoch());
+
+ /* we test a normal run. everything should work out well. */
+ LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
+ byte liBytes[] = new byte[20];
+ ByteBufferOutputStream.record2ByteBuffer(li,
+ ByteBuffer.wrap(liBytes));
+ QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 1,
+ liBytes, null);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+ Assert.assertEquals(ZxidUtils.makeZxid(2, 0), qp.getZxid());
+ Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
+ 0x10000);
+ Assert.assertEquals(2, l.self.getAcceptedEpoch());
+ Assert.assertEquals(1, l.self.getCurrentEpoch());
+
+ byte epochBytes[] = new byte[4];
+ final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);
+ wrappedEpochBytes.putInt(1);
+ qp = new QuorumPacket(Leader.ACKEPOCH, zxid, epochBytes, null);
+ oa.writeRecord(qp, null);
+
+ readPacketSkippingPing(ia, qp);
+ Assert.assertEquals(Leader.DIFF, qp.getType());
+
+ }
+ }, 2);
}
@Test
@@ -796,8 +800,8 @@ public class Zab1_0Test {
Assert.assertEquals(0, l.self.getCurrentEpoch());
/* we test a normal run. everything should work out well. */
- LearnerInfo li = new LearnerInfo(1, 0x10000);
- byte liBytes[] = new byte[12];
+ LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
+ byte liBytes[] = new byte[20];
ByteBufferOutputStream.record2ByteBuffer(li,
ByteBuffer.wrap(liBytes));
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
@@ -856,8 +860,8 @@ public class Zab1_0Test {
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
throws IOException {
/* we test a normal run. everything should work out well. */
- LearnerInfo li = new LearnerInfo(1, 0x10000);
- byte liBytes[] = new byte[12];
+ LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
+ byte liBytes[] = new byte[20];
ByteBufferOutputStream.record2ByteBuffer(li,
ByteBuffer.wrap(liBytes));
/* we are going to say we last acked epoch 20 */
@@ -905,8 +909,8 @@ public class Zab1_0Test {
public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
throws IOException, InterruptedException {
/* we test a normal run. everything should work out well. */
- LearnerInfo li = new LearnerInfo(1, 0x10000);
- byte liBytes[] = new byte[12];
+ LearnerInfo li = new LearnerInfo(1, 0x10000, 0);
+ byte liBytes[] = new byte[20];
ByteBufferOutputStream.record2ByteBuffer(li,
ByteBuffer.wrap(liBytes));
QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java Thu Mar 7 06:01:49 2013
@@ -18,6 +18,7 @@
package org.apache.zookeeper.test;
+import java.io.DataOutputStream;
import java.io.File;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@@ -232,16 +233,18 @@ public class CnxManagerTest extends ZKTe
SocketChannel sc = SocketChannel.open();
sc.socket().connect(peers.get(1L).electionAddr, 5000);
- /*
- * Write id first then negative length.
- */
- byte[] msgBytes = new byte[8];
- ByteBuffer msgBuffer = ByteBuffer.wrap(msgBytes);
- msgBuffer.putLong(2L);
- msgBuffer.position(0);
- sc.write(msgBuffer);
+ InetSocketAddress otherAddr = peers.get(new Long(2)).electionAddr;
+ DataOutputStream dout = new DataOutputStream(sc.socket().getOutputStream());
+ dout.writeLong(0xffff0000);
+ dout.writeLong(new Long(2));
+ String addr = otherAddr.getHostName()+ ":" + otherAddr.getPort();
+ byte[] addr_bytes = addr.getBytes();
+ dout.writeInt(addr_bytes.length);
+ dout.write(addr_bytes);
+ dout.flush();
+
- msgBuffer = ByteBuffer.wrap(new byte[4]);
+ ByteBuffer msgBuffer = ByteBuffer.wrap(new byte[4]);
msgBuffer.putInt(-20);
msgBuffer.position(0);
sc.write(msgBuffer);
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumMajorityTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumMajorityTest.java?rev=1453693&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumMajorityTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumMajorityTest.java Thu Mar 7 06:01:49 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+import java.util.HashSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QuorumMajorityTest extends QuorumBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(QuorumMajorityTest.class);
+    public static final long CONNECTION_TIMEOUT = ClientTest.CONNECTION_TIMEOUT;
+
+    /**
+     * Tests that the majority quorum verifier only counts acks from voting
+     * members (followers) in its view: acks from observers and from servers
+     * outside the view do not contribute to the quorum.
+     */
+    @Test
+    public void testMajQuorums() throws Throwable {
+        // set up servers 1-5 as followers
+        setUp(false);
+
+        Proposal p = new Proposal();
+        p.addQuorumVerifier(s1.getQuorumVerifier());
+
+        p.addAck(Long.valueOf(1));
+        p.addAck(Long.valueOf(2));
+        Assert.assertFalse("2 followers out of 5 is not a majority",
+                p.hasAllQuorums());
+
+        p.addAck(Long.valueOf(6));
+        Assert.assertFalse("6 is not in the view - its vote shouldn't count",
+                p.hasAllQuorums());
+
+        p.addAck(Long.valueOf(3));
+        Assert.assertTrue("3 followers out of 5 are a majority of the voting view",
+                p.hasAllQuorums());
+
+        // set up servers 1-3 as followers and 4 and 5 as observers
+        setUp(true);
+
+        p = new Proposal();
+        p.addQuorumVerifier(s1.getQuorumVerifier());
+
+        p.addAck(Long.valueOf(1));
+        Assert.assertFalse("1 follower out of 3 is not a majority",
+                p.hasAllQuorums());
+
+        p.addAck(Long.valueOf(4));
+        p.addAck(Long.valueOf(5));
+        Assert.assertFalse("4 and 5 are observers, their votes shouldn't count",
+                p.hasAllQuorums());
+
+        p.addAck(Long.valueOf(6));
+        Assert.assertFalse("6 is not in the view - its vote shouldn't count",
+                p.hasAllQuorums());
+
+        p.addAck(Long.valueOf(2));
+        Assert.assertTrue("2 followers out of 3 are a majority of the voting view",
+                p.hasAllQuorums());
+    }
+}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Thu Mar 7 06:01:49 2013
@@ -122,6 +122,9 @@ public class QuorumUtil {
return peers.get(id);
}
+ // This was added to avoid running into the problem of ZOOKEEPER-1539
+ public boolean disableJMXTest = false;
+
public void startAll() throws IOException {
shutdownAll();
for (int i = 1; i <= ALL; ++i) {
@@ -136,6 +139,9 @@ public class QuorumUtil {
LOG.info(hp + " is accepting client connections");
}
+ // This was added to avoid running into the problem of ZOOKEEPER-1539
+ if (disableJMXTest) return;
+
// interesting to see what's there...
try {
JMXEnv.dump();