You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by he...@apache.org on 2011/05/20 01:17:49 UTC
svn commit: r1125176 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: henry
Date: Thu May 19 23:17:48 2011
New Revision: 1125176
URL: http://svn.apache.org/viewvc?rev=1125176&view=rev
Log:
ZOOKEEPER-784. Server-side functionality for read-only mode (Sergey Doroshenko via henryr)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/Watcher.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu May 19 23:17:48 2011
@@ -318,6 +318,8 @@ NEW FEATURES:
ZOOKEEPER-1038. Move bookkeeper and hedwig code in subversion (breed)
+ ZOOKEEPER-784. Server-side functionality for read-only mode (Sergey Doroshenko via henryr)
+
Release 3.3.0 - 2010-03-24
Non-backward compatible changes:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxn.java Thu May 19 23:17:48 2011
@@ -29,6 +29,10 @@ import java.util.Random;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.LinkedBlockingQueue;
+import java.io.BufferedReader;
+import java.io.InputStreamReader;
+import java.net.ConnectException;
+import java.net.Socket;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.BinaryOutputArchive;
@@ -136,6 +140,14 @@ public class ClientCnxn {
private byte sessionPasswd[] = new byte[16];
+ /**
+ * If true, the connection is allowed to go to r-o mode. This field's value
+ * is sent, besides other data, during session creation handshake. If the
+ * server on the other side of the wire is partitioned it'll accept
+ * read-only clients only.
+ */
+ private boolean readOnly;
+
final String chrootPath;
final SendThread sendThread;
@@ -155,6 +167,21 @@ public class ClientCnxn {
*/
private final HostProvider hostProvider;
+ /**
+ * Is set to true when a connection to a r/w server is established for the
+ * first time; never changed afterwards.
+ * <p>
+ * Is used to handle situations when a client without a sessionId connects
+ * to a read-only server. Such a client receives a "fake" sessionId from the
+ * read-only server, but this sessionId is invalid for other servers. So when
+ * such a client finds a r/w server, it sends 0 instead of the fake sessionId
+ * during the connection handshake and establishes a new, valid session.
+ * <p>
+ * If this field is false (which implies we haven't seen r/w server before)
+ * then non-zero sessionId is fake, otherwise it is valid.
+ */
+ volatile boolean seenRwServerBefore = false;
+
public long getSessionId() {
return sessionId;
}
@@ -215,8 +242,18 @@ public class ClientCnxn {
WatchRegistration watchRegistration;
- Packet(RequestHeader requestHeader, ReplyHeader replyHeader, Record request,
- Record response, WatchRegistration watchRegistration) {
+ /** Convenience ctor */
+ Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
+ Record request, Record response,
+ WatchRegistration watchRegistration) {
+ this(requestHeader, replyHeader, request, response,
+ watchRegistration, false);
+ }
+
+ Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
+ Record request, Record response,
+ WatchRegistration watchRegistration, boolean readOnly) {
+
this.requestHeader = requestHeader;
this.replyHeader = replyHeader;
this.request = request;
@@ -231,6 +268,8 @@ public class ClientCnxn {
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
+ // append "am-I-allowed-to-be-readonly" flag
+ boa.writeBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
@@ -278,12 +317,16 @@ public class ClientCnxn {
* @param watcher watcher for this connection
* @param clientCnxnSocket
* the socket implementation used (e.g. NIO/Netty)
+ * @param canBeReadOnly
+ * whether the connection is allowed to go to read-only
+ * mode in case of partitioning
* @throws IOException
*/
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
- ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket)
+ ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly)
throws IOException {
- this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0, new byte[16]);
+ this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher,
+ clientCnxnSocket, 0, new byte[16], canBeReadOnly);
}
/**
@@ -303,11 +346,14 @@ public class ClientCnxn {
* the socket implementation used (e.g. NIO/Netty)
* @param sessionId session id if re-establishing session
* @param sessionPasswd session passwd if re-establishing session
+ * @param canBeReadOnly
+ * whether the connection is allowed to go to read-only
+ * mode in case of partitioning
* @throws IOException
*/
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
- long sessionId, byte[] sessionPasswd) {
+ long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
@@ -318,6 +364,7 @@ public class ClientCnxn {
connectTimeout = sessionTimeout / hostProvider.size();
readTimeout = sessionTimeout * 2 / 3;
+ readOnly = canBeReadOnly;
sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();
@@ -614,6 +661,14 @@ public class ClientCnxn {
super(msg);
}
}
+
+ private static class RWServerFoundException extends IOException {
+ private static final long serialVersionUID = 90431199887158758L;
+
+ public RWServerFoundException(String msg) {
+ super(msg);
+ }
+ }
public static final int packetLen = Integer.getInteger("jute.maxbuffer",
4096 * 1024);
@@ -757,10 +812,12 @@ public class ClientCnxn {
void primeConnection() throws IOException {
LOG.info("Socket connection established to "
- + clientCnxnSocket.getRemoteSocketAddress() + ", initiating session");
+ + clientCnxnSocket.getRemoteSocketAddress()
+ + ", initiating session");
isFirstConnect = false;
+ long sessId = (seenRwServerBefore) ? sessionId : 0;
ConnectRequest conReq = new ConnectRequest(0, lastZxid,
- sessionTimeout, sessionId, sessionPasswd);
+ sessionTimeout, sessId, sessionPasswd);
synchronized (outgoingQueue) {
// We add backwards since we are pushing into the front
// Only send if there's a pending watch
@@ -786,8 +843,8 @@ public class ClientCnxn {
OpCode.auth), null, new AuthPacket(0, id.scheme,
id.data), null, null));
}
- outgoingQueue.addFirst((new Packet(null, null, conReq, null,
- null)));
+ outgoingQueue.addFirst(new Packet(null, null, conReq,
+ null, null, readOnly));
}
clientCnxnSocket.enableReadWriteOnly();
if (LOG.isDebugEnabled()) {
@@ -802,16 +859,31 @@ public class ClientCnxn {
queuePacket(h, null, null, null, null, null, null, null, null);
}
+ private InetSocketAddress rwServerAddress = null;
+
+ private final static int minPingRwTimeout = 100;
+
+ private final static int maxPingRwTimeout = 60000;
+
+ private int pingRwTimeout = minPingRwTimeout;
+
private void startConnect() throws IOException {
if(!isFirstConnect){
try {
Thread.sleep(r.nextInt(1000));
- } catch (InterruptedException e1) {
- LOG.warn("Unexpected exception", e1);
+ } catch (InterruptedException e) {
+ LOG.warn("Unexpected exception", e);
}
}
state = States.CONNECTING;
- InetSocketAddress addr = hostProvider.next(1000);
+
+ InetSocketAddress addr;
+ if (rwServerAddress != null) {
+ addr = rwServerAddress;
+ rwServerAddress = null;
+ } else {
+ addr = hostProvider.next(1000);
+ }
LOG.info("Opening socket connection to server " + addr);
@@ -830,6 +902,7 @@ public class ClientCnxn {
clientCnxnSocket.updateNow();
clientCnxnSocket.updateLastSendAndHeard();
int to;
+ long lastPingRwServer = System.currentTimeMillis();
while (state.isAlive()) {
try {
if (!clientCnxnSocket.isConnected()) {
@@ -841,7 +914,7 @@ public class ClientCnxn {
clientCnxnSocket.updateLastSendAndHeard();
}
- if (state == States.CONNECTED) {
+ if (state.isConnected()) {
to = readTimeout - clientCnxnSocket.getIdleRecv();
} else {
to = connectTimeout - clientCnxnSocket.getIdleRecv();
@@ -854,7 +927,7 @@ public class ClientCnxn {
+ " for sessionid 0x"
+ Long.toHexString(sessionId));
}
- if (state == States.CONNECTED) {
+ if (state.isConnected()) {
int timeToNextPing = readTimeout / 2
- clientCnxnSocket.getIdleSend();
if (timeToNextPing <= 0) {
@@ -868,6 +941,20 @@ public class ClientCnxn {
}
}
+ // If we are in read-only mode, look for a read/write server
+ if (state == States.CONNECTEDREADONLY) {
+ long now = System.currentTimeMillis();
+ int idlePingRwServer = (int) (now - lastPingRwServer);
+ if (idlePingRwServer >= pingRwTimeout) {
+ lastPingRwServer = now;
+ idlePingRwServer = 0;
+ pingRwTimeout =
+ Math.min(2*pingRwTimeout, maxPingRwTimeout);
+ pingRwServer();
+ }
+ to = Math.min(to, pingRwTimeout - idlePingRwServer);
+ }
+
clientCnxnSocket.doTransport(to, pendingQueue, outgoingQueue);
} catch (Exception e) {
@@ -887,6 +974,8 @@ public class ClientCnxn {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
} else if (e instanceof EndOfStreamException) {
LOG.info(e.getMessage() + RETRY_CONN_MSG);
+ } else if (e instanceof RWServerFoundException) {
+ LOG.info(e.getMessage());
} else {
LOG.warn(
"Session 0x"
@@ -918,6 +1007,43 @@ public class ClientCnxn {
"SendThread exitedloop.");
}
+ private void pingRwServer() throws RWServerFoundException {
+ String result = null;
+ InetSocketAddress addr = hostProvider.next(0);
+ LOG.info("Checking server " + addr + " for being r/w." +
+ " Timeout " + pingRwTimeout);
+
+ try {
+ Socket sock = new Socket(addr.getHostName(), addr.getPort());
+ sock.setSoLinger(false, -1);
+ sock.setSoTimeout(1000);
+ sock.setTcpNoDelay(true);
+ sock.getOutputStream().write("isro".getBytes());
+ sock.getOutputStream().flush();
+ sock.shutdownOutput();
+ BufferedReader br = new BufferedReader(
+ new InputStreamReader(sock.getInputStream()));
+ result = br.readLine();
+ sock.close();
+ br.close();
+ } catch (ConnectException e) {
+ // ignore, this just means server is not up
+ } catch (IOException e) {
+ // some unexpected error, warn about it
+ LOG.warn("Exception while seeking for r/w server " +
+ e.getMessage(), e);
+ }
+
+ if ("rw".equals(result)) {
+ pingRwTimeout = minPingRwTimeout;
+ // save the found address so that it's used during the next
+ // connection attempt
+ rwServerAddress = addr;
+ throw new RWServerFoundException("Majority server found at "
+ + addr.getHostName() + ":" + addr.getPort());
+ }
+ }
+
private void cleanup() {
clientCnxnSocket.cleanup();
synchronized (pendingQueue) {
@@ -941,10 +1067,11 @@ public class ClientCnxn {
* @param _negotiatedSessionTimeout
* @param _sessionId
* @param _sessionPasswd
+ * @param isRO
* @throws IOException
*/
void onConnected(int _negotiatedSessionTimeout, long _sessionId,
- byte[] _sessionPasswd) throws IOException {
+ byte[] _sessionPasswd, boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
state = States.CLOSED;
@@ -957,19 +1084,27 @@ public class ClientCnxn {
"Unable to reconnect to ZooKeeper service, session 0x"
+ Long.toHexString(sessionId) + " has expired");
}
+ if (!readOnly && isRO) {
+ LOG.error("Read/write client got connected to read-only server");
+ }
readTimeout = negotiatedSessionTimeout * 2 / 3;
connectTimeout = negotiatedSessionTimeout / hostProvider.size();
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
- state = States.CONNECTED;
+ state = (isRO) ?
+ States.CONNECTEDREADONLY : States.CONNECTED;
+ seenRwServerBefore |= !isRO;
LOG.info("Session establishment complete on server "
- + clientCnxnSocket.getRemoteSocketAddress() + ", sessionid = 0x"
- + Long.toHexString(sessionId) + ", negotiated timeout = "
- + negotiatedSessionTimeout);
+ + clientCnxnSocket.getRemoteSocketAddress()
+ + ", sessionid = 0x" + Long.toHexString(sessionId)
+ + ", negotiated timeout = " + negotiatedSessionTimeout
+ + (isRO ? " (READ-ONLY mode)" : ""));
+ KeeperState eventState = (isRO) ?
+ KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
eventThread.queueEvent(new WatchedEvent(
Watcher.Event.EventType.None,
- Watcher.Event.KeeperState.SyncConnected, null));
+ eventState, null));
}
void close() {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocket.java Thu May 19 23:17:48 2011
@@ -128,9 +128,20 @@ abstract class ClientCnxnSocket {
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = new ConnectResponse();
conRsp.deserialize(bbia, "connect");
+
+ // read "is read-only" flag
+ boolean isRO = false;
+ try {
+ isRO = bbia.readBool("readOnly");
+ } catch (IOException e) {
+ // this is ok -- just a packet from an old server which
+ // doesn't contain readOnly field
+ LOG.warn("Connected to an old server; r-o mode will be unavailable");
+ }
+
this.sessionId = conRsp.getSessionId();
sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
- conRsp.getPasswd());
+ conRsp.getPasswd(), isRO);
}
abstract boolean isConnected();
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ClientCnxnSocketNIO.java Thu May 19 23:17:48 2011
@@ -273,7 +273,7 @@ public class ClientCnxnSocketNIO extends
}
}
}
- if (sendThread.getZkState() == States.CONNECTED) {
+ if (sendThread.getZkState().isConnected()) {
if (outgoingQueue.size() > 0) {
enableWrite();
} else {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java Thu May 19 23:17:48 2011
@@ -120,6 +120,8 @@ public abstract class KeeperException ex
return new InvalidCallbackException();
case SESSIONMOVED:
return new SessionMovedException();
+ case NOTREADONLY:
+ return new NotReadOnlyException();
case OK:
default:
@@ -332,7 +334,9 @@ public abstract class KeeperException ex
/** Client authentication failed */
AUTHFAILED (AuthFailed),
/** Session moved to another server, so operation is ignored */
- SESSIONMOVED (-118);
+ SESSIONMOVED (-118),
+ /** State-changing request is passed to read-only server */
+ NOTREADONLY (-119);
private static final Map<Integer,Code> lookup
= new HashMap<Integer,Code>();
@@ -407,6 +411,8 @@ public abstract class KeeperException ex
return "Invalid callback";
case SESSIONMOVED:
return "Session moved";
+ case NOTREADONLY:
+ return "Not a read-only call";
default:
return "Unknown error " + code;
}
@@ -643,6 +649,15 @@ public abstract class KeeperException ex
}
/**
+ * @see Code#NOTREADONLY
+ */
+ public static class NotReadOnlyException extends KeeperException {
+ public NotReadOnlyException() {
+ super(Code.NOTREADONLY);
+ }
+ }
+
+ /**
* @see Code#SYSTEMERROR
*/
public static class SystemErrorException extends KeeperException {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/Watcher.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/Watcher.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/Watcher.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/Watcher.java Thu May 19 23:17:48 2011
@@ -54,17 +54,26 @@ public interface Watcher {
* creation). */
SyncConnected (3),
+ /**
+ * Auth failed state
+ */
+ AuthFailed (4),
+
+ /**
+ * The client is connected to a read-only server, that is the
+ * server which is not currently connected to the majority.
+ * The only operations allowed after receiving this state are
+ * read operations.
+ * This state is generated for read-only clients only since
+ * read/write clients aren't allowed to connect to r/o servers.
+ */
+ ConnectedReadOnly (5),
+
/** The serving cluster has expired this session. The ZooKeeper
* client connection (the session) is no longer valid. You must
* create a new client connection (instantiate a new ZooKeeper
* instance) if you with to access the ensemble. */
- Expired (-112),
-
- /**
- * Auth failed state
- *
- */
- AuthFailed(4);
+ Expired (-112);
private final int intValue; // Integer representation of value
// for sending over wire
@@ -84,6 +93,7 @@ public interface Watcher {
case 1: return KeeperState.NoSyncConnected;
case 3: return KeeperState.SyncConnected;
case 4: return KeeperState.AuthFailed;
+ case 5: return KeeperState.ConnectedReadOnly;
case -112: return KeeperState.Expired;
default:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeper.java Thu May 19 23:17:48 2011
@@ -321,11 +321,21 @@ public class ZooKeeper {
}
public enum States {
- CONNECTING, ASSOCIATING, CONNECTED, CLOSED, AUTH_FAILED;
+ CONNECTING, ASSOCIATING, CONNECTED, CONNECTEDREADONLY,
+ CLOSED, AUTH_FAILED;
public boolean isAlive() {
return this != CLOSED && this != AUTH_FAILED;
}
+
+ /**
+ * Returns whether we are connected to a server (which
+ * could possibly be read-only, if this client is allowed
+ * to go to read-only mode)
+ * */
+ public boolean isConnected() {
+ return this == CONNECTED || this == CONNECTEDREADONLY;
+ }
}
protected final ClientCnxn cnxn;
@@ -376,6 +386,64 @@ public class ZooKeeper {
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
throws IOException
{
+ this(connectString, sessionTimeout, watcher, false);
+ }
+
+ /**
+ * To create a ZooKeeper client object, the application needs to pass a
+ * connection string containing a comma separated list of host:port pairs,
+ * each corresponding to a ZooKeeper server.
+ * <p>
+ * Session establishment is asynchronous. This constructor will initiate
+ * connection to the server and return immediately - potentially (usually)
+ * before the session is fully established. The watcher argument specifies
+ * the watcher that will be notified of any changes in state. This
+ * notification can come at any point before or after the constructor call
+ * has returned.
+ * <p>
+ * The instantiated ZooKeeper client object will pick an arbitrary server
+ * from the connectString and attempt to connect to it. If establishment of
+ * the connection fails, another server in the connect string will be tried
+ * (the order is non-deterministic, as we random shuffle the list), until a
+ * connection is established. The client will continue attempts until the
+ * session is explicitly closed.
+ * <p>
+ * Added in 3.2.0: An optional "chroot" suffix may also be appended to the
+ * connection string. This will run the client commands while interpreting
+ * all paths relative to this root (similar to the unix chroot command).
+ *
+ * @param connectString
+ * comma separated host:port pairs, each corresponding to a zk
+ * server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If
+ * the optional chroot suffix is used the example would look
+ * like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+ * where the client would be rooted at "/app/a" and all paths
+ * would be relative to this root - ie getting/setting/etc...
+ * "/foo/bar" would result in operations being run on
+ * "/app/a/foo/bar" (from the server perspective).
+ * @param sessionTimeout
+ * session timeout in milliseconds
+ * @param watcher
+ * a watcher object which will be notified of state changes, may
+ * also be notified for node events
+ * @param canBeReadOnly
+ * (added in 3.4) whether the created client is allowed to go to
+ * read-only mode in case of partitioning. Read-only mode
+ * basically means that if the client can't find any majority
+ * servers but there's a partitioned server it could reach, it
+ * connects to one in read-only mode, i.e. read requests are
+ * allowed while write requests are not. It continues looking for
+ * a majority server in the background.
+ *
+ * @throws IOException
+ * in cases of network failure
+ * @throws IllegalArgumentException
+ * if an invalid chroot path is specified
+ */
+ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
+ boolean canBeReadOnly)
+ throws IOException
+ {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
@@ -387,7 +455,7 @@ public class ZooKeeper {
connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
- getClientCnxnSocket());
+ getClientCnxnSocket(), canBeReadOnly);
cnxn.start();
}
@@ -447,6 +515,72 @@ public class ZooKeeper {
long sessionId, byte[] sessionPasswd)
throws IOException
{
+ this(connectString, sessionTimeout, watcher, sessionId, sessionPasswd, false);
+ }
+
+ /**
+ * To create a ZooKeeper client object, the application needs to pass a
+ * connection string containing a comma separated list of host:port pairs,
+ * each corresponding to a ZooKeeper server.
+ * <p>
+ * Session establishment is asynchronous. This constructor will initiate
+ * connection to the server and return immediately - potentially (usually)
+ * before the session is fully established. The watcher argument specifies
+ * the watcher that will be notified of any changes in state. This
+ * notification can come at any point before or after the constructor call
+ * has returned.
+ * <p>
+ * The instantiated ZooKeeper client object will pick an arbitrary server
+ * from the connectString and attempt to connect to it. If establishment of
+ * the connection fails, another server in the connect string will be tried
+ * (the order is non-deterministic, as we random shuffle the list), until a
+ * connection is established. The client will continue attempts until the
+ * session is explicitly closed (or the session is expired by the server).
+ * <p>
+ * Added in 3.2.0: An optional "chroot" suffix may also be appended to the
+ * connection string. This will run the client commands while interpreting
+ * all paths relative to this root (similar to the unix chroot command).
+ * <p>
+ * Use {@link #getSessionId} and {@link #getSessionPasswd} on an established
+ * client connection, these values must be passed as sessionId and
+ * sessionPasswd respectively if reconnecting. Otherwise, if not
+ * reconnecting, use the other constructor which does not require these
+ * parameters.
+ *
+ * @param connectString
+ * comma separated host:port pairs, each corresponding to a zk
+ * server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+ * If the optional chroot suffix is used the example would look
+ * like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
+ * where the client would be rooted at "/app/a" and all paths
+ * would be relative to this root - ie getting/setting/etc...
+ * "/foo/bar" would result in operations being run on
+ * "/app/a/foo/bar" (from the server perspective).
+ * @param sessionTimeout
+ * session timeout in milliseconds
+ * @param watcher
+ * a watcher object which will be notified of state changes, may
+ * also be notified for node events
+ * @param sessionId
+ * specific session id to use if reconnecting
+ * @param sessionPasswd
+ * password for this session
+ * @param canBeReadOnly
+ * (added in 3.4) whether the created client is allowed to go to
+ * read-only mode in case of partitioning. Read-only mode
+ * basically means that if the client can't find any majority
+ * servers but there's a partitioned server it could reach, it
+ * connects to one in read-only mode, i.e. read requests are
+ * allowed while write requests are not. It continues looking for
+ * a majority server in the background.
+ *
+ * @throws IOException in cases of network failure
+ * @throws IllegalArgumentException if an invalid chroot path is specified
+ */
+ public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
+ long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
+ throws IOException
+ {
LOG.info("Initiating client connection, connectString=" + connectString
+ " sessionTimeout=" + sessionTimeout
+ " watcher=" + watcher
@@ -462,7 +596,8 @@ public class ZooKeeper {
connectStringParser.getServerAddresses());
cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
hostProvider, sessionTimeout, this, watchManager,
- getClientCnxnSocket(), sessionId, sessionPasswd);
+ getClientCnxnSocket(), sessionId, sessionPasswd, canBeReadOnly);
+ cnxn.seenRwServerBefore = true; // since user has provided sessionId
cnxn.start();
}
@@ -1591,7 +1726,7 @@ public class ZooKeeper {
public String toString() {
States state = getState();
return ("State:" + state.toString()
- + (state == States.CONNECTED ?
+ + (state.isConnected() ?
" Timeout:" + getSessionTimeout() + " " :
" ")
+ cnxn);
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/ZooKeeperMain.java Thu May 19 23:17:48 2011
@@ -195,6 +195,8 @@ public class ZooKeeperMain {
options.put("server", it.next());
} else if (opt.equals("-timeout")) {
options.put("timeout", it.next());
+ } else if (opt.equals("-r")) {
+ options.put("readonly", "true");
}
} catch (NoSuchElementException e){
System.err.println("Error: no argument found for option "
@@ -260,9 +262,10 @@ public class ZooKeeperMain {
zk.close();
}
host = newHost;
+ boolean readOnly = cl.getOption("readonly") != null;
zk = new ZooKeeper(host,
Integer.parseInt(cl.getOption("timeout")),
- new MyWatcher());
+ new MyWatcher(), readOnly);
}
public static void main(String args[])
@@ -592,6 +595,8 @@ public class ZooKeeperMain {
System.err.println("Node already exists: " + e.getPath());
} catch (KeeperException.NotEmptyException e) {
System.err.println("Node not empty: " + e.getPath());
+ } catch (KeeperException.NotReadOnlyException e) {
+ System.err.println("Not a read-only call: " + e.getPath());
}
return false;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java Thu May 19 23:17:48 2011
@@ -50,7 +50,7 @@ import org.apache.zookeeper.proto.Reques
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
-
+import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import com.sun.management.UnixOperatingSystemMXBean;
/**
@@ -644,6 +644,10 @@ public class NIOServerCnxn extends Serve
else {
pw.print("Zookeeper version: ");
pw.println(Version.getFullVersion());
+ if (zkServer instanceof ReadOnlyZooKeeperServer) {
+ pw.println("READ-ONLY mode; serving only " +
+ "read-only clients");
+ }
if (len == statCmd) {
LOG.info("Stat command output");
pw.println("Clients:");
@@ -780,6 +784,23 @@ public class NIOServerCnxn extends Serve
}
+ private class IsroCommand extends CommandThread {
+
+ public IsroCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zkServer == null) {
+ pw.print("null");
+ } else if (zkServer instanceof ReadOnlyZooKeeperServer) {
+ pw.print("ro");
+ } else {
+ pw.print("rw");
+ }
+ }
+ }
/** Return if four letter word found and responded to, otw false **/
private boolean checkFourLetterWord(final SelectionKey k, final int len)
@@ -870,6 +891,10 @@ public class NIOServerCnxn extends Serve
MonitorCommand mntr = new MonitorCommand(pwriter);
mntr.start();
return true;
+ } else if (len == isroCmd) {
+ IsroCommand isro = new IsroCommand(pwriter);
+ isro.start();
+ return true;
}
return false;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/NettyServerCnxn.java Thu May 19 23:17:48 2011
@@ -24,7 +24,6 @@ import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
-import java.io.StringWriter;
import java.io.Writer;
import java.lang.management.ManagementFactory;
import java.lang.management.OperatingSystemMXBean;
@@ -49,6 +48,7 @@ import org.apache.zookeeper.proto.ReplyH
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.quorum.Leader;
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
+import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@@ -457,6 +457,10 @@ public class NettyServerCnxn extends Ser
else {
pw.print("Zookeeper version: ");
pw.println(Version.getFullVersion());
+ if (zkServer instanceof ReadOnlyZooKeeperServer) {
+ pw.println("READ-ONLY mode; serving only " +
+ "read-only clients");
+ }
if (len == statCmd) {
LOG.info("Stat command output");
pw.println("Clients:");
@@ -591,6 +595,23 @@ public class NettyServerCnxn extends Ser
}
+ private class IsroCommand extends CommandThread {
+
+ public IsroCommand(PrintWriter pw) {
+ super(pw);
+ }
+
+ @Override
+ public void commandRun() {
+ if (zkServer == null) {
+ pw.print("null");
+ } else if (zkServer instanceof ReadOnlyZooKeeperServer) {
+ pw.print("ro");
+ } else {
+ pw.print("rw");
+ }
+ }
+ }
/** Return if four letter word found and responded to, otw false **/
private boolean checkFourLetterWord(final Channel channel,
@@ -663,6 +684,10 @@ public class NettyServerCnxn extends Ser
MonitorCommand mntr = new MonitorCommand(pwriter);
mntr.start();
return true;
+ } else if (len == isroCmd) {
+ IsroCommand isro = new IsroCommand(pwriter);
+ isro.start();
+ return true;
}
return false;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ServerCnxn.java Thu May 19 23:17:48 2011
@@ -48,11 +48,18 @@ public abstract class ServerCnxn impleme
protected ArrayList<Id> authInfo = new ArrayList<Id>();
+ /**
+ * If the client is an old version, we don't send r-o mode info to it.
+ * If we did, the old C client would not read it, which would result in
+ * a TCP RST packet, i.e. "connection reset by peer".
+ */
+ boolean isOldClient = true;
+
abstract int getSessionTimeout();
abstract void close();
- abstract void sendResponse(ReplyHeader h, Record r, String tag)
+ public abstract void sendResponse(ReplyHeader h, Record r, String tag)
throws IOException;
/* notify the client the session is closing and close/cleanup socket */
@@ -209,6 +216,13 @@ public abstract class ServerCnxn impleme
protected final static int mntrCmd = ByteBuffer.wrap("mntr".getBytes())
.getInt();
+ /*
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
+ * Zk Admin</a>. this link is for all the commands.
+ */
+ protected final static int isroCmd = ByteBuffer.wrap("isro".getBytes())
+ .getInt();
+
protected final static HashMap<Integer, String> cmd2String =
new HashMap<Integer, String>();
@@ -229,6 +243,7 @@ public abstract class ServerCnxn impleme
cmd2String.put(wchpCmd, "wchp");
cmd2String.put(wchsCmd, "wchs");
cmd2String.put(mntrCmd, "mntr");
+ cmd2String.put(isroCmd, "isro");
}
protected void packetReceived() {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Thu May 19 23:17:48 2011
@@ -56,6 +56,7 @@ import org.apache.zookeeper.server.Sessi
import org.apache.zookeeper.server.auth.AuthenticationProvider;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
/**
* This class implements a simple standalone ZooKeeperServer. It sets up the
@@ -139,11 +140,10 @@ public class ZooKeeperServer implements
* actually start listening for clients until run() is invoked.
*
* @param dataDir the directory to put the data
- * @throws IOException
*/
public ZooKeeperServer(FileTxnSnapLog txnLogFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout,
- DataTreeBuilder treeBuilder, ZKDatabase zkDb) throws IOException {
+ DataTreeBuilder treeBuilder, ZKDatabase zkDb) {
serverStats = new ServerStats(this);
this.txnLogFactory = txnLogFactory;
this.zkDb = zkDb;
@@ -561,6 +561,10 @@ public class ZooKeeperServer implements
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
bos.writeInt(-1, "len");
rsp.serialize(bos, "connect");
+ if (!cnxn.isOldClient) {
+ bos.writeBool(
+ this instanceof ReadOnlyZooKeeperServer, "readOnly");
+ }
baos.close();
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
bb.putInt(bb.remaining() - 4).rewind();
@@ -762,6 +766,23 @@ public class ZooKeeperServer implements
+ " client's lastZxid is 0x"
+ Long.toHexString(connReq.getLastZxidSeen()));
}
+ boolean readOnly = false;
+ try {
+ readOnly = bia.readBool("readOnly");
+ cnxn.isOldClient = false;
+ } catch (IOException e) {
+ // this is ok -- just a packet from an old client which
+ // doesn't contain readOnly field
+ LOG.warn("Connection request from old client "
+ + cnxn.getRemoteSocketAddress()
+ + "; will be dropped if server is in r-o mode");
+ }
+ if (readOnly == false && this instanceof ReadOnlyZooKeeperServer) {
+ String msg = "Refusing session request for not-read-only client "
+ + cnxn.getRemoteSocketAddress();
+ LOG.info(msg);
+ throw new CloseRequestException(msg);
+ }
if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
String msg = "Refusing session request for client "
+ cnxn.getRemoteSocketAddress()
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu May 19 23:17:48 2011
@@ -618,12 +618,41 @@ public class QuorumPeer extends Thread i
while (running) {
switch (getPeerState()) {
case LOOKING:
+ LOG.info("LOOKING");
+
+ // Create read-only server but don't start it immediately
+ final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
+ logFactory, this,
+ new ZooKeeperServer.BasicDataTreeBuilder(),
+ this.zkDb);
try {
- LOG.info("LOOKING");
+ // Instead of starting roZk immediately, wait some grace
+ // period before we decide we're partitioned.
+ //
+ // A thread is used here because otherwise it would require
+ // changes in each of the election strategy classes, which
+ // would be unnecessary code coupling.
+ new Thread() {
+ public void run() {
+ try {
+ // lower-bound grace period to 2 secs
+ sleep(Math.max(2000, tickTime));
+ if (ServerState.LOOKING.equals(getPeerState())) {
+ roZk.startup();
+ LOG.info("Read-only server started");
+ }
+ } catch (Exception e) {
+ LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
+ }
+ }
+ }.start();
+
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception",e);
setPeerState(ServerState.LOOKING);
+ } finally {
+ roZk.shutdown();
}
break;
case OBSERVING:
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java Thu May 19 23:17:48 2011
@@ -17,7 +17,6 @@
*/
package org.apache.zookeeper.server.quorum;
-import java.io.IOException;
import java.io.PrintWriter;
import org.apache.zookeeper.server.ZKDatabase;
@@ -34,7 +33,6 @@ public abstract class QuorumZooKeeperSer
protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout,
DataTreeBuilder treeBuilder, ZKDatabase zkDb, QuorumPeer self)
- throws IOException
{
super(logFactory, tickTime, minSessionTimeout, maxSessionTimeout,
treeBuilder, zkDb);
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Thu May 19 23:17:48 2011
@@ -100,7 +100,8 @@ public abstract class ClientBase extends
connected = false;
}
synchronized public void process(WatchedEvent event) {
- if (event.getState() == KeeperState.SyncConnected) {
+ if (event.getState() == KeeperState.SyncConnected ||
+ event.getState() == KeeperState.ConnectedReadOnly) {
connected = true;
notifyAll();
clientConnected.countDown();
@@ -257,7 +258,8 @@ public abstract class ClientBase extends
// if there are multiple hostports, just take the first one
HostPort hpobj = parseHostPortList(hp).get(0);
String result = send4LetterWord(hpobj.host, hpobj.port, "stat");
- if (result.startsWith("Zookeeper version:")) {
+ if (result.startsWith("Zookeeper version:") &&
+ !result.contains("READ-ONLY")) {
return true;
}
} catch (IOException e) {
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=1125176&r1=1125175&r2=1125176&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Thu May 19 23:17:48 2011
@@ -45,17 +45,17 @@ public class QuorumBase extends ClientBa
File s1dir, s2dir, s3dir, s4dir, s5dir;
QuorumPeer s1, s2, s3, s4, s5;
- private int port1;
- private int port2;
- private int port3;
- private int port4;
- private int port5;
+ protected int port1;
+ protected int port2;
+ protected int port3;
+ protected int port4;
+ protected int port5;
- private int portLE1;
- private int portLE2;
- private int portLE3;
- private int portLE4;
- private int portLE5;
+ protected int portLE1;
+ protected int portLE2;
+ protected int portLE3;
+ protected int portLE4;
+ protected int portLE5;
@Test
// This just avoids complaints by junit