You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2012/04/23 01:20:15 UTC
svn commit: r1328991 [1/2] - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/main/org/apache/zookeeper/server/quorum/flexible/
src/java/test/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/serve...
Author: breed
Date: Sun Apr 22 23:20:13 2012
New Revision: 1328991
URL: http://svn.apache.org/viewvc?rev=1328991&view=rev
Log:
ZOOKEEPER-1411. Consolidate membership management, distinguish between static and dynamic configuration parameters
Added:
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/util/DynamicConfigBCTest.java
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/CnxManagerTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLETest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ObserverTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/StandaloneTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Sun Apr 22 23:20:13 2012
@@ -276,6 +276,8 @@ IMPROVEMENTS:
ZOOKEEPER-1432. Add javadoc and debug logging for checkACL() method in
PrepRequestProcessor (Eugene Koontz via michim)
+ ZOOKEEPER-1411. Consolidate membership management, distinguish between static and dynamic configuration parameters (Alex Shraer via breed)
+
Release 3.4.0 -
Non-backward compatible changes:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Sun Apr 22 23:20:13 2012
@@ -585,14 +585,14 @@ public class FastLeaderElection implemen
* different zxids for a server depending on timing.
*/
for (Map.Entry<Long,Vote> entry : votes.entrySet()) {
- if (vote.equals(entry.getValue())){
+ if (self.getQuorumVerifier().getVotingMembers().containsKey(entry.getKey())
+ && vote.equals(entry.getValue())){
set.add(entry.getKey());
}
}
return self.getQuorumVerifier().containsQuorum(set);
}
-
/**
* In the case there is a leader elected, and a quorum supporting
* this leader, we have to check if the leader has voted and acked
@@ -665,7 +665,7 @@ public class FastLeaderElection implemen
* @return long
*/
private long getInitId(){
- if(self.getLearnerType() == LearnerType.PARTICIPANT)
+ if(self.getQuorumVerifier().getVotingMembers().containsKey(self.getId()))
return self.getId();
else return Long.MIN_VALUE;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Sun Apr 22 23:20:13 2012
@@ -396,7 +396,9 @@ public class Leader {
HashSet<Long> followerSet = new HashSet<Long>();
for(LearnerHandler f : getLearners()) {
- followerSet.add(f.getSid());
+ if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
+ followerSet.add(f.getSid());
+ }
}
if (self.getQuorumVerifier().containsQuorum(followerSet)) {
@@ -455,7 +457,7 @@ public class Leader {
for (LearnerHandler f : getLearners()) {
// Synced set is used to check we have a supporting quorum, so only
// PARTICIPANT, not OBSERVER, learners should be used
- if (f.synced() && f.getLearnerType() == LearnerType.PARTICIPANT) {
+ if (f.synced() && self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())) {
syncedSet.add(f.getSid());
}
f.ping();
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Sun Apr 22 23:20:13 2012
@@ -261,8 +261,12 @@ public class LearnerHandler extends Thre
this.sid = leader.followerCounter.getAndDecrement();
}
- LOG.info("Follower sid: " + sid + " : info : "
- + leader.self.quorumPeers.get(sid));
+ if (leader.self.getView().containsKey(this.sid)) {
+ LOG.info("Follower sid: " + this.sid + " : info : "
+ + leader.self.getView().get(this.sid).toString());
+ } else {
+ LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
+ }
if (qp.getType() == Leader.OBSERVERINFO) {
learnerType = LearnerType.OBSERVER;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java Sun Apr 22 23:20:13 2012
@@ -337,8 +337,8 @@ public class QuorumCnxManager {
synchronized void connectOne(long sid){
if (senderWorkerMap.get(sid) == null){
InetSocketAddress electionAddr;
- if (self.quorumPeers.containsKey(sid)) {
- electionAddr = self.quorumPeers.get(sid).electionAddr;
+ if (self.getView().containsKey(sid)) {
+ electionAddr = self.getView().get(sid).electionAddr;
} else {
LOG.warn("Invalid server id: " + sid);
return;
@@ -479,12 +479,10 @@ public class QuorumCnxManager {
try {
ss = new ServerSocket();
ss.setReuseAddress(true);
- int port = self.quorumPeers.get(self.getId()).electionAddr
- .getPort();
- InetSocketAddress addr = new InetSocketAddress(port);
+ InetSocketAddress addr = self.getElectionAddress();
LOG.info("My election bind port: " + addr.toString());
- setName(self.quorumPeers.get(self.getId()).electionAddr
- .toString());
+ setName(addr.toString());
+ // we used to bind to the * address but this seems more correct
ss.bind(addr);
while (!shutdown) {
Socket client = ss.accept();
@@ -513,7 +511,7 @@ public class QuorumCnxManager {
LOG.error("As I'm leaving the listener thread, "
+ "I won't be able to participate in leader "
+ "election any longer: "
- + self.quorumPeers.get(self.getId()).electionAddr);
+ + self.getElectionAddress());
}
}
@@ -684,7 +682,7 @@ public class QuorumCnxManager {
self.getId() + " error = " + e);
}
this.finish();
- LOG.warn("Send worker leaving thread");
+ LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Sun Apr 22 23:20:13 2012
@@ -25,6 +25,8 @@ import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.OutputStreamWriter;
+import java.io.StringReader;
+import java.io.StringWriter;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
@@ -35,6 +37,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Properties;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.jmx.ZKMBeanInfo;
@@ -42,6 +45,7 @@ import org.apache.zookeeper.server.Serve
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ZxidUtils;
@@ -92,17 +96,97 @@ public class QuorumPeer extends Thread i
private ZKDatabase zkDb;
public static class QuorumServer {
+ public InetSocketAddress addr = null;
+
+ public InetSocketAddress electionAddr = null;
+
+ public InetSocketAddress clientAddr = null;
+
+ public long id;
+
+ public LearnerType type = LearnerType.PARTICIPANT;
+
+
+ public QuorumServer(long id, InetSocketAddress addr,
+ InetSocketAddress electionAddr, InetSocketAddress clientAddr) {
+ this.id = id;
+ this.addr = addr;
+ this.electionAddr = electionAddr;
+ this.clientAddr = clientAddr;
+ }
+
+
public QuorumServer(long id, InetSocketAddress addr,
InetSocketAddress electionAddr) {
this.id = id;
this.addr = addr;
this.electionAddr = electionAddr;
+ this.clientAddr = null;
}
public QuorumServer(long id, InetSocketAddress addr) {
this.id = id;
this.addr = addr;
this.electionAddr = null;
+ this.clientAddr = null;
+ }
+
+
+ private void setType(String s) throws ConfigException {
+ if (s.toLowerCase().equals("observer")) {
+ type = LearnerType.OBSERVER;
+ } else if (s.toLowerCase().equals("participant")) {
+ type = LearnerType.PARTICIPANT;
+ } else {
+ throw new ConfigException("Unrecognised peertype: " + s);
+ }
+ }
+
+ private static final String wrongFormat = " does not have the form server_cofig or server_config;client_config"+
+ " where server_config is host:port:port or host:port:port:type and client_config is port or host:port";
+
+ public QuorumServer(long sid, String addressStr) throws ConfigException {
+ // LOG.warn("sid = " + sid + " addressStr = " + addressStr);
+ this.id = sid;
+ String serverClientParts[] = addressStr.split(";");
+ String serverParts[] = serverClientParts[0].split(":");
+ if ((serverClientParts.length > 2) || (serverParts.length < 3)
+ || (serverParts.length > 4)) {
+ throw new ConfigException(addressStr + wrongFormat);
+ }
+
+ if (serverClientParts.length == 2) {
+ //LOG.warn("ClientParts: " + serverClientParts[1]);
+ String clientParts[] = serverClientParts[1].split(":");
+ if (clientParts.length > 2) {
+ throw new ConfigException(addressStr + wrongFormat);
+ }
+
+ // is client_config a host:port or just a port
+ String hostname = (clientParts.length == 2) ? clientParts[0] : serverParts[0];
+ try {
+ clientAddr = new InetSocketAddress(hostname,
+ Integer.parseInt(clientParts[clientParts.length - 1]));
+ //LOG.warn("Set clientAddr to " + clientAddr);
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Address unresolved: " + hostname + ":" + clientParts[clientParts.length - 1]);
+ }
+ }
+
+ // server_config should be either host:port:port or host:port:port:type
+ try {
+ addr = new InetSocketAddress(serverParts[0],
+ Integer.parseInt(serverParts[1]));
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[1]);
+ }
+ try {
+ electionAddr = new InetSocketAddress(serverParts[0],
+ Integer.parseInt(serverParts[2]));
+ } catch (NumberFormatException e) {
+ throw new ConfigException("Address unresolved: " + serverParts[0] + ":" + serverParts[2]);
+ }
+ if (serverParts.length == 4) setType(serverParts[3]);
}
public QuorumServer(long id, InetSocketAddress addr,
@@ -111,17 +195,63 @@ public class QuorumPeer extends Thread i
this.addr = addr;
this.electionAddr = electionAddr;
this.type = type;
+ this.clientAddr = null;
}
- public InetSocketAddress addr;
-
- public InetSocketAddress electionAddr;
-
- public long id;
+ public QuorumServer(long id, InetSocketAddress addr,
+ InetSocketAddress electionAddr, InetSocketAddress clientAddr, LearnerType type) {
+ this.id = id;
+ this.addr = addr;
+ this.electionAddr = electionAddr;
+ this.type = type;
+ this.clientAddr = clientAddr;
+ }
+
+ public String toString(){
+ StringWriter sw = new StringWriter();
+ //addr should never be null, but just to make sure
+ if (addr !=null) {
+ sw.append(addr.getHostName());
+ sw.append(":");
+ sw.append(String.valueOf(addr.getPort()));
+ }
+ if (electionAddr!=null){
+ sw.append(":");
+ sw.append(String.valueOf(electionAddr.getPort()));
+ }
+ if (type == LearnerType.OBSERVER) sw.append(":observer");
+ else if (type == LearnerType.PARTICIPANT) sw.append(":participant");
+ if (clientAddr!=null){
+ sw.append(";");
+ sw.append(clientAddr.getHostName() + ":" + String.valueOf(clientAddr.getPort()));
+ }
+ return sw.toString();
+ }
- public LearnerType type = LearnerType.PARTICIPANT;
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 42; // any arbitrary constant will do
+ }
+
+ private boolean checkAddressesEqual(InetSocketAddress addr1, InetSocketAddress addr2){
+ if ((addr1 == null && addr2!=null) ||
+ (addr1!=null && addr2==null) ||
+ (addr1!=null && addr2!=null && !addr1.equals(addr2))) return false;
+ return true;
+ }
+
+ public boolean equals(Object o){
+ if (!(o instanceof QuorumServer)) return false;
+ QuorumServer qs = (QuorumServer)o;
+ if ((qs.id != id) || (qs.type != type)) return false;
+ if (!checkAddressesEqual(addr, qs.addr)) return false;
+ if (!checkAddressesEqual(electionAddr, qs.electionAddr)) return false;
+ if (!checkAddressesEqual(clientAddr, qs.clientAddr)) return false;
+ return true;
+ }
}
+
public enum ServerState {
LOOKING, FOLLOWING, LEADING, OBSERVING;
}
@@ -165,18 +295,35 @@ public class QuorumPeer extends Thread i
*/
public void setLearnerType(LearnerType p) {
learnerType = p;
- if (quorumPeers.containsKey(this.myid)) {
- this.quorumPeers.get(myid).type = p;
- } else {
- LOG.error("Setting LearnerType to " + p + " but " + myid
- + " not in QuorumPeers. ");
- }
+ }
+
+ protected synchronized void setDynamicConfigFilename(String s) {
+ dynamicConfigFilename = s;
}
- /**
- * The servers that make up the cluster
- */
- protected Map<Long, QuorumServer> quorumPeers;
+
+ protected synchronized String getDynamicConfigFilename() {
+ return dynamicConfigFilename;
+ }
+
+ protected synchronized void setConfigFileName(String s) {
+ configFilename = s;
+ }
+
+ protected synchronized void setConfigBackwardCompatibility(boolean bc) {
+ configBackwardCompatibility = bc;
+ }
+
+ protected synchronized boolean getConfigBackwardCompatibility() {
+ return configBackwardCompatibility;
+ }
+
+ private String dynamicConfigFilename = null;
+
+ private String configFilename = null;
+
+ private boolean configBackwardCompatibility = false;
+
public int getQuorumSize(){
return getVotingView().size();
}
@@ -185,7 +332,7 @@ public class QuorumPeer extends Thread i
* QuorumVerifier implementation; default (majority).
*/
- private QuorumVerifier quorumConfig;
+ private QuorumVerifier quorumVerifier;
/**
* My id
@@ -347,11 +494,33 @@ public class QuorumPeer extends Thread i
DatagramSocket udpSocket;
private InetSocketAddress myQuorumAddr;
+ private InetSocketAddress myElectionAddr = null;
+ private InetSocketAddress myClientAddr = null;
- public InetSocketAddress getQuorumAddress(){
+ public synchronized InetSocketAddress getQuorumAddress(){
return myQuorumAddr;
}
+
+ public synchronized void setQuorumAddress(InetSocketAddress addr){
+ myQuorumAddr = addr;
+ }
+
+ public InetSocketAddress getElectionAddress(){
+ return myElectionAddr;
+ }
+ public void setElectionAddress(InetSocketAddress addr){
+ myElectionAddr = addr;
+ }
+
+ public InetSocketAddress getClientAddress(){
+ return myClientAddr;
+ }
+
+ public void setClientAddress(InetSocketAddress addr){
+ myClientAddr = addr;
+ }
+
private int electionType;
Election electionAlg;
@@ -375,19 +544,18 @@ public class QuorumPeer extends Thread i
File dataLogDir, int electionType,
long myid, int tickTime, int initLimit, int syncLimit,
ServerCnxnFactory cnxnFactory) throws IOException {
- this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
- initLimit, syncLimit, cnxnFactory,
- new QuorumMaj(countParticipants(quorumPeers)));
+ this(quorumPeers, dataDir, dataLogDir, electionType, myid, tickTime,
+ initLimit, syncLimit, cnxnFactory,
+ new QuorumMaj(quorumPeers), null);
}
public QuorumPeer(Map<Long, QuorumServer> quorumPeers, File dataDir,
File dataLogDir, int electionType,
long myid, int tickTime, int initLimit, int syncLimit,
ServerCnxnFactory cnxnFactory,
- QuorumVerifier quorumConfig) throws IOException {
+ QuorumVerifier quorumConfig, String memFilename) throws IOException {
this();
this.cnxnFactory = cnxnFactory;
- this.quorumPeers = quorumPeers;
this.electionType = electionType;
this.myid = myid;
this.tickTime = tickTime;
@@ -395,9 +563,9 @@ public class QuorumPeer extends Thread i
this.syncLimit = syncLimit;
this.logFactory = new FileTxnSnapLog(dataLogDir, dataDir);
this.zkDb = new ZKDatabase(this.logFactory);
- if(quorumConfig == null)
- this.quorumConfig = new QuorumMaj(countParticipants(quorumPeers));
- else this.quorumConfig = quorumConfig;
+ this.dynamicConfigFilename = (memFilename != null) ? memFilename : "zoo_replicated" + myid + ".dynamic";
+ if(quorumConfig == null) quorumConfig = new QuorumMaj(quorumPeers);
+ setQuorumVerifier(quorumConfig, false);
}
QuorumStats quorumStats() {
@@ -506,6 +674,7 @@ public class QuorumPeer extends Thread i
}
/**
+
* This constructor is only used by the existing unit test code.
* It defaults to FileLogProvider persistence provider.
*/
@@ -517,7 +686,7 @@ public class QuorumPeer extends Thread i
this(quorumPeers, snapDir, logDir, electionAlg,
myid,tickTime, initLimit,syncLimit,
ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
- new QuorumMaj(countParticipants(quorumPeers)));
+ new QuorumMaj(quorumPeers), null);
}
/**
@@ -533,7 +702,7 @@ public class QuorumPeer extends Thread i
this(quorumPeers, snapDir, logDir, electionAlg,
myid,tickTime, initLimit,syncLimit,
ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1),
- quorumConfig);
+ quorumConfig, null);
}
/**
@@ -802,7 +971,7 @@ public class QuorumPeer extends Thread i
* ensemble.
*/
public Map<Long,QuorumPeer.QuorumServer> getView() {
- return Collections.unmodifiableMap(this.quorumPeers);
+ return Collections.unmodifiableMap(getQuorumVerifier().getAllMembers());
}
/**
@@ -842,7 +1011,7 @@ public class QuorumPeer extends Thread i
* is introduced will this be more useful.
*/
public boolean viewContains(Long sid) {
- return this.quorumPeers.containsKey(sid);
+ return this.getView().containsKey(sid);
}
/**
@@ -964,19 +1133,54 @@ public class QuorumPeer extends Thread i
return tick;
}
+ public QuorumVerifier configFromString(String s) throws IOException, ConfigException{
+ Properties props = new Properties();
+ props.load(new StringReader(s));
+
+ QuorumPeerConfig config = new QuorumPeerConfig();
+ config.parseDynamicConfig(props, electionType);
+
+ return config.getQuorumVerifier();
+ }
+
/**
* Return QuorumVerifier object
*/
- public QuorumVerifier getQuorumVerifier(){
- return quorumConfig;
+ public synchronized QuorumVerifier getQuorumVerifier(){
+ return quorumVerifier;
}
- public void setQuorumVerifier(QuorumVerifier quorumConfig){
- this.quorumConfig = quorumConfig;
- }
+ public synchronized QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk){
+ if ((quorumVerifier != null) && (quorumVerifier.getVersion() >= qv.getVersion())) {
+ LOG.warn("setQuorumVerifier called with stale config " + qv.getVersion() +
+ ". Current version: " + quorumVerifier.getVersion());
+ return quorumVerifier;
+ }
+ QuorumVerifier prevQV = quorumVerifier;
+ quorumVerifier = qv;
+
+ if (writeToDisk) {
+ try {
+ QuorumPeerConfig.writeDynamicConfig(dynamicConfigFilename, configFilename, configBackwardCompatibility, qv);
+ if (configBackwardCompatibility) {
+ dynamicConfigFilename = configFilename + ".dynamic";
+ configBackwardCompatibility = false;
+ }
+ } catch(IOException e){
+ LOG.error("Error closing file: ", e.getMessage());
+ }
+ }
+ QuorumServer qs = qv.getAllMembers().get(getId());
+ if (qs!=null){
+ setQuorumAddress(qs.addr);
+ setElectionAddress(qs.electionAddr);
+ setClientAddress(qs.clientAddr);
+ }
+ return prevQV;
+ }
/**
* Get an instance of LeaderElection
*/
@@ -1021,10 +1225,6 @@ public class QuorumPeer extends Thread i
this.cnxnFactory = cnxnFactory;
}
- public void setQuorumPeers(Map<Long,QuorumServer> quorumPeers) {
- this.quorumPeers = quorumPeers;
- }
-
public int getClientPort() {
return cnxnFactory.getLocalPort();
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Sun Apr 22 23:20:13 2012
@@ -19,15 +19,15 @@
package org.apache.zookeeper.server.quorum;
import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.FileReader;
+import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Properties;
import java.util.Map.Entry;
@@ -35,6 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
+
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -43,12 +44,16 @@ import org.apache.zookeeper.server.quoru
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.VerifyingFileFactory;
+
public class QuorumPeerConfig {
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerConfig.class);
protected InetSocketAddress clientPortAddress;
protected File dataDir;
protected File dataLogDir;
+ protected boolean configBackwardCompatibilityMode = false;
+ protected String dynamicConfigFileStr = null;
+ protected String configFileStr = null;
protected int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME;
protected int maxClientCnxns = 60;
/** defaults to -1 if not set explicitly */
@@ -60,16 +65,10 @@ public class QuorumPeerConfig {
protected int syncLimit;
protected int electionAlg = 3;
protected int electionPort = 2182;
- protected final HashMap<Long,QuorumServer> servers =
- new HashMap<Long, QuorumServer>();
- protected final HashMap<Long,QuorumServer> observers =
- new HashMap<Long, QuorumServer>();
protected long serverId;
- protected HashMap<Long, Long> serverWeight = new HashMap<Long, Long>();
- protected HashMap<Long, Long> serverGroup = new HashMap<Long, Long>();
- protected int numGroups = 0;
- protected QuorumVerifier quorumVerifier;
+
+ protected QuorumVerifier quorumVerifier = null;
protected int snapRetainCount = 3;
protected int purgeInterval = 0;
@@ -98,13 +97,13 @@ public class QuorumPeerConfig {
*/
public void parse(String path) throws ConfigException {
LOG.info("Reading configuration from: " + path);
-
+
try {
File configFile = (new VerifyingFileFactory.Builder(LOG)
.warnForRelativePath()
.failForNonExistingPath()
.build()).create(path);
-
+
Properties cfg = new Properties();
FileInputStream in = new FileInputStream(configFile);
try {
@@ -112,12 +111,41 @@ public class QuorumPeerConfig {
} finally {
in.close();
}
-
+
parseProperties(cfg);
+
+ // backward compatibility - dynamic configuration in the same file as static configuration params
+ // see writeDynamicConfig() - we change the config file to new format if reconfig happens
+ if (dynamicConfigFileStr == null) {
+ configBackwardCompatibilityMode = true;
+ configFileStr = path;
+ parseDynamicConfig(cfg, electionAlg);
+ checkValidity();
+ }
+
} catch (IOException e) {
throw new ConfigException("Error processing " + path, e);
} catch (IllegalArgumentException e) {
throw new ConfigException("Error processing " + path, e);
+ }
+
+ if (dynamicConfigFileStr!=null) {
+ try {
+ Properties dynamicCfg = new Properties();
+ FileInputStream inConfig = new FileInputStream(dynamicConfigFileStr);
+ try {
+ dynamicCfg.load(inConfig);
+ } finally {
+ inConfig.close();
+ }
+ parseDynamicConfig(dynamicCfg, electionAlg);
+ checkValidity();
+
+ } catch (IOException e) {
+ throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
+ } catch (IllegalArgumentException e) {
+ throw new ConfigException("Error processing " + dynamicConfigFileStr, e);
+ }
}
}
@@ -166,63 +194,14 @@ public class QuorumPeerConfig {
{
throw new ConfigException("Unrecognised peertype: " + value);
}
+ } else if (key.equals("dynamicConfigFile")){
+ dynamicConfigFileStr = value;
} else if (key.equals("autopurge.snapRetainCount")) {
snapRetainCount = Integer.parseInt(value);
} else if (key.equals("autopurge.purgeInterval")) {
purgeInterval = Integer.parseInt(value);
- } else if (key.startsWith("server.")) {
- int dot = key.indexOf('.');
- long sid = Long.parseLong(key.substring(dot + 1));
- String parts[] = value.split(":");
- if ((parts.length != 2) && (parts.length != 3) && (parts.length !=4)) {
- LOG.error(value
- + " does not have the form host:port or host:port:port " +
- " or host:port:port:type");
- }
- InetSocketAddress addr = new InetSocketAddress(parts[0],
- Integer.parseInt(parts[1]));
- if (parts.length == 2) {
- servers.put(Long.valueOf(sid), new QuorumServer(sid, addr));
- } else if (parts.length == 3) {
- InetSocketAddress electionAddr = new InetSocketAddress(
- parts[0], Integer.parseInt(parts[2]));
- servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
- electionAddr));
- } else if (parts.length == 4) {
- InetSocketAddress electionAddr = new InetSocketAddress(
- parts[0], Integer.parseInt(parts[2]));
- LearnerType type = LearnerType.PARTICIPANT;
- if (parts[3].equalsIgnoreCase("observer")) {
- type = LearnerType.OBSERVER;
- observers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
- electionAddr,type));
- } else if (parts[3].equalsIgnoreCase("participant")) {
- type = LearnerType.PARTICIPANT;
- servers.put(Long.valueOf(sid), new QuorumServer(sid, addr,
- electionAddr,type));
- } else {
- throw new ConfigException("Unrecognised peertype: " + value);
- }
- }
- } else if (key.startsWith("group")) {
- int dot = key.indexOf('.');
- long gid = Long.parseLong(key.substring(dot + 1));
-
- numGroups++;
-
- String parts[] = value.split(":");
- for(String s : parts){
- long sid = Long.parseLong(s);
- if(serverGroup.containsKey(sid))
- throw new ConfigException("Server " + sid + "is in multiple groups");
- else
- serverGroup.put(sid, gid);
- }
-
- } else if(key.startsWith("weight")) {
- int dot = key.indexOf('.');
- long sid = Long.parseLong(key.substring(dot + 1));
- serverWeight.put(sid, Long.parseLong(value));
+ } else if ((key.startsWith("server.") || key.startsWith("group") || key.startsWith("weight")) && zkProp.entrySet().contains("dynamicConfigFile")){
+ throw new ConfigException("parameter: " + key + " must be in a separate dynamic config file");
} else {
System.setProperty("zookeeper." + key, value);
}
@@ -248,97 +227,197 @@ public class QuorumPeerConfig {
+ " is missing.");
}
}
- if (clientPort == 0) {
- throw new IllegalArgumentException("clientPort is not set");
- }
- if (clientPortAddress == null) {
- this.clientPortAddress = new InetSocketAddress(clientPort);
- } else {
- this.clientPortAddress = new InetSocketAddress(
- InetAddress.getByName(clientPortAddress), clientPort);
- }
-
+ if (clientPortAddress != null) {
+ if (clientPort == 0) {
+ throw new IllegalArgumentException("clientPortAddress is set but clientPort is not set");
+ }
+ this.clientPortAddress = new InetSocketAddress(
+ InetAddress.getByName(clientPortAddress), clientPort);
+ } else if (clientPort!=0){
+ this.clientPortAddress = new InetSocketAddress(clientPort);
+ }
if (tickTime == 0) {
throw new IllegalArgumentException("tickTime is not set");
}
if (minSessionTimeout > maxSessionTimeout) {
throw new IllegalArgumentException(
"minSessionTimeout must not be larger than maxSessionTimeout");
- }
- if (servers.size() == 0) {
- if (observers.size() > 0) {
+ }
+ }
+
+ /**
+ * Writes the dynamic configuration file and, in backward compatibility
+ * mode, rewrites the static config file so that it points to the dynamic one.
+ * Each file is first written to a ".tmp" sibling and then renamed over its target.
+ * @param dynamicConfigFilename path of the dynamic config file; replaced by
+ * configFileStr + ".dynamic" when configBackwardCompatibilityMode is true
+ * @param configFileStr path of the static config file
+ * @param configBackwardCompatibilityMode when true, the static config file is
+ * rewritten without its dynamic definitions and gets a dynamicConfigFile entry
+ * @param qv quorum verifier whose toByteArray() output becomes the dynamic config
+ * @throws IOException if writing fails or a temporary file cannot be renamed
+ */
+ public static void writeDynamicConfig(String dynamicConfigFilename, String configFileStr,
+ boolean configBackwardCompatibilityMode, QuorumVerifier qv) throws IOException {
+ FileOutputStream outConfig = null;
+ try {
+ byte b[] = qv.toByteArray();
+ if (configBackwardCompatibilityMode) {
+ dynamicConfigFilename = configFileStr + ".dynamic";
+ }
+ String tmpFilename = dynamicConfigFilename + ".tmp";
+ outConfig = new FileOutputStream(tmpFilename);
+
+ outConfig.write(b);
+ // close before renaming; the close() in the finally block covers the failure paths
+ outConfig.close();
+ File curFile = new File(dynamicConfigFilename);
+ File tmpFile = new File(tmpFilename);
+ if (!tmpFile.renameTo(curFile)) {
+ throw new IOException("renaming " + tmpFile.toString() + " to " + curFile.toString() + " failed!");
+ }
+ } finally{
+ if (outConfig!=null) {
+ outConfig.close();
+ }
+ }
+ // The following is for users who run without a dynamic config file (old config file).
+ // If the configuration changes (reconfiguration executes), we create a dynamic config
+ // file, remove all the dynamic definitions from the static config file and add a pointer
+ // to the dynamic config file. The dynamic config file's name will be the same as the
+ // static config file's, with ".dynamic" appended to it.
+
+ if (configBackwardCompatibilityMode) {
+ BufferedWriter out = null;
+ try {
+ File configFile = (new VerifyingFileFactory.Builder(LOG)
+ .warnForRelativePath()
+ .failForNonExistingPath()
+ .build()).create(configFileStr);
+
+ Properties cfg = new Properties();
+ FileInputStream in = new FileInputStream(configFile);
+ try {
+ cfg.load(in);
+ } finally {
+ in.close();
+ }
+ String tmpFilename = configFileStr + ".tmp";
+ FileWriter fstream = new FileWriter(tmpFilename);
+ out = new BufferedWriter(fstream);
+
+ // copy every static parameter; server/group/weight definitions and the
+ // client port settings are left out of the rewritten file
+ for (Entry<Object, Object> entry : cfg.entrySet()) {
+ String key = entry.getKey().toString().trim();
+ String value = entry.getValue().toString().trim();
+ if (!key.startsWith("server.") && !key.startsWith("group")
+ && !key.startsWith("weight") && !key.equals("clientPort") && !key.equals("clientPortAddress")){
+ out.write(key.concat("=").concat(value).concat("\n"));
+ }
+ }
+ // pointer from the static config file to the dynamic one
+ out.write("dynamicConfigFile=".concat(dynamicConfigFilename).concat("\n"));
+ out.close();
+ File tmpFile = new File(tmpFilename);
+ if (!tmpFile.renameTo(configFile)) {
+ throw new IOException("renaming " + tmpFile.toString() + " to " + configFile.toString() + " failed!");
+ }
+ } finally{
+ if (out!=null) {
+ out.close();
+ }
+ }
+ }
+ }
+    /**
+     * Deletes the given file if it exists. Deletion is best effort: a
+     * failure is logged as a warning and is not reported to the caller.
+     *
+     * @param filename path of the file to delete
+     */
+    public static void deleteFile(String filename){
+        File f = new File(filename);
+        if (!f.exists()) {
+            return;
+        }
+        try {
+            // File.delete() signals failure through its return value, not
+            // through an exception, so the result has to be checked.
+            if (!f.delete()) {
+                LOG.warn("deleting " + filename + " failed");
+            }
+        } catch (SecurityException e) {
+            // the only exception delete() is documented to throw
+            LOG.warn("deleting " + filename + " failed", e);
+        }
+    }
+
+
+    /**
+     * Builds the quorum verifier described by the dynamic configuration.
+     *
+     * @param dynamicConfigProp properties handed to the verifier's constructor
+     * @param isHierarchical true to build a QuorumHierarchical, false to build
+     *        the default QuorumMaj
+     * @return the newly created verifier
+     * @throws ConfigException declared for the verifier constructors
+     */
+    private QuorumVerifier createQuorumVerifier(Properties dynamicConfigProp, boolean isHierarchical) throws ConfigException{
+        if (!isHierarchical) {
+            // majority quorums are the default
+            return new QuorumMaj(dynamicConfigProp);
+        }
+        return new QuorumHierarchical(dynamicConfigProp);
+    }
+
+ /**
+ * Parse dynamic configuration file.
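+ * @param dynamicConfigProp Properties to parse from.
+ * @param eAlg election algorithm id; when non-zero, every voting member must define an election address.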
+ * @throws IOException
+ * @throws ConfigException
+ */
+ public void parseDynamicConfig(Properties dynamicConfigProp, int eAlg)
+ throws IOException, ConfigException {
+ boolean isHierarchical = false;
+ for (Entry<Object, Object> entry : dynamicConfigProp.entrySet()) {
+ String key = entry.getKey().toString().trim();
+ if (key.startsWith("group") || key.startsWith("weight")) {
+ isHierarchical = true;
+ } else if (!configBackwardCompatibilityMode && !key.startsWith("server.") && !key.equals("version")){
+ LOG.info(dynamicConfigProp.toString());
+ throw new ConfigException("Unrecognised parameter: " + key);
+ }
+ }
+
+ quorumVerifier = createQuorumVerifier(dynamicConfigProp, isHierarchical);
+
+ int numParticipators = quorumVerifier.getVotingMembers().size();
+ int numObservers = quorumVerifier.getObservingMembers().size();
+ if (numParticipators == 0) {
+ if (numObservers > 0) {
throw new IllegalArgumentException("Observers w/o participants is an invalid configuration");
}
// Not a quorum configuration so return immediately - not an error
// case (for b/w compatibility), server will default to standalone
// mode.
return;
- } else if (servers.size() == 1) {
- if (observers.size() > 0) {
+ } else if (numParticipators == 1) {
+ if (numObservers > 0) {
throw new IllegalArgumentException("Observers w/o quorum is an invalid configuration");
}
// HBase currently adds a single server line to the config, for
// b/w compatibility reasons we need to keep this here.
LOG.error("Invalid configuration, only one server specified (ignoring)");
- servers.clear();
- } else if (servers.size() > 1) {
- if (servers.size() == 2) {
+ //servers.clear();
+ } else if (numParticipators > 1) {
+ if (numParticipators == 2) {
LOG.warn("No server failure will be tolerated. " +
"You need at least 3 servers.");
- } else if (servers.size() % 2 == 0) {
+ } else if (numParticipators % 2 == 0) {
LOG.warn("Non-optimial configuration, consider an odd number of servers.");
}
- if (initLimit == 0) {
- throw new IllegalArgumentException("initLimit is not set");
- }
- if (syncLimit == 0) {
- throw new IllegalArgumentException("syncLimit is not set");
- }
/*
* If using FLE, then every server requires a separate election
* port.
- */
- if (electionAlg != 0) {
- for (QuorumServer s : servers.values()) {
- if (s.electionAddr == null)
- throw new IllegalArgumentException(
- "Missing election port for server: " + s.id);
- }
- }
-
- /*
- * Default of quorum config is majority
- */
- if(serverGroup.size() > 0){
- if(servers.size() != serverGroup.size())
- throw new ConfigException("Every server must be in exactly one group");
- /*
- * The deafult weight of a server is 1
- */
- for(QuorumServer s : servers.values()){
- if(!serverWeight.containsKey(s.id))
- serverWeight.put(s.id, (long) 1);
- }
-
- /*
- * Set the quorumVerifier to be QuorumHierarchical
- */
- quorumVerifier = new QuorumHierarchical(numGroups,
- serverWeight, serverGroup);
- } else {
- /*
- * The default QuorumVerifier is QuorumMaj
- */
-
- LOG.info("Defaulting to majority quorums");
- quorumVerifier = new QuorumMaj(servers.size());
- }
+ */
+ if (eAlg != 0) {
+ for (QuorumServer s : quorumVerifier.getVotingMembers().values()) {
+ if (s.electionAddr == null)
+ throw new IllegalArgumentException(
+ "Missing election port for server: " + s.id);
+ }
+ }
+ }
+ }
+
- // Now add observers to servers, once the quorums have been
- // figured out
- servers.putAll(observers);
+ public void checkValidity() throws IOException, ConfigException{
+ if (quorumVerifier.getVotingMembers().size() > 1) {
+ if (initLimit == 0) {
+ throw new IllegalArgumentException("initLimit is not set");
+ }
+ if (syncLimit == 0) {
+ throw new IllegalArgumentException("syncLimit is not set");
+ }
+
+
File myIdFile = new File(dataDir, "myid");
if (!myIdFile.exists()) {
throw new IllegalArgumentException(myIdFile.toString()
@@ -359,8 +438,18 @@ public class QuorumPeerConfig {
+ " is not a number");
}
+ QuorumServer qs = quorumVerifier.getAllMembers().get(serverId);
+ if (clientPortAddress!=null && qs!=null && qs.clientAddr!=null){
+ if ((!clientPortAddress.getAddress().isAnyLocalAddress()
+ && !clientPortAddress.equals(qs.clientAddr)) ||
+ (clientPortAddress.getAddress().isAnyLocalAddress()
+ && clientPortAddress.getPort()!=qs.clientAddr.getPort()))
+ throw new ConfigException("client address for this server (id = " + serverId + ") in static config file is " + clientPortAddress + " is different from client address found in dynamic file: " + qs.clientAddr);
+ }
+ if (qs!=null) clientPortAddress = qs.clientAddr;
+
// Warn about inconsistent peer type
- LearnerType roleByServersList = observers.containsKey(serverId) ? LearnerType.OBSERVER
+ LearnerType roleByServersList = quorumVerifier.getObservingMembers().containsKey(serverId) ? LearnerType.OBSERVER
: LearnerType.PARTICIPANT;
if (roleByServersList != peerType) {
LOG.warn("Peer type from servers list (" + roleByServersList
@@ -369,9 +458,11 @@ public class QuorumPeerConfig {
peerType = roleByServersList;
}
- }
+
+ }
+
}
-
+
public InetSocketAddress getClientPortAddress() { return clientPortAddress; }
public File getDataDir() { return dataDir; }
public File getDataLogDir() { return dataLogDir; }
@@ -397,15 +488,24 @@ public class QuorumPeerConfig {
return quorumVerifier;
}
- public Map<Long,QuorumServer> getServers() {
- return Collections.unmodifiableMap(servers);
- }
-
public long getServerId() { return serverId; }
- public boolean isDistributed() { return servers.size() > 1; }
+ public boolean isDistributed() { return (quorumVerifier!=null && quorumVerifier.getVotingMembers().size() > 1); }
public LearnerType getPeerType() {
return peerType;
}
+
+ /** Returns the value of the dynamicConfigFileStr field (presumably the path of the dynamic config file; set outside this view). */
+ public String getDynamicConfigFilename() {
+ return dynamicConfigFileStr;
+ }
+
+ /** Returns the value of the configFileStr field (presumably the path of the static config file; set outside this view). */
+ public String getConfigFilename(){
+ return configFileStr;
+ }
+
+ /** Returns the configBackwardCompatibilityMode flag, the same flag that relaxes the unknown-key check in parseDynamicConfig. */
+ public boolean getConfigBackwardCompatibility(){
+ return configBackwardCompatibilityMode;
+ }
+
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Sun Apr 22 23:20:13 2012
@@ -110,8 +110,8 @@ public class QuorumPeerMain {
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
-
- if (args.length == 1 && config.servers.size() > 0) {
+
+ if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running "
@@ -134,12 +134,11 @@ public class QuorumPeerMain {
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
- quorumPeer = new QuorumPeer();
- quorumPeer.setClientPortAddress(config.getClientPortAddress());
+ quorumPeer = new QuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
- quorumPeer.setQuorumPeers(config.getServers());
+ //quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
@@ -147,11 +146,14 @@ public class QuorumPeerMain {
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
- quorumPeer.setQuorumVerifier(config.getQuorumVerifier());
- quorumPeer.setCnxnFactory(cnxnFactory);
+ quorumPeer.setDynamicConfigFilename(config.getDynamicConfigFilename());
+ quorumPeer.setConfigFileName(config.getConfigFilename());
+ quorumPeer.setConfigBackwardCompatibility(config.getConfigBackwardCompatibility());
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
+ quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
+ quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setLearnerType(config.getPeerType());
-
+
quorumPeer.start();
quorumPeer.join();
} catch (InterruptedException e) {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java Sun Apr 22 23:20:13 2012
@@ -49,10 +49,9 @@ public abstract class QuorumZooKeeperSer
pwriter.print("electionAlg=");
pwriter.println(self.getElectionType());
pwriter.print("electionPort=");
- pwriter.println(self.quorumPeers.get(self.getId()).electionAddr
- .getPort());
+ pwriter.println(self.getElectionAddress().getPort());
pwriter.print("quorumPort=");
- pwriter.println(self.quorumPeers.get(self.getId()).addr.getPort());
+ pwriter.println(self.getQuorumAddress().getPort());
pwriter.print("peerType=");
pwriter.println(self.getLearnerType().ordinal());
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumHierarchical.java Sun Apr 22 23:20:13 2012
@@ -21,8 +21,10 @@ package org.apache.zookeeper.server.quor
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.util.HashSet;
+import java.io.StringWriter;
+import java.util.Set;
import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
import java.util.Map.Entry;
@@ -30,6 +32,8 @@ import java.util.Map.Entry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
@@ -65,12 +69,53 @@ import org.apache.zookeeper.server.quoru
public class QuorumHierarchical implements QuorumVerifier {
private static final Logger LOG = LoggerFactory.getLogger(QuorumHierarchical.class);
- HashMap<Long, Long> serverWeight;
- HashMap<Long, Long> serverGroup;
- HashMap<Long, Long> groupWeight;
+ private HashMap<Long, Long> serverWeight = new HashMap<Long, Long>();
+ private HashMap<Long, Long> serverGroup = new HashMap<Long, Long>();
+ private HashMap<Long, Long> groupWeight = new HashMap<Long, Long>();
- int numGroups;
+ private int numGroups = 0;
+ private Map<Long, QuorumServer> allMembers = new HashMap<Long, QuorumServer>();
+ private Map<Long, QuorumServer> participatingMembers = new HashMap<Long, QuorumServer>();
+ private Map<Long, QuorumServer> observingMembers = new HashMap<Long, QuorumServer>();
+
+ private long version = 0;
+
+ /**
+ * Not meant to be called: with assertions enabled the assert below fails.
+ * Otherwise a fixed constant is returned, which trivially keeps hashCode
+ * consistent with the overridden equals.
+ */
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 42; // any arbitrary constant will do
+ }
+
+    /**
+     * Compares this verifier with another object.
+     * <p>
+     * Two QuorumHierarchical instances are equal when they carry the same
+     * version; otherwise they are equal only if they have the same members,
+     * server weights, group weights and server-to-group assignments.
+     *
+     * @param o the object to compare with
+     * @return true if o is a QuorumHierarchical that matches this one
+     */
+    public boolean equals(Object o){
+        if (!(o instanceof QuorumHierarchical)) {
+            return false;
+        }
+        QuorumHierarchical qm = (QuorumHierarchical)o;
+        if (qm.getVersion() == version) return true;
+        if (allMembers.size() != qm.getAllMembers().size()) {
+            return false;
+        }
+        for (QuorumServer qs: allMembers.values()){
+            QuorumServer qso = qm.getAllMembers().get(qs.id);
+            if (qso == null || !qs.equals(qso)) return false;
+        }
+        // Map.equals checks sizes and compares the boxed Long values with
+        // equals(); comparing them by reference would report equal values
+        // outside the Long cache range as different.
+        return serverWeight.equals(qm.serverWeight)
+            && groupWeight.equals(qm.groupWeight)
+            && serverGroup.equals(qm.serverGroup);
+    }
/**
* This contructor requires the quorum configuration
* to be declared in a separate file, and it takes the
@@ -78,12 +123,7 @@ public class QuorumHierarchical implemen
*/
public QuorumHierarchical(String filename)
throws ConfigException {
- this.serverWeight = new HashMap<Long, Long>();
- this.serverGroup = new HashMap<Long, Long>();
- this.groupWeight = new HashMap<Long, Long>();
- this.numGroups = 0;
-
- readConfigFile(filename);
+ readConfigFile(filename);
}
/**
@@ -91,40 +131,11 @@ public class QuorumHierarchical implemen
* it in the unit test for this feature.
*/
- public QuorumHierarchical(Properties qp)
- throws ConfigException {
- this.serverWeight = new HashMap<Long, Long>();
- this.serverGroup = new HashMap<Long, Long>();
- this.groupWeight = new HashMap<Long, Long>();
- this.numGroups = 0;
-
+ public QuorumHierarchical(Properties qp) throws ConfigException {
parse(qp);
-
LOG.info(serverWeight.size() + ", " + serverGroup.size() + ", " + groupWeight.size());
}
-
- /**
- * This contructor takes the two hash maps needed to enable
- * validating quorums. We use it with QuorumPeerConfig. That is,
- * we declare weights and groups in the server configuration
- * file along with the other parameters.
- * @param numGroups
- * @param serverWeight
- * @param serverGroup
- */
- public QuorumHierarchical(int numGroups,
- HashMap<Long, Long> serverWeight,
- HashMap<Long, Long> serverGroup)
- {
- this.serverWeight = serverWeight;
- this.serverGroup = serverGroup;
- this.groupWeight = new HashMap<Long, Long>();
-
- this.numGroups = numGroups;
- computeGroupWeight();
- }
-
-
+
/**
* Returns the weight of a server.
*
@@ -170,12 +181,25 @@ public class QuorumHierarchical implemen
/**
* Parse properties if configuration given in a separate file.
+ * Assumes that allMembers has been already assigned
+ * @throws ConfigException
*/
- private void parse(Properties quorumProp){
+ private void parse(Properties quorumProp) throws ConfigException{
for (Entry<Object, Object> entry : quorumProp.entrySet()) {
String key = entry.getKey().toString();
String value = entry.getValue().toString();
- if (key.startsWith("group")) {
+
+ if (key.startsWith("server.")) {
+ int dot = key.indexOf('.');
+ long sid = Long.parseLong(key.substring(dot + 1));
+ QuorumServer qs = new QuorumServer(sid, value);
+ allMembers.put(Long.valueOf(sid), qs);
+ if (qs.type == LearnerType.PARTICIPANT)
+ participatingMembers.put(Long.valueOf(sid), qs);
+ else {
+ observingMembers.put(Long.valueOf(sid), qs);
+ }
+ } else if (key.startsWith("group")) {
int dot = key.indexOf('.');
long gid = Long.parseLong(key.substring(dot + 1));
@@ -184,7 +208,10 @@ public class QuorumHierarchical implemen
String parts[] = value.split(":");
for(String s : parts){
long sid = Long.parseLong(s);
- serverGroup.put(sid, gid);
+ if(serverGroup.containsKey(sid))
+ throw new ConfigException("Server " + sid + "is in multiple groups");
+ else
+ serverGroup.put(sid, gid);
}
@@ -192,12 +219,76 @@ public class QuorumHierarchical implemen
int dot = key.indexOf('.');
long sid = Long.parseLong(key.substring(dot + 1));
serverWeight.put(sid, Long.parseLong(value));
- }
+ } else if (key.equals("version")){
+ version = Long.parseLong(value, 16);
+ }
}
+ for (QuorumServer qs: allMembers.values()){
+ Long id = qs.id;
+ if (qs.type == LearnerType.PARTICIPANT){
+ if (!serverGroup.containsKey(id))
+ throw new ConfigException("Server " + id + "is not in a group");
+ if (!serverWeight.containsKey(id))
+ serverWeight.put(id, (long) 1);
+ }
+ }
+
+
computeGroupWeight();
}
+ /** Returns every server parsed from the "server." entries, keyed by server id; the internal map is returned, not a copy. */
+ public Map<Long, QuorumServer> getAllMembers() {
+ return allMembers;
+ }
+    /**
+     * Serializes this configuration as text: one "server.", "group." and
+     * "weight." entry per line, followed by the version in hexadecimal.
+     *
+     * @return the bytes of the textual configuration
+     */
+    public byte[] toByteArray(){
+        StringBuilder out = new StringBuilder();
+
+        for (QuorumServer member: getAllMembers().values()){
+            out.append("server." + member.id)
+               .append('=')
+               .append(member.toString())
+               .append('\n');
+        }
+
+        // collect the colon separated server ids of each group
+        Map<Long, StringBuilder> idsByGroup = new HashMap<Long, StringBuilder>();
+        for (Entry<Long, Long> assignment: serverGroup.entrySet()) {
+            Long sid = assignment.getKey();
+            Long gid = assignment.getValue();
+            StringBuilder ids = idsByGroup.get(gid);
+            if (ids == null) {
+                idsByGroup.put(gid, new StringBuilder(sid.toString()));
+            } else {
+                ids.append(":").append(sid.toString());
+            }
+        }
+
+        for (Entry<Long, StringBuilder> group: idsByGroup.entrySet()) {
+            out.append("group." + group.getKey().toString())
+               .append('=')
+               .append(group.getValue().toString())
+               .append('\n');
+        }
+
+        for (Entry<Long, Long> weight: serverWeight.entrySet()) {
+            out.append("weight." + weight.getKey().toString())
+               .append('=')
+               .append(weight.getValue().toString())
+               .append('\n');
+        }
+
+        out.append("version=" + Long.toHexString(version));
+
+        return out.toString().getBytes();
+    }
+
/**
* This method pre-computes the weights of groups to speed up processing
* when validating a given set. We compute the weights of groups in
@@ -229,7 +320,7 @@ public class QuorumHierarchical implemen
/**
* Verifies if a given set is a quorum.
*/
- public boolean containsQuorum(HashSet<Long> set){
+ public boolean containsQuorum(Set<Long> set){
HashMap<Long, Long> expansion = new HashMap<Long, Long>();
/*
@@ -240,6 +331,7 @@ public class QuorumHierarchical implemen
for(long sid : set){
Long gid = serverGroup.get(sid);
+ if (gid == null) continue;
if(!expansion.containsKey(gid))
expansion.put(gid, serverWeight.get(sid));
else {
@@ -267,5 +359,16 @@ public class QuorumHierarchical implemen
LOG.debug("Negative set size: " + set.size());
return false;
}
- }
+ }
+ /** Returns the servers whose type is LearnerType.PARTICIPANT, keyed by server id; the internal map is returned, not a copy. */
+ public Map<Long, QuorumServer> getVotingMembers() {
+ return participatingMembers;
+ }
+
+ /** Returns the servers whose type is not LearnerType.PARTICIPANT, keyed by server id; the internal map is returned, not a copy. */
+ public Map<Long, QuorumServer> getObservingMembers() {
+ return observingMembers;
+ }
+
+ /** Returns the configuration version: parsed as hexadecimal from the "version" property, 0 when that property is absent. */
+ public long getVersion() {
+ return version;
+ }
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumMaj.java Sun Apr 22 23:20:13 2012
@@ -18,40 +18,136 @@
package org.apache.zookeeper.server.quorum.flexible;
-import java.util.HashSet;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Map.Entry;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
/**
- * This class implements a validator for majority quorums. The
- * implementation is straightforward.
- *
+ * This class implements a validator for majority quorums. The implementation is
+ * straightforward.
+ *
*/
public class QuorumMaj implements QuorumVerifier {
- int half;
-
+ private Map<Long, QuorumServer> allMembers = new HashMap<Long, QuorumServer>();
+ private HashMap<Long, QuorumServer> votingMembers = new HashMap<Long, QuorumServer>();
+ private HashMap<Long, QuorumServer> observingMembers = new HashMap<Long, QuorumServer>();
+ private long version = 0;
+ private int half;
+
+ /**
+ * Not meant to be called: with assertions enabled the assert below fails.
+ * Otherwise a fixed constant is returned, which trivially keeps hashCode
+ * consistent with the overridden equals.
+ */
+ public int hashCode() {
+ assert false : "hashCode not designed";
+ return 42; // any arbitrary constant will do
+ }
+
+    /**
+     * Tells whether the given object is a QuorumMaj carrying the same
+     * version or, failing that, exactly the same set of members.
+     *
+     * @param o the object to compare with
+     * @return true if o is a QuorumMaj that matches this one
+     */
+    public boolean equals(Object o) {
+        if (!(o instanceof QuorumMaj)) {
+            return false;
+        }
+        QuorumMaj other = (QuorumMaj) o;
+        if (other.getVersion() == version) {
+            return true;
+        }
+        Map<Long, QuorumServer> otherMembers = other.getAllMembers();
+        if (allMembers.size() != otherMembers.size()) {
+            return false;
+        }
+        for (QuorumServer mine : allMembers.values()) {
+            QuorumServer theirs = otherMembers.get(mine.id);
+            if (theirs == null || !mine.equals(theirs)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
/**
* Defines a majority to avoid computing it every time.
*
- * @param n number of servers
*/
- public QuorumMaj(int n){
- this.half = n/2;
+ public QuorumMaj(Map<Long, QuorumServer> allMembers) {
+ // the given map is kept as is, not copied
+ this.allMembers = allMembers;
+ // split the members into voters and observers by learner type
+ for (QuorumServer qs : allMembers.values()) {
+ if (qs.type == LearnerType.PARTICIPANT) {
+ votingMembers.put(Long.valueOf(qs.id), qs);
+ } else {
+ observingMembers.put(Long.valueOf(qs.id), qs);
+ }
+ }
+ // containsQuorum() requires strictly more than this many acks
+ half = votingMembers.size() / 2;
}
-
+
+    /**
+     * Builds a majority verifier from configuration properties: every
+     * "server.&lt;id&gt;" entry defines a member and the optional "version"
+     * entry holds the configuration version in hexadecimal.
+     *
+     * @param props properties to read the members and version from
+     * @throws ConfigException declared for the QuorumServer constructor
+     */
+    public QuorumMaj(Properties props) throws ConfigException {
+        for (Entry<Object, Object> prop : props.entrySet()) {
+            String name = prop.getKey().toString();
+            String setting = prop.getValue().toString();
+
+            if (name.equals("version")) {
+                version = Long.parseLong(setting, 16);
+                continue;
+            }
+            if (!name.startsWith("server.")) {
+                continue;
+            }
+
+            // the server id follows the first dot of the key
+            long sid = Long.parseLong(name.substring(name.indexOf('.') + 1));
+            QuorumServer server = new QuorumServer(sid, setting);
+            allMembers.put(Long.valueOf(sid), server);
+            Map<Long, QuorumServer> bucket = votingMembers;
+            if (server.type != LearnerType.PARTICIPANT) {
+                bucket = observingMembers;
+            }
+            bucket.put(Long.valueOf(sid), server);
+        }
+        half = votingMembers.size() / 2;
+    }
+
/**
* Returns weight of 1 by default.
*
- * @param id
+ * @param id
*/
- public long getWeight(long id){
+ public long getWeight(long id) {
return (long) 1;
}
-
+
+    /**
+     * Serializes this configuration as text: one "server.&lt;id&gt;" entry per
+     * line, followed by the version in hexadecimal.
+     *
+     * @return the bytes of the textual configuration
+     */
+    public byte[] toByteArray() {
+        StringBuilder text = new StringBuilder();
+
+        for (QuorumServer server : getAllMembers().values()) {
+            text.append("server." + server.id)
+                .append('=')
+                .append(server.toString())
+                .append('\n');
+        }
+        text.append("version=").append(Long.toHexString(version));
+        return text.toString().getBytes();
+    }
+
/**
- * Verifies if a set is a majority.
+ * Verifies if a set is a majority. Assumes that ackSet contains acks only
+ * from votingMembers
*/
- public boolean containsQuorum(HashSet<Long> set){
- return (set.size() > half);
+ public boolean containsQuorum(Set<Long> ackSet) {
+ return (ackSet.size() > half);
+ }
+
+ /** Returns every member of this configuration, keyed by server id; the internal map is returned, not a copy. */
+ public Map<Long, QuorumServer> getAllMembers() {
+ return allMembers;
+ }
+
+ /** Returns the members whose type is LearnerType.PARTICIPANT, keyed by server id; the internal map is returned, not a copy. */
+ public Map<Long, QuorumServer> getVotingMembers() {
+ return votingMembers;
}
-
+
+ /** Returns the members whose type is not LearnerType.PARTICIPANT, keyed by server id; the internal map is returned, not a copy. */
+ public Map<Long, QuorumServer> getObservingMembers() {
+ return observingMembers;
+ }
+
+ /** Returns the configuration version: parsed as hexadecimal by the Properties constructor, 0 otherwise. */
+ public long getVersion() {
+ return version;
+ }
+
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/flexible/QuorumVerifier.java Sun Apr 22 23:20:13 2012
@@ -18,7 +18,10 @@
package org.apache.zookeeper.server.quorum.flexible;
-import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
/**
* All quorum validators have to implement a method called
@@ -29,5 +32,11 @@ import java.util.HashSet;
public interface QuorumVerifier {
long getWeight(long id);
- boolean containsQuorum(HashSet<Long> set);
+ boolean containsQuorum(Set<Long> set);
+ long getVersion();
+ Map<Long, QuorumServer> getAllMembers();
+ Map<Long, QuorumServer> getVotingMembers();
+ Map<Long, QuorumServer> getObservingMembers();
+ boolean equals(Object o);
+ byte[] toByteArray();
}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Sun Apr 22 23:20:13 2012
@@ -63,10 +63,10 @@ public class QuorumPeerMainTest extends
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSection =
- "server.1=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique()
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique();
+ "server.1=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2;
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
@@ -117,9 +117,9 @@ public class QuorumPeerMainTest extends
final int SERVER_COUNT = 3;
final int clientPorts[] = new int[SERVER_COUNT];
StringBuilder sb = new StringBuilder();
- for (int i = 0; i < SERVER_COUNT; i++) {
- clientPorts[i] = PortAssignment.unique();
- sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\n");
+ for(int i = 0; i < SERVER_COUNT; i++) {
+ clientPorts[i] = PortAssignment.unique();
+ sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n");
}
String quorumCfgSection = sb.toString();
@@ -351,7 +351,7 @@ public class QuorumPeerMainTest extends
StringBuilder sb = new StringBuilder();
for (int i = 0; i < SERVER_COUNT; i++) {
clientPorts[i] = PortAssignment.unique();
- sb.append("server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique() + "\n");
+ sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+";"+clientPorts[i]+"\n");
}
String quorumCfgSection = sb.toString();
@@ -388,12 +388,13 @@ public class QuorumPeerMainTest extends
try {
final int CLIENT_PORT_QP1 = PortAssignment.unique();
+ final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSection =
- "server.1=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique()
- + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique()
- + ":" + PortAssignment.unique();
+ "server.1=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ + "\nserver.2=fee.fii.foo.fum:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2;
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
q1.start();
@@ -452,12 +453,12 @@ public class QuorumPeerMainTest extends
final int CLIENT_PORT_QP3 = PortAssignment.unique();
String quorumCfgSection =
- "server.1=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique()
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique()
- + "\nserver.3=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique() + ":observer";
+ "server.1=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2
+ + "\nserver.3=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ":observer" + ";" + CLIENT_PORT_QP3;
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
@@ -529,11 +530,11 @@ public class QuorumPeerMainTest extends
int electionPort1 = PortAssignment.unique();
int electionPort2 = PortAssignment.unique();
String quorumCfgSection =
- "server.1=127.0.0.1:" + PortAssignment.unique()
- + ":" + electionPort1
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
- + ":" + electionPort2;
-
+ "server.1=127.0.0.1:" + PortAssignment.unique()
+ + ":" + electionPort1 + ";" + CLIENT_PORT_QP1
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
+ + ":" + electionPort2 + ";" + CLIENT_PORT_QP2;
+
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
q1.start();
@@ -593,10 +594,10 @@ public class QuorumPeerMainTest extends
final int CLIENT_PORT_QP2 = PortAssignment.unique();
String quorumCfgSection =
- "server.1=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique()
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique();
+ "server.1=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP2;
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
MainThread q2 = new MainThread(2, CLIENT_PORT_QP2, quorumCfgSection);
@@ -646,10 +647,10 @@ public class QuorumPeerMainTest extends
long maxwait = 3000;
final int CLIENT_PORT_QP1 = PortAssignment.unique();
String quorumCfgSection =
- "server.1=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique()
- + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
- + ":" + PortAssignment.unique();
+ "server.1=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + CLIENT_PORT_QP1
+ + "\nserver.2=127.0.0.1:" + PortAssignment.unique()
+ + ":" + PortAssignment.unique() + ";" + PortAssignment.unique();
MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
q1.start();
// Let the notifications timeout
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Sun Apr 22 23:20:13 2012
@@ -56,14 +56,17 @@ public class QuorumPeerTestBase extends
public static class MainThread implements Runnable {
final File confFile;
+ final File dynamicConfigFile;
volatile TestQPMain main;
public MainThread(int myid, int clientPort, String quorumCfgSection)
throws IOException
{
File tmpDir = ClientBase.createTmpDir();
+ LOG.info("id = " + myid + " tmpDir = " + tmpDir);
confFile = new File(tmpDir, "zoo.cfg");
-
+ dynamicConfigFile = new File(tmpDir, "zoo.dynamic");
+
FileWriter fwriter = new FileWriter(confFile);
fwriter.write("tickTime=4000\n");
fwriter.write("initLimit=10\n");
@@ -76,13 +79,22 @@ public class QuorumPeerTestBase extends
// Convert windows path to UNIX to avoid problems with "\"
String dir = dataDir.toString();
+ String dynamicConfigFilename = dynamicConfigFile.toString();
String osname = java.lang.System.getProperty("os.name");
if (osname.toLowerCase().contains("windows")) {
dir = dir.replace('\\', '/');
+ dynamicConfigFilename = dynamicConfigFilename.replace('\\', '/');
}
fwriter.write("dataDir=" + dir + "\n");
fwriter.write("clientPort=" + clientPort + "\n");
+
+ fwriter.write("dynamicConfigFile=" + dynamicConfigFilename + "\n");
+
+ fwriter.flush();
+ fwriter.close();
+
+ fwriter = new FileWriter(dynamicConfigFile);
fwriter.write(quorumCfgSection + "\n");
fwriter.flush();
fwriter.close();
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java?rev=1328991&r1=1328990&r2=1328991&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java Sun Apr 22 23:20:13 2012
@@ -859,16 +859,18 @@ public class Zab1_0Test {
return new ConversableFollower(peer, zk);
}
- private QuorumPeer createQuorumPeer(File tmpDir) throws IOException,
- FileNotFoundException {
+ private QuorumPeer createQuorumPeer(File tmpDir) throws IOException, FileNotFoundException {
+ HashMap<Long, QuorumServer> peers = new HashMap<Long, QuorumServer>();
QuorumPeer peer = new QuorumPeer();
peer.syncLimit = 2;
peer.initLimit = 2;
peer.tickTime = 2000;
- peer.quorumPeers = new HashMap<Long, QuorumServer>();
- peer.quorumPeers.put(1L, new QuorumServer(0, new InetSocketAddress(33221)));
- peer.quorumPeers.put(1L, new QuorumServer(1, new InetSocketAddress(33223)));
- peer.setQuorumVerifier(new QuorumMaj(3));
+
+ peers.put(0L, new QuorumServer(0, new InetSocketAddress(33221), new InetSocketAddress(33231), new InetSocketAddress(33241)));
+ peers.put(1L, new QuorumServer(1, new InetSocketAddress(33223), new InetSocketAddress(33233), new InetSocketAddress(33243)));
+ peers.put(2L, new QuorumServer(2, new InetSocketAddress(33224), new InetSocketAddress(33234), new InetSocketAddress(33245)));
+
+ peer.setQuorumVerifier(new QuorumMaj(peers), false);
peer.setCnxnFactory(new NullServerCnxnFactory());
File version2 = new File(tmpDir, "version-2");
version2.mkdir();