You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by br...@apache.org on 2013/03/07 07:01:50 UTC
svn commit: r1453693 [3/5] - in /zookeeper/trunk: ./ src/ src/c/
src/c/include/ src/c/src/ src/c/tests/ src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/cli/
src/java/main/org/apache/zookeeper/common/ src/java/main/org/apache/zook...
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxnFactory.java Thu Mar 7 06:01:49 2013
@@ -185,7 +185,8 @@ public class NIOServerCnxnFactory extend
private final RateLogger acceptErrorLogger = new RateLogger(LOG);
private final Collection<SelectorThread> selectorThreads;
private Iterator<SelectorThread> selectorIterator;
-
+ private volatile boolean reconfiguring = false;
+
public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
Set<SelectorThread> selectorThreads) throws IOException {
super("NIOServerCxnFactory.AcceptThread:" + addr);
@@ -212,10 +213,16 @@ public class NIOServerCnxnFactory extend
closeSelector();
// This will wake up the selector threads, and tell the
// worker thread pool to begin shutdown.
- NIOServerCnxnFactory.this.stop();
+ if (!reconfiguring) {
+ NIOServerCnxnFactory.this.stop();
+ }
LOG.info("accept thread exitted run method");
}
}
+
+ public void setReconfiguring() {
+ reconfiguring = true;
+ }
private void select() {
try {
@@ -678,7 +685,31 @@ public class NIOServerCnxnFactory extend
ss.configureBlocking(false);
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
-
+
+ @Override
+ public void reconfigure(InetSocketAddress addr){
+ ServerSocketChannel oldSS = ss;
+ try {
+ this.ss = ServerSocketChannel.open();
+ ss.socket().setReuseAddress(true);
+ LOG.info("binding to port " + addr);
+ ss.socket().bind(addr);
+ ss.configureBlocking(false);
+ acceptThread.setReconfiguring();
+ oldSS.close();
+ acceptThread.wakeupSelector();
+ try {
+ acceptThread.join();
+ } catch (InterruptedException e) {
+ LOG.error("Error joining old acceptThread when reconfiguring client port " + e.getMessage());
+ }
+ acceptThread = new AcceptThread(ss, addr, selectorThreads);
+ acceptThread.start();
+ } catch(IOException e) {
+ LOG.error("Error reconfiguring client port to " + addr + " " + e.getMessage());
+ }
+ }
+
/** {@inheritDoc} */
public int getMaxClientCnxnsPerHost() {
return maxClientCnxns;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxnFactory.java Thu Mar 7 06:01:49 2013
@@ -360,7 +360,16 @@ public class NettyServerCnxnFactory exte
LOG.info("binding to port " + localAddress);
parentChannel = bootstrap.bind(localAddress);
}
-
+
+ public void reconfigure(InetSocketAddress addr)
+ {
+ Channel oldChannel = parentChannel;
+ LOG.info("binding to port " + addr);
+ parentChannel = bootstrap.bind(addr);
+ localAddress = addr;
+ oldChannel.close();
+ }
+
@Override
public void startup(ZooKeeperServer zks) throws IOException,
InterruptedException {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Thu Mar 7 06:01:49 2013
@@ -20,13 +20,17 @@ package org.apache.zookeeper.server;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.StringReader;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
+import java.util.Map;
+import java.util.Properties;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
@@ -46,18 +50,26 @@ import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.common.StringUtils;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.Create2Request;
import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.ReconfigRequest;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.proto.CheckVersionRequest;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.server.auth.AuthenticationProvider;
import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.CreateTxn;
@@ -481,6 +493,114 @@ public class PrepRequestProcessor extend
nodeRecord.stat.setVersion(newVersion);
addChangeRecord(nodeRecord);
break;
+ case OpCode.reconfig:
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
+ ReconfigRequest reconfigRequest = (ReconfigRequest)record;
+ LeaderZooKeeperServer lzks = (LeaderZooKeeperServer)zks;
+ QuorumVerifier lastSeenQV = lzks.self.getLastSeenQuorumVerifier();
+ // check that there's no reconfig in progress
+ if (lastSeenQV.getVersion()!=lzks.self.getQuorumVerifier().getVersion()) {
+ throw new KeeperException.ReconfigInProgress();
+ }
+ long configId = reconfigRequest.getCurConfigId();
+
+ if (configId != -1 && configId!=lzks.self.getLastSeenQuorumVerifier().getVersion()){
+ String msg = "Reconfiguration from version " + configId + " failed -- last seen version is " + lzks.self.getLastSeenQuorumVerifier().getVersion();
+ throw new KeeperException.BadVersionException(msg);
+ }
+
+ String newMembers = reconfigRequest.getNewMembers();
+
+ if (newMembers != null) { //non-incremental membership change
+ LOG.info("Non-incremental reconfig");
+
+ // Input may be delimited by either commas or newlines so convert to common newline separated format
+ newMembers = newMembers.replaceAll(",", "\n");
+
+ try{
+ Properties props = new Properties();
+ props.load(new StringReader(newMembers));
+ QuorumPeerConfig config = new QuorumPeerConfig();
+ config.parseDynamicConfig(props, lzks.self.getElectionType(), true);
+ request.qv = config.getQuorumVerifier();
+ request.qv.setVersion(request.getHdr().getZxid());
+ } catch (IOException e) {
+ throw new KeeperException.BadArgumentsException(e.getMessage());
+ } catch (ConfigException e) {
+ throw new KeeperException.BadArgumentsException(e.getMessage());
+ }
+ } else { //incremental change - must be a majority quorum system
+ LOG.info("Incremental reconfig");
+
+ List<String> joiningServers = null;
+ String joiningServersString = reconfigRequest.getJoiningServers();
+ if (joiningServersString != null)
+ {
+ joiningServers = StringUtils.split(joiningServersString,",");
+ }
+
+ List<String> leavingServers = null;
+ String leavingServersString = reconfigRequest.getLeavingServers();
+ if (leavingServersString != null)
+ {
+ leavingServers = StringUtils.split(leavingServersString, ",");
+ }
+
+ if (!(lastSeenQV instanceof QuorumMaj)) {
+ String msg = "Incremental reconfiguration requested but last configuration seen has a non-majority quorum system";
+ LOG.warn(msg);
+ throw new KeeperException.BadArgumentsException(msg);
+ }
+ Map<Long, QuorumServer> nextServers = new HashMap<Long, QuorumServer>(lastSeenQV.getAllMembers());
+ try {
+ if (leavingServers != null) {
+ for (String leaving: leavingServers){
+ long sid = Long.parseLong(leaving);
+ nextServers.remove(sid);
+ }
+ }
+ if (joiningServers != null) {
+ for (String joiner: joiningServers){
+ // joiner should have the following format: server.x = server_spec;client_spec
+ String[] parts = StringUtils.split(joiner, "=").toArray(new String[0]);
+ if (parts.length != 2) {
+ throw new KeeperException.BadArgumentsException("Wrong format of server string");
+ }
+ // extract server id x from first part of joiner: server.x
+ Long sid = Long.parseLong(parts[0].substring(parts[0].lastIndexOf('.') + 1));
+ QuorumServer qs = new QuorumServer(sid, parts[1]);
+ if (qs.clientAddr == null || qs.electionAddr == null || qs.addr == null) {
+ throw new KeeperException.BadArgumentsException("Wrong format of server string - each server should have 3 ports specified");
+ }
+ nextServers.remove(qs.id);
+ nextServers.put(Long.valueOf(qs.id), qs);
+ }
+ }
+ } catch (ConfigException e){
+ throw new KeeperException.BadArgumentsException("Reconfiguration failed");
+ }
+ request.qv = new QuorumMaj(nextServers);
+ request.qv.setVersion(request.getHdr().getZxid());
+ }
+ if (request.qv.getVotingMembers().size() < 2){
+ String msg = "Reconfig failed - new configuration must include at least 2 followers";
+ LOG.warn(msg);
+ throw new KeeperException.BadArgumentsException(msg);
+ }
+
+ if (!lzks.getLeader().isQuorumSynced(request.qv)) {
+ String msg2 = "Reconfig failed - there must be a connected and synced quorum in new configuration";
+ LOG.warn(msg2);
+ throw new KeeperException.NewConfigNoQuorum();
+ }
+
+ nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);
+ checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
+ request.setTxn(new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1));
+ nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
+ nodeRecord.stat.setVersion(-1);
+ addChangeRecord(nodeRecord);
+ break;
case OpCode.setACL:
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
SetACLRequest setAclRequest = (SetACLRequest)record;
@@ -585,6 +705,11 @@ public class PrepRequestProcessor extend
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
break;
+ case OpCode.reconfig:
+ ReconfigRequest reconfigRequest = new ReconfigRequest();
+ ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
+ pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
+ break;
case OpCode.setACL:
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Thu Mar 7 06:01:49 2013
@@ -8,9 +8,9 @@
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
+ * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
@@ -25,6 +25,7 @@ import org.apache.jute.Record;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.txn.TxnHeader;
/**
@@ -80,6 +81,8 @@ public class Request {
private KeeperException e;
+ public QuorumVerifier qv = null;
+
public Object getOwner() {
return owner;
}
@@ -133,6 +136,7 @@ public class Request {
case OpCode.ping:
case OpCode.closeSession:
case OpCode.setWatches:
+ case OpCode.reconfig:
return true;
default:
return false;
@@ -157,6 +161,7 @@ public class Request {
case OpCode.setData:
case OpCode.check:
case OpCode.multi:
+ case OpCode.reconfig:
return true;
default:
return false;
@@ -203,6 +208,8 @@ public class Request {
return "closeSession";
case OpCode.error:
return "error";
+ case OpCode.reconfig:
+ return "reconfig";
default:
return "unknown " + op;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxnFactory.java Thu Mar 7 06:01:49 2013
@@ -65,7 +65,10 @@ public abstract class ServerCnxnFactory
public abstract void configure(InetSocketAddress addr,
int maxClientCnxns) throws IOException;
-
+
+ public abstract void reconfigure(InetSocketAddress addr);
+
+
protected SaslServerCallbackHandler saslServerCallbackHandler;
public Login login;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java Thu Mar 7 06:01:49 2013
@@ -61,6 +61,8 @@ public class TraceFormatter {
return "closeSession";
case OpCode.error:
return "error";
+ case OpCode.reconfig:
+ return "reconfig";
default:
return "unknown " + op;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZKDatabase.java Thu Mar 7 06:01:49 2013
@@ -39,6 +39,7 @@ import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
@@ -47,6 +48,7 @@ import org.apache.zookeeper.server.persi
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.Leader.Proposal;
import org.apache.zookeeper.server.quorum.QuorumPacket;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.txn.TxnHeader;
@@ -487,4 +489,18 @@ public class ZKDatabase {
this.snapLog.close();
}
+ public synchronized void initConfigInZKDatabase(QuorumVerifier qv) {
+ if (qv == null) return; // only happens during tests
+ try {
+ if (this.dataTree.getNode(ZooDefs.CONFIG_NODE) == null) {
+ // should only happen during upgrade
+ LOG.warn("configuration znode missing (hould only happen during upgrade), creating the node");
+ this.dataTree.addConfigNode();
+ }
+ this.dataTree.setData(ZooDefs.CONFIG_NODE, qv.toString().getBytes(), -1, qv.getVersion(), System.currentTimeMillis());
+ } catch (NoNodeException e) {
+ System.out.println("configuration node missing - should not happen");
+ }
+ }
+
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Thu Mar 7 06:01:49 2013
@@ -131,6 +131,7 @@ public class CommitProcessor extends Thr
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
+ case OpCode.reconfig:
case OpCode.multi:
case OpCode.setACL:
case OpCode.createSession:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java Thu Mar 7 06:01:49 2013
@@ -30,8 +30,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
-import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.ZxidUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -108,13 +109,16 @@ public class FastLeaderElection implemen
* Address of sender
*/
long sid;
-
+
+ QuorumVerifier qv;
/*
* epoch of the proposed leader
*/
long peerEpoch;
}
-
+
+ static byte[] dummyData = new byte[0];
+
/**
* Messages that a peer wants to send to other peers.
* These messages can be both Notifications and Acks
@@ -129,14 +133,17 @@ public class FastLeaderElection implemen
long electionEpoch,
ServerState state,
long sid,
- long peerEpoch) {
+ long peerEpoch,
+ byte[] configData) {
+
this.leader = leader;
this.zxid = zxid;
this.electionEpoch = electionEpoch;
this.state = state;
this.sid = sid;
- this.peerEpoch = peerEpoch;
+ this.peerEpoch = peerEpoch;
+ this.configData = configData;
}
/*
@@ -163,6 +170,11 @@ public class FastLeaderElection implemen
* Address of recipient
*/
long sid;
+
+ /*
+ * Used to send a QuorumVerifier (configuration info)
+ */
+ byte[] configData = dummyData;
/*
* Leader epoch
@@ -204,24 +216,83 @@ public class FastLeaderElection implemen
try{
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if(response == null) continue;
-
+
+ // The current protocol and two previous generations all send at least 28 bytes
+ if (response.buffer.capacity() < 28) {
+ LOG.error("Got a short response: " + response.buffer.capacity());
+ continue;
+ }
+
+ // this is the backwardCompatibility mode in place before ZK-107
+ // It is for a version of the protocol in which we didn't send peer epoch
+ // With peer epoch the message became 36 bytes
+ boolean backCompatibility28 = (response.buffer.capacity() == 28);
+
+ // ZK-107 sends the configuration info in every message.
+ // So messages are 36 bytes + size of configuration info
+ // (variable length, should be at the end of the message).
+ boolean backCompatibility36 = (response.buffer.capacity() == 36);
+
+ response.buffer.clear();
+ int rstate = response.buffer.getInt();
+ long rleader = response.buffer.getLong();
+ long rzxid = response.buffer.getLong();
+ long relectionEpoch = response.buffer.getLong();
+ long rpeerepoch;
+
+ if(!backCompatibility28){
+ rpeerepoch = response.buffer.getLong();
+ } else {
+ if(LOG.isInfoEnabled()){
+ LOG.info("Backward compatibility mode (28 bits), server id: " + response.sid);
+ }
+ rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
+ }
+
+ QuorumVerifier rqv = null;
+
+ // check if we have more than 36 bytes. If so extract config info from message.
+ if(!backCompatibility28 && !backCompatibility36){
+ byte b[] = new byte[response.buffer.remaining()];
+ response.buffer.get(b);
+
+ synchronized(self){
+ try {
+ rqv = self.configFromString(new String(b));
+ if (rqv.getVersion() > self.getQuorumVerifier().getVersion()) {
+ LOG.info(self.getId() + " Received version: " + Long.toHexString(rqv.getVersion()) + " my version: " + Long.toHexString(self.getQuorumVerifier().getVersion()));
+ self.processReconfig(rqv, null, null, false);
+ LOG.info("restarting leader election");
+ self.shuttingDownLE = true;
+ self.getElectionAlg().shutdown();
+ }
+ } catch (IOException e) {
+ LOG.error("Something went wrong while processing config received from " + response.sid);
+ } catch (ConfigException e) {
+ LOG.error("Something went wrong while processing config received from " + response.sid);
+ }
+ }
+ } else {
+ if(LOG.isInfoEnabled()){
+ LOG.info("Backward compatibility mode (before reconfig), server id: " + response.sid);
+ }
+ }
+
/*
- * If it is from an observer, respond right away.
- * Note that the following predicate assumes that
- * if a server is not a follower, then it must be
- * an observer. If we ever have any other type of
- * learner in the future, we'll have to change the
- * way we check for observers.
+ * If it is from a non-voting server (such as an observer or
+ * a non-voting follower), respond right away.
*/
if(!self.getVotingView().containsKey(response.sid)){
Vote current = self.getCurrentVote();
+ QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
current.getId(),
current.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
- current.getPeerEpoch());
+ current.getPeerEpoch(),
+ qv.toString().getBytes());
sendqueue.offer(notmsg);
} else {
@@ -231,20 +302,9 @@ public class FastLeaderElection implemen
+ self.getId());
}
- /*
- * We check for 28 bytes for backward compatibility
- */
- if (response.buffer.capacity() < 28) {
- LOG.error("Got a short response: "
- + response.buffer.capacity());
- continue;
- }
- boolean backCompatibility = (response.buffer.capacity() == 28);
- response.buffer.clear();
-
// State of peer that sent this message
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
- switch (response.buffer.getInt()) {
+ switch (rstate) {
case 0:
ackstate = QuorumPeer.ServerState.LOOKING;
break;
@@ -261,20 +321,13 @@ public class FastLeaderElection implemen
// Instantiate Notification and set its attributes
Notification n = new Notification();
- n.leader = response.buffer.getLong();
- n.zxid = response.buffer.getLong();
- n.electionEpoch = response.buffer.getLong();
+ n.leader = rleader;
+ n.zxid = rzxid;
+ n.electionEpoch = relectionEpoch;
n.state = ackstate;
- n.sid = response.sid;
- if(!backCompatibility){
- n.peerEpoch = response.buffer.getLong();
- } else {
- if(LOG.isInfoEnabled()){
- LOG.info("Backward compatibility mode, server id=" + n.sid);
- }
- n.peerEpoch = ZxidUtils.getEpochFromZxid(n.zxid);
- }
-
+ n.sid = response.sid;
+ n.peerEpoch = rpeerepoch;
+ n.qv = rqv;
/*
* Print notification info
*/
@@ -297,13 +350,15 @@ public class FastLeaderElection implemen
if((ackstate == QuorumPeer.ServerState.LOOKING)
&& (n.electionEpoch < logicalclock)){
Vote v = getVote();
+ QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
v.getId(),
v.getZxid(),
logicalclock,
self.getPeerState(),
response.sid,
- v.getPeerEpoch());
+ v.getPeerEpoch(),
+ qv.toString().getBytes());
sendqueue.offer(notmsg);
}
} else {
@@ -318,8 +373,11 @@ public class FastLeaderElection implemen
self.getId() + " recipient=" +
response.sid + " zxid=0x" +
Long.toHexString(current.getZxid()) +
- " leader=" + current.getId());
+ " leader=" + current.getId() + " config version = " +
+ Long.toHexString(self.getQuorumVerifier().getVersion()));
}
+
+ QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(
ToSend.mType.notification,
current.getId(),
@@ -327,13 +385,14 @@ public class FastLeaderElection implemen
logicalclock,
self.getPeerState(),
response.sid,
- current.getPeerEpoch());
+ current.getPeerEpoch(),
+ qv.toString().getBytes());
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
- System.out.println("Interrupted Exception while waiting for new message" +
+ LOG.warn("Interrupted Exception while waiting for new message" +
e.toString());
}
}
@@ -341,6 +400,8 @@ public class FastLeaderElection implemen
}
}
+
+
/**
* This worker simply dequeues a message to send and
@@ -376,7 +437,7 @@ public class FastLeaderElection implemen
* @param m message to send
*/
private void process(ToSend m) {
- byte requestBytes[] = new byte[36];
+ byte requestBytes[] = new byte[36 + m.configData.length];
ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
/*
@@ -389,6 +450,7 @@ public class FastLeaderElection implemen
requestBuffer.putLong(m.zxid);
requestBuffer.putLong(m.electionEpoch);
requestBuffer.putLong(m.peerEpoch);
+ requestBuffer.put(m.configData);
manager.toSend(m.sid, requestBuffer);
@@ -503,22 +565,21 @@ public class FastLeaderElection implemen
messenger.halt();
LOG.debug("FLE is down");
}
-
+
/**
* Send notifications to all peers upon a change in our vote
*/
private void sendNotifications() {
- for (QuorumServer server : self.getVotingView().values()) {
- long sid = server.id;
-
+ for (long sid : self.getAllKnownServerIds()) {
+ QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(ToSend.mType.notification,
proposedLeader,
proposedZxid,
logicalclock,
QuorumPeer.ServerState.LOOKING,
sid,
- proposedEpoch);
+ proposedEpoch, qv.toString().getBytes());
if(LOG.isDebugEnabled()){
LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x" +
Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock) +
@@ -529,15 +590,15 @@ public class FastLeaderElection implemen
}
}
-
private void printNotification(Notification n){
LOG.info("Notification: " + n.leader + " (n.leader), 0x"
+ Long.toHexString(n.zxid) + " (n.zxid), 0x"
+ Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
+ " (n.state), " + n.sid + " (n.sid), 0x"
+ Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
- + self.getPeerState() + " (my state)");
+ + self.getPeerState() + " (my state)" + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):""));
}
+
/**
* Check if a pair (server id, zxid) succeeds our
@@ -571,8 +632,7 @@ public class FastLeaderElection implemen
* have sufficient to declare the end of the election round.
*
* @param votes Set of votes
- * @param l Identifier of the vote received last
- * @param zxid zxid of the the vote received last
+ * @param vote Identifier of the vote received last
*/
private boolean termPredicate(
HashMap<Long, Vote> votes,
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Follower.java Thu Mar 7 06:01:49 2013
@@ -20,12 +20,16 @@ package org.apache.zookeeper.server.quor
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.TxnHeader;
-
/**
* This class has the control logic for the Follower.
*/
@@ -68,7 +72,8 @@ public class Follower extends Learner{
try {
connectToLeader(addr);
long newEpochZxid = registerWithLeader(Leader.FOLLOWERINFO);
-
+ if (self.isReconfigStateChange())
+ throw new Exception("learned about role change");
//check to see if the leader zxid is lower than ours
//this should never happen but is just a safety check
long newEpoch = ZxidUtils.getEpochFromZxid(newEpochZxid);
@@ -83,7 +88,7 @@ public class Follower extends Learner{
readPacket(qp);
processPacket(qp);
}
- } catch (IOException e) {
+ } catch (Exception e) {
LOG.warn("Exception when following the leader", e);
try {
sock.close();
@@ -104,12 +109,12 @@ public class Follower extends Learner{
* @param qp
* @throws IOException
*/
- protected void processPacket(QuorumPacket qp) throws IOException{
+ protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
- case Leader.PROPOSAL:
+ case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
if (hdr.getZxid() != lastQueued + 1) {
@@ -119,11 +124,36 @@ public class Follower extends Learner{
+ Long.toHexString(lastQueued + 1));
}
lastQueued = hdr.getZxid();
+
+ if (hdr.getType() == OpCode.reconfig){
+ SetDataTxn setDataTxn = (SetDataTxn) txn;
+ QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
+ self.setLastSeenQuorumVerifier(qv, true);
+ }
+
fzk.logRequest(hdr, txn);
break;
case Leader.COMMIT:
fzk.commit(qp.getZxid());
break;
+
+ case Leader.COMMITANDACTIVATE:
+ // get the new configuration from the request
+ Request request = fzk.pendingTxns.element();
+ SetDataTxn setDataTxn = (SetDataTxn) request.getTxn();
+ QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
+
+ // get new designated leader from (current) leader's message
+ ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
+ long suggestedLeaderId = buffer.getLong();
+ boolean majorChange =
+ self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+ // commit (writes the new config to ZK tree (/zookeeper/config))
+ fzk.commit(qp.getZxid());
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
+ break;
case Leader.UPTODATE:
LOG.error("Received an UPTODATE message after Follower started");
break;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Thu Mar 7 06:01:49 2013
@@ -81,6 +81,7 @@ public class FollowerRequestProcessor ex
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
+ case OpCode.reconfig:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Mar 7 06:01:49 2013
@@ -26,6 +26,7 @@ import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -38,6 +39,7 @@ import java.util.concurrent.ConcurrentLi
import java.util.concurrent.ConcurrentMap;
import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
@@ -47,6 +49,7 @@ import org.apache.zookeeper.server.util.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
/**
* This class has the control logic for the Leader.
*/
@@ -61,7 +64,7 @@ public class Leader {
static public class Proposal {
public QuorumPacket packet;
- public HashSet<Long> ackSet = new HashSet<Long>();
+ private ArrayList<QuorumVerifierAcksetPair> qvAcksetPairs = new ArrayList<QuorumVerifierAcksetPair>();
public Request request;
@@ -69,12 +72,55 @@ public class Leader {
public String toString() {
return packet.getType() + ", " + packet.getZxid() + ", " + request;
}
+
+ public void addQuorumVerifier(QuorumVerifier qv) {
+ qvAcksetPairs.add(new QuorumVerifierAcksetPair(qv,
+ new HashSet<Long>(qv.getVotingMembers().size())));
+ }
+
+ public boolean addAck(Long sid) {
+ boolean change = false;
+ for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
+ if (qvAckset.getQuorumVerifier().getVotingMembers().containsKey(sid)) {
+ qvAckset.getAckset().add(sid);
+ change = true;
+ }
+ }
+ return change;
+ }
+
+ public boolean hasAllQuorums() {
+ for (QuorumVerifierAcksetPair qvAckset : qvAcksetPairs) {
+ if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset()))
+ return false;
+ }
+ return true;
+ }
+
+ public static class QuorumVerifierAcksetPair {
+ private final QuorumVerifier _qv;
+ private final HashSet<Long> _ackset;
+
+ public QuorumVerifierAcksetPair(QuorumVerifier qv, HashSet<Long> ackset) {
+ _qv = qv;
+ _ackset = ackset;
+ }
+
+ public QuorumVerifier getQuorumVerifier() {
+ return _qv;
+ }
+
+ public HashSet<Long> getAckset() {
+ return _ackset;
+ }
+ }
}
final LeaderZooKeeperServer zk;
final QuorumPeer self;
+
// the follower acceptor thread
volatile LearnerCnxAcceptor cnxAcceptor = null;
@@ -174,6 +220,27 @@ public class Leader {
}
}
+
+ /**
+ * Returns true if a quorum in qv is connected and synced with the leader
+ * and false otherwise
+ *
+ * @param qv, a QuorumVerifier
+ */
+ public boolean isQuorumSynced(QuorumVerifier qv) {
+ HashSet<Long> ids = new HashSet<Long>();
+ if (qv.getVotingMembers().containsKey(self.getId()))
+ ids.add(self.getId());
+ synchronized (forwardingFollowers) {
+ for (LearnerHandler learnerHandler: forwardingFollowers){
+ if (learnerHandler.synced() && qv.getVotingMembers().containsKey(learnerHandler.getSid())){
+ ids.add(learnerHandler.getSid());
+ }
+ }
+ }
+ return qv.containsQuorum(ids);
+ }
+
private final ServerSocket ss;
Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
@@ -282,7 +349,17 @@ public class Leader {
* This message type informs observers of a committed proposal.
*/
final static int INFORM = 8;
-
+
+ /**
+ * Similar to COMMIT, only for a reconfig operation.
+ */
+ final static int COMMITANDACTIVATE = 9;
+
+ /**
+ * Similar to INFORM, only for a reconfig operation.
+ */
+ final static int INFORMANDACTIVATE = 19;
+
final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
@@ -333,6 +410,9 @@ public class Leader {
long epoch = -1;
boolean waitingForNewEpoch = true;
+ // when a reconfig occurs where the leader is removed or becomes an observer,
+ // it does not commit ops after committing the reconfig
+ boolean allowedToCommit = true;
/**
* This method is main function that is called to lead
*
@@ -369,32 +449,45 @@ public class Leader {
}
newLeaderProposal.packet = new QuorumPacket(NEWLEADER, zk.getZxid(),
- null, null);
+ null, null);
if ((newLeaderProposal.packet.getZxid() & 0xffffffffL) != 0) {
LOG.info("NEWLEADER proposal has Zxid of "
+ Long.toHexString(newLeaderProposal.packet.getZxid()));
}
- outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
- newLeaderProposal.ackSet.add(self.getId());
-
- waitForEpochAck(self.getId(), leaderStateSummary);
- self.setCurrentEpoch(epoch);
+ newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
+ if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
+ newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
+ }
+ newLeaderProposal.addAck(self.getId());
+
+ outstandingProposals.put(newLeaderProposal.packet.getZxid(), newLeaderProposal);
+ LOG.debug("put newleader into outstanding proposals");
// We have to get at least a majority of servers in sync with
// us. We do this by waiting for the NEWLEADER packet to get
// acknowledged
- while (!self.getQuorumVerifier().containsQuorum(newLeaderProposal.ackSet)){
+
+ waitForEpochAck(self.getId(), leaderStateSummary);
+ self.setCurrentEpoch(epoch);
+
+ while (!newLeaderProposal.hasAllQuorums()){
//while (newLeaderProposal.ackCount <= self.quorumPeers.size() / 2) {
if (self.tick > self.initLimit) {
// Followers aren't syncing fast enough,
// renounce leadership!
StringBuilder ackToString = new StringBuilder();
- for(Long id : newLeaderProposal.ackSet)
- ackToString.append(id + ": ");
-
- shutdown("Waiting for a quorum of followers, only synced with: " + ackToString);
+
+ for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
+ if (ackToString.length() > 0) ackToString.append('\n');
+ if (!qvAckset.getQuorumVerifier().containsQuorum(qvAckset.getAckset())) {
+ ackToString.append("Configuration " + qvAckset.getQuorumVerifier().getVersion() + ": waiting for a quorum of followers, only synced with: ");
+ for(Long id : qvAckset.getAckset())
+ ackToString.append(id + " ");
+ }
+ }
+ shutdown(ackToString.toString());
HashSet<Long> followerSet = new HashSet<Long>();
for(LearnerHandler f : getLearners()) {
@@ -402,9 +495,15 @@ public class Leader {
followerSet.add(f.getSid());
}
}
-
- if (self.getQuorumVerifier().containsQuorum(followerSet)) {
- //if (followers.size() >= self.quorumPeers.size() / 2) {
+
+ boolean initTicksShouldBeIncreased = true;
+ for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
+ if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
+ initTicksShouldBeIncreased = false;
+ break;
+ }
+ }
+ if (initTicksShouldBeIncreased) {
LOG.warn("Enough followers present. "+
"Perhaps the initTicks need to be increased.");
}
@@ -469,7 +568,7 @@ public class Leader {
//if (!tickSkip && syncedCount < self.quorumPeers.size() / 2) {
// Lost quorum, shutdown
// TODO: message is wrong unless majority quorums used
- shutdown("Only " + syncedSet.size() + " followers, need "
+ shutdown("Only " + syncedSet.size() + " voting followers, need "
+ (self.getVotingView().size() / 2));
// make sure the order is the same!
// the leader goes to looking
@@ -525,15 +624,161 @@ public class Leader {
isShutdown = true;
}
+ /** In a reconfig operation, this method attempts to find the best leader for the next configuration.
+ * If the current leader is a voter in the next configuration with the same quorum address, it remains the leader.
+ * Otherwise, choose one of the new voters that acked the reconfiguration, such that it is as
+ * up-to-date as possible, i.e., acked as many outstanding proposals as possible.
+ *
+ * @param reconfigProposal the reconfig proposal; its last QuorumVerifier/ackset pair is the new configuration
+ * @param zxid the zxid of reconfigProposal; outstanding proposals zxid+1, zxid+2, ... are scanned to rank candidates
+ * @return server id of the designated leader
+ */
+
+ private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
+ //new configuration
+ Proposal.QuorumVerifierAcksetPair newQVAcksetPair = reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size()-1);
+
+ //check if I'm in the new configuration with the same quorum address -
+ // if so, I'll remain the leader
+ if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) &&
+ newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getId()).addr.equals(self.getQuorumAddress())){
+ return self.getId();
+ }
+ // start with an initial set of candidates that are voters from new config that
+ // acknowledged the reconfig op (there must be a quorum). Choose one of them as
+ // current leader candidate
+ HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
+ candidates.remove(self.getId()); // if we're here, I shouldn't be the leader (NOTE(review): next() below throws if no other new-config voter acked)
+ long curCandidate = candidates.iterator().next();
+
+ //go over outstanding ops in order, and try to find a candidate that acked the most ops.
+ //this way it will be the most up-to-date and we'll minimize the number of ops that get dropped
+
+ long curZxid = zxid + 1;
+ Proposal p = outstandingProposals.get(curZxid);
+
+ while (p!=null && !candidates.isEmpty()) {
+ for (Proposal.QuorumVerifierAcksetPair qvAckset: p.qvAcksetPairs){
+ //reduce the set of candidates to those that acknowledged p
+ candidates.retainAll(qvAckset.getAckset());
+ //no candidate acked p, return the best candidate found so far
+ if (candidates.isEmpty()) return curCandidate;
+ //update the current candidate, and if it is the only one remaining, return it
+ curCandidate = candidates.iterator().next();
+ if (candidates.size() == 1) return curCandidate;
+ }
+ curZxid++;
+ p = outstandingProposals.get(curZxid);
+ }
+
+ return curCandidate;
+ }
+
+ /** Commits p (whose zxid is given) if every configuration it waits for has a quorum of acks
+ * and no earlier proposal is still outstanding; followerAddr (may be null) is only used for logging.
+ * @return true if committed, otherwise false
+ */
+ synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
+ // make sure that ops are committed in order. With reconfigurations it is now possible
+ // that different operations wait for different sets of acks, and we still want to enforce
+ // that they are committed in order. Currently we only permit one outstanding reconfiguration
+ // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
+ // pending all wait for a quorum of old and new config, so its not possible to get enough acks
+ // for an operation without getting enough acks for preceding ops. But in the future if multiple
+ // concurrent reconfigs are allowed, this can happen.
+ if (outstandingProposals.containsKey(zxid - 1)) return false;
+
+ // getting a quorum from all necessary configurations
+ if (!p.hasAllQuorums()) {
+ return false;
+ }
+
+ // proposals should be committed in order; warn if this one is not the next expected zxid
+ if (zxid != lastCommitted+1) {
+ LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
+ + " from " + followerAddr + " not first!");
+ LOG.warn("First is 0x"
+ + Long.toHexString(lastCommitted + 1));
+ }
+
+ // p has a quorum in every configuration and is next in line: stop tracking it as outstanding
+
+ outstandingProposals.remove(zxid);
+
+ if (p.request != null) {
+ toBeApplied.add(p);
+ }
+
+ // We don't commit the new leader proposal
+ if ((zxid & 0xffffffffL) != 0) {
+ if (p.request == null) {
+ LOG.warn("Going to commmit null: " + p);
+ } else if (p.request.getHdr().getType() == OpCode.reconfig) {
+ LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());
+
+ //if this server is voter in new config with the same quorum address,
+ //then it will remain the leader
+ //otherwise an up-to-date follower will be designated as leader. This saves
+ //leader election time, unless the designated leader fails
+ long designatedLeader = getDesignatedLeader(p, zxid);
+ LOG.debug("designated leader is: {}", designatedLeader);
+
+ QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
+
+ self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
+
+ if (designatedLeader != self.getId()) {
+ allowedToCommit = false;
+ }
+
+ // we're sending the designated leader, and if the leader is changing the followers are
+ // responsible for closing the connection - this way we are sure that at least a majority of them
+ // receive the commit message.
+ commitAndActivate(zxid, designatedLeader);
+ informAndActivate(p, designatedLeader);
+ //turnOffFollowers();
+ } else {
+ commit(zxid);
+ inform(p);
+ }
+ zk.commitProcessor.commit(p.request);
+ if(pendingSyncs.containsKey(zxid)){
+ for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
+ sendSync(r);
+ }
+ }
+ } else {
+ lastCommitted = zxid;
+ if(LOG.isInfoEnabled()){
+ LOG.info("Have quorum of supporters; starting up and setting last processed zxid: 0x" + Long.toHexString(zk.getZxid()));
+ }
+ QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
+
+ long designatedLeader = getDesignatedLeader(p, zxid);
+
+ self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
+ if (designatedLeader != self.getId()) {
+ allowedToCommit = false;
+ }
+ LOG.debug("GOT QUORUM of ACKS FOR NEWLEADER msg " + allowedToCommit);
+ zk.startup();
+ zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
+
+ }
+ return true;
+ }
+
/**
* Keep a count of acks that are received by the leader for a particular
* proposal
*
- * @param zxid
- * the zxid of the proposal sent out
+ * @param zxid, the zxid of the proposal sent out
+ * @param sid, the id of the server that sent the ack
* @param followerAddr
*/
- synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
+ synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
+ if (!allowedToCommit) return; // last op committed was a leader change - from now on
+ // the new leader should commit
if (LOG.isTraceEnabled()) {
LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
for (Proposal p : outstandingProposals.values()) {
@@ -543,7 +788,6 @@ public class Leader {
}
LOG.trace("outstanding proposals all");
}
-
if (outstandingProposals.size() == 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("outstanding is 0");
@@ -564,46 +808,34 @@ public class Leader {
Long.toHexString(zxid), followerAddr);
return;
}
-
- p.ackSet.add(sid);
- if (LOG.isDebugEnabled()) {
+
+ p.addAck(sid);
+ /*if (LOG.isDebugEnabled()) {
LOG.debug("Count for zxid: 0x{} is {}",
Long.toHexString(zxid), p.ackSet.size());
- }
- if (self.getQuorumVerifier().containsQuorum(p.ackSet)){
- if (zxid != lastCommitted+1) {
- LOG.warn("Commiting zxid 0x{} from {} not first!",
- Long.toHexString(zxid), followerAddr);
- LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
- }
- outstandingProposals.remove(zxid);
- if (p.request != null) {
- toBeApplied.add(p);
- }
- // We don't commit the new leader proposal
- if ((zxid & 0xffffffffL) != 0) {
- if (p.request == null) {
- LOG.warn("Going to commmit null request for proposal: {}", p);
- }
- commit(zxid);
- inform(p);
- zk.commitProcessor.commit(p.request);
- if(pendingSyncs.containsKey(zxid)){
- for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
- sendSync(r);
- }
- }
- return;
- } else {
- lastCommitted = zxid;
- LOG.info("Have quorum of supporters; starting up and setting last processed zxid: 0x{}",
- Long.toHexString(zk.getZxid()));
- zk.startup();
- zk.getZKDatabase().setlastProcessedZxid(zk.getZxid());
- }
+ }*/
+
+ boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
+
+ // If p is a reconfiguration, multiple other operations may be ready to be committed,
+ // since operations wait for different sets of acks.
+ // Currently we only permit one outstanding reconfiguration at a time
+ // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
+ // pending all wait for a quorum of old and new config, so its not possible to get enough acks
+ // for an operation without getting enough acks for preceding ops. But in the future if multiple
+ // concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
+ // ops may already have enough acks and can be committed, which is what this code does.
+
+ if (hasCommitted && p.request!=null && p.request.getHdr().getType() == OpCode.reconfig){
+ long curZxid = zxid;
+ while (allowedToCommit && hasCommitted && p!=null){
+ curZxid++;
+ p = outstandingProposals.get(curZxid);
+ if (p !=null) hasCommitted = tryToCommit(p, curZxid, null);
+ }
}
}
-
+
static class ToBeAppliedRequestProcessor implements RequestProcessor {
private final RequestProcessor next;
@@ -707,6 +939,20 @@ public class Leader {
sendPacket(qp);
}
+ // Records zxid as lastCommitted and sends a COMMITANDACTIVATE packet whose 8-byte payload is the designated leader's sid
+ public void commitAndActivate(long zxid, long designatedLeader) {
+ synchronized(this){
+ lastCommitted = zxid;
+ }
+
+ byte[] data = new byte[8];
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ buffer.putLong(designatedLeader);
+
+ QuorumPacket qp = new QuorumPacket(Leader.COMMITANDACTIVATE, zxid, data, null);
+ sendPacket(qp);
+ }
+
/**
* Create an inform packet and send it to all observers.
* @param zxid
@@ -718,6 +964,23 @@ public class Leader {
sendObserverPacket(qp);
}
+
+ /**
+ * Create an INFORMANDACTIVATE packet (designated leader sid, then the proposal's data) and send it to all observers.
+ * @param proposal the committed reconfig proposal; its packet data is forwarded and its request zxid is used
+ * @param designatedLeader sid of the server designated to lead the new configuration
+ */
+ public void informAndActivate(Proposal proposal, long designatedLeader) {
+ byte[] proposalData = proposal.packet.getData();
+ byte[] data = new byte[proposalData.length + 8];
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+ buffer.putLong(designatedLeader);
+ buffer.put(proposalData);
+
+ QuorumPacket qp = new QuorumPacket(Leader.INFORMANDACTIVATE, proposal.request.zxid, data, null);
+ sendObserverPacket(qp);
+ }
+
long lastProposed;
@@ -771,8 +1034,19 @@ public class Leader {
Proposal p = new Proposal();
p.packet = pp;
- p.request = request;
- synchronized (this) {
+ p.request = request;
+
+ synchronized(this) {
+ p.addQuorumVerifier(self.getQuorumVerifier());
+
+ if (request.getHdr().getType() == OpCode.reconfig){
+ self.setLastSeenQuorumVerifier(request.qv, true);
+ }
+
+ if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
+ p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("Proposing:: " + request);
}
@@ -821,6 +1095,7 @@ public class Leader {
*
* @param handler handler of the follower
* @return last proposed zxid
+ * @throws InterruptedException
*/
synchronized public long startForwarding(LearnerHandler handler,
long lastSeenZxid) {
@@ -855,7 +1130,6 @@ public class Leader {
return lastProposed;
}
-
private final HashSet<Long> connectingFollowers = new HashSet<Long>();
public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
synchronized(connectingFollowers) {
@@ -957,6 +1231,8 @@ public class Leader {
return "ACK";
case COMMIT:
return "COMMIT";
+ case COMMITANDACTIVATE:
+ return "COMMITANDACTIVATE";
case PING:
return "PING";
case REVALIDATE:
@@ -965,6 +1241,8 @@ public class Leader {
return "SYNC";
case INFORM:
return "INFORM";
+ case INFORMANDACTIVATE:
+ return "INFORMANDACTIVATE";
default:
return "UNKNOWN";
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Thu Mar 7 06:01:49 2013
@@ -40,12 +40,19 @@ import org.apache.jute.OutputArchive;
import org.apache.jute.Record;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException.NoNodeException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.TxnHeader;
/**
@@ -262,7 +269,7 @@ public class Learner {
/*
* Add sid to payload
*/
- LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);
+ LearnerInfo li = new LearnerInfo(self.getId(), 0x10000, self.getQuorumVerifier().getVersion());
ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
@@ -309,11 +316,13 @@ public class Learner {
* @throws IOException
* @throws InterruptedException
*/
- protected void syncWithLeader(long newLeaderZxid) throws IOException, InterruptedException{
+ protected void syncWithLeader(long newLeaderZxid) throws Exception{
QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
QuorumPacket qp = new QuorumPacket();
long newEpoch = ZxidUtils.getEpochFromZxid(newLeaderZxid);
+ QuorumVerifier newLeaderQV = null;
+
readPacket(qp);
LinkedList<Long> packetsCommitted = new LinkedList<Long>();
LinkedList<PacketInFlight> packetsNotCommitted = new LinkedList<PacketInFlight>();
@@ -351,6 +360,7 @@ public class Learner {
System.exit(13);
}
+ zk.getZKDatabase().initConfigInZKDatabase(self.getQuorumVerifier());
zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
zk.createSessionTracker();
@@ -376,14 +386,30 @@ public class Learner {
+ Long.toHexString(lastQueued + 1));
}
lastQueued = pif.hdr.getZxid();
+
+ if (pif.hdr.getType() == OpCode.reconfig){
+ SetDataTxn setDataTxn = (SetDataTxn) pif.rec;
+ QuorumVerifier qv = self.configFromString(new String(setDataTxn.getData()));
+ self.setLastSeenQuorumVerifier(qv, true);
+ }
+
packetsNotCommitted.add(pif);
break;
case Leader.COMMIT:
+ case Leader.COMMITANDACTIVATE:
if (!snapshotTaken) {
pif = packetsNotCommitted.peekFirst();
if (pif.hdr.getZxid() != qp.getZxid()) {
LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
} else {
+ if (qp.getType() == Leader.COMMITANDACTIVATE) {
+ QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)pif.rec).getData()));
+ boolean majorChange =
+ self.processReconfig(qv, ByteBuffer.wrap(qp.getData()).getLong(), qp.getZxid(), true);
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
+ }
zk.processTxn(pif.hdr, pif.rec);
packetsNotCommitted.remove();
}
@@ -392,18 +418,53 @@ public class Learner {
}
break;
case Leader.INFORM:
+ case Leader.INFORMANDACTIVATE:
TxnHeader hdr = new TxnHeader();
- Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
+ Record txn;
+ if (qp.getType() == Leader.INFORMANDACTIVATE) { // payload: designated leader sid (8 bytes), then the txn
+ ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
+ long suggestedLeaderId = buffer.getLong();
+ byte[] remainingdata = new byte[buffer.remaining()];
+ buffer.get(remainingdata);
+ txn = SerializeUtils.deserializeTxn(remainingdata, hdr);
+ QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)txn).getData()));
+ boolean majorChange =
+ self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
+ } else { // plain INFORM: the whole payload is the serialized txn
+ txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
+ }
zk.processTxn(hdr, txn);
- break;
+ break;
case Leader.UPTODATE:
+ LOG.info("Learner received UPTODATE message");
+ if (newLeaderQV!=null) {
+ boolean majorChange =
+ self.processReconfig(newLeaderQV, null, null, true);
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
+ }
if (!snapshotTaken) { // true for the pre v1.0 case
- zk.takeSnapshot();
+ zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
}
- self.cnxnFactory.setZooKeeperServer(zk);
+ self.cnxnFactory.setZooKeeperServer(zk);
break outerLoop;
- case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
+ case Leader.NEWLEADER: // it will be NEWLEADER in v1.0
+ LOG.info("Learner received NEWLEADER message");
+ if (qp.getData()!=null && qp.getData().length > 1) {
+ try {
+ QuorumVerifier qv = self.configFromString(new String(qp.getData()));
+ self.setLastSeenQuorumVerifier(qv, true);
+ newLeaderQV = qv;
+ } catch (Exception e) { // best-effort: keep syncing with the config we already have
+ LOG.error("Failed to parse the configuration sent with NEWLEADER; ignoring it", e);
+ }
+ }
+
zk.takeSnapshot();
self.setCurrentEpoch(newEpoch);
snapshotTaken = true;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Thu Mar 7 06:01:49 2013
@@ -245,19 +245,24 @@ public class LearnerHandler extends Thre
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}
+
byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
- if (learnerInfoData.length == 8) {
- ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
- this.sid = bbsid.getLong();
- } else {
- LearnerInfo li = new LearnerInfo();
- ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
- this.sid = li.getServerid();
- this.version = li.getProtocolVersion();
- }
+ ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
+ if (learnerInfoData.length >= 8) {
+ this.sid = bbsid.getLong();
+ }
+ if (learnerInfoData.length >= 12) {
+ this.version = bbsid.getInt(); // protocolVersion
+ }
+ if (learnerInfoData.length >= 20) {
+ long configVersion = bbsid.getLong();
+ if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
+ throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
+ }
+ }
} else {
- this.sid = leader.followerCounter.getAndDecrement();
+ this.sid = leader.followerCounter.getAndDecrement();
}
if (leader.self.getView().containsKey(this.sid)) {
@@ -277,6 +282,7 @@ public class LearnerHandler extends Thre
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
+ long newLeaderZxid = ZxidUtils.makeZxid(newEpoch, 0);
if (this.getVersion() < 0x10000) {
// we are going to have to extrapolate the epoch information
@@ -287,7 +293,7 @@ public class LearnerHandler extends Thre
} else {
byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);
- QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
+ QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, newLeaderZxid, ver, null);
oa.writeRecord(newEpochPacket, "packet");
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
@@ -395,19 +401,25 @@ public class LearnerHandler extends Thre
// just let the state transfer happen
LOG.debug("proposals is empty");
}
-
LOG.info("Sending " + Leader.getPacketType(packetToSend));
leaderLastZxid = leader.startForwarding(this, updates);
} finally {
rl.unlock();
}
-
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- ZxidUtils.makeZxid(newEpoch, 0), null, null);
+
+ LOG.debug("Sending NEWLEADER message to " + sid);
+ // the version of this quorumVerifier will be set by leader.lead() in case
+ // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
+ // we got here, so the version was set
+
if (getVersion() < 0x10000) {
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ newLeaderZxid, null, null);
oa.writeRecord(newLeaderQP, "packet");
} else {
+ QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+ newLeaderZxid, leader.self.getLastSeenQuorumVerifier().toString().getBytes(), null);
queuedPackets.add(newLeaderQP);
}
bufferedOutput.flush();
@@ -456,6 +468,7 @@ public class LearnerHandler extends Thre
LOG.error("Next packet was supposed to be an ACK");
return;
}
+ LOG.debug("Received NEWLEADER-ACK message from " + sid);
leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
// now that the ack has been processed expect the syncLimit
@@ -473,6 +486,7 @@ public class LearnerHandler extends Thre
// so we need to mark when the peer can actually start
// using the data
//
+ LOG.debug("Sending UPTODATE message to " + sid);
queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
while (true) {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Observer.java Thu Mar 7 06:01:49 2013
@@ -20,11 +20,19 @@ package org.apache.zookeeper.server.quor
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import org.apache.jute.Record;
+import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.ObserverBean;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
+import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
+import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.apache.zookeeper.server.util.SerializeUtils;
+import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.TxnHeader;
/**
@@ -54,10 +62,9 @@ public class Observer extends Learner{
/**
* the main method called by the observer to observe the leader
- *
- * @throws InterruptedException
+ * @throws Exception
*/
- void observeLeader() throws InterruptedException {
+ void observeLeader() throws Exception {
zk.registerJMX(new ObserverBean(this, zk), self.jmxLocalPeerBean);
try {
@@ -66,7 +73,9 @@ public class Observer extends Learner{
try {
connectToLeader(addr);
long newLeaderZxid = registerWithLeader(Leader.OBSERVERINFO);
-
+ if (self.isReconfigStateChange())
+ throw new Exception("learned about role change");
+
syncWithLeader(newLeaderZxid);
QuorumPacket qp = new QuorumPacket();
while (self.isRunning()) {
@@ -92,9 +101,9 @@ public class Observer extends Learner{
/**
* Controls the response of an observer to the receipt of a quorumpacket
* @param qp
- * @throws IOException
+ * @throws Exception
*/
- protected void processPacket(QuorumPacket qp) throws IOException{
+ protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
@@ -121,6 +130,30 @@ public class Observer extends Learner{
ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
obs.commitRequest(request);
break;
+ case Leader.INFORMANDACTIVATE:
+ hdr = new TxnHeader();
+
+ // get new designated leader from (current) leader's message
+ ByteBuffer buffer = ByteBuffer.wrap(qp.getData());
+ long suggestedLeaderId = buffer.getLong();
+
+ byte[] remainingdata = new byte[buffer.remaining()];
+ buffer.get(remainingdata);
+ txn = SerializeUtils.deserializeTxn(remainingdata, hdr);
+ QuorumVerifier qv = self.configFromString(new String(((SetDataTxn)txn).getData()));
+
+ request = new Request (hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, 0);
+ obs = (ObserverZooKeeperServer)zk;
+
+ boolean majorChange =
+ self.processReconfig(qv, suggestedLeaderId, qp.getZxid(), true);
+
+ obs.commitRequest(request);
+
+ if (majorChange) {
+ throw new Exception("changes proposed in reconfig");
+ }
+ break;
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java Thu Mar 7 06:01:49 2013
@@ -89,6 +89,7 @@ public class ObserverRequestProcessor ex
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
+ case OpCode.reconfig:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java?rev=1453693&r1=1453692&r2=1453693&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumBean.java Thu Mar 7 06:01:49 2013
@@ -27,7 +27,7 @@ public class QuorumBean implements Quoru
public QuorumBean(QuorumPeer peer){
this.peer = peer;
- name = "ReplicatedServer_id" + peer.getMyid();
+ name = "ReplicatedServer_id" + peer.getId();
}
public String getName() {