You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by th...@apache.org on 2013/10/09 22:21:56 UTC
svn commit: r1530781 [1/2] - in /zookeeper/trunk: ./ src/c/include/
src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/cli/
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/o...
Author: thawan
Date: Wed Oct 9 20:21:55 2013
New Revision: 1530781
URL: http://svn.apache.org/r1530781
Log:
ZOOKEEPER-1147. Add support for local sessions (Jay Shrauner, thawan via thawan)
Added:
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java (with props)
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java (with props)
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java (with props)
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java (with props)
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java (with props)
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/c/include/zookeeper.h
zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Oct 9 20:21:55 2013
@@ -16,6 +16,8 @@ NEW FEATURES:
ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer
(Marshall McMullen via michim)
+ ZOOKEEPER-1147. Add support for local sessions (Jay Shrauner, thawan via thawan)
+
BUGFIXES:
ZOOKEEPER-786. Exception in ZooKeeper.toString
Modified: zookeeper/trunk/src/c/include/zookeeper.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper.h?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper.h Wed Oct 9 20:21:55 2013
@@ -116,8 +116,9 @@ enum ZOO_ERRORS {
ZSESSIONMOVED = -118, /*!<session moved to another server, so operation is ignored */
ZNEWCONFIGNOQUORUM = -120, /*!< No quorum of new config is connected and up-to-date with the leader of last commmitted config - try
invoking reconfiguration after new servers are connected and synced */
- ZRECONFIGINPROGRESS = -121 /*!< Reconfiguration requested while another reconfiguration is currently in progress. This is currently
+ ZRECONFIGINPROGRESS = -121, /*!< Reconfiguration requested while another reconfiguration is currently in progress. This is currently
not supported. Please retry. */
+ ZEPHEMERALONLOCALSESSION = -122 /*!< Attempt to create ephemeral node on a local session */
};
#ifdef __cplusplus
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java Wed Oct 9 20:21:55 2013
@@ -135,7 +135,8 @@ public abstract class KeeperException ex
return new SessionMovedException();
case NOTREADONLY:
return new NotReadOnlyException();
-
+ case EPHEMERALONLOCALSESSION:
+ return new EphemeralOnLocalSessionException();
case OK:
default:
throw new IllegalArgumentException("Invalid exception code");
@@ -222,6 +223,9 @@ public abstract class KeeperException ex
@Deprecated
public static final int BadArguments = -8;
+ @Deprecated
+ public static final int UnknownSession= -12;
+
/**
* @deprecated deprecated in 3.1.0, use {@link Code#APIERROR} instead
*/
@@ -291,6 +295,9 @@ public abstract class KeeperException ex
@Deprecated
public static final int ReconfigInProgress= -121;
+ @Deprecated
+ public static final int EphemeralOnLocalSession = -122;
+
}
/** Codes which represent the various KeeperException
@@ -328,6 +335,8 @@ public abstract class KeeperException ex
NEWCONFIGNOQUORUM (NewConfigNoQuorum),
/** Another reconfiguration is in progress -- concurrent reconfigs not supported (yet) */
RECONFIGINPROGRESS (ReconfigInProgress),
+ /** Unknown session (internal server use only) */
+ UNKNOWNSESSION (UnknownSession),
/** API errors.
* This is never thrown by the server, it shouldn't be used other than
@@ -361,7 +370,9 @@ public abstract class KeeperException ex
/** Session moved to another server, so operation is ignored */
SESSIONMOVED (-118),
/** State-changing request is passed to read-only server */
- NOTREADONLY (-119);
+ NOTREADONLY (-119),
+ /** Attempt to create ephemeral node on a local session */
+ EPHEMERALONLOCALSESSION (EphemeralOnLocalSession);
private static final Map<Integer,Code> lookup
= new HashMap<Integer,Code>();
@@ -442,6 +453,8 @@ public abstract class KeeperException ex
return "Session moved";
case NOTREADONLY:
return "Not a read-only call";
+ case EPHEMERALONLOCALSESSION:
+ return "Ephemeral node on local session";
default:
return "Unknown error " + code;
}
@@ -502,7 +515,7 @@ public abstract class KeeperException ex
* If this exception was thrown by a multi-request then the (partial) results
* and error codes can be retrieved using this getter.
* @return A copy of the list of results from the operations in the multi-request.
- *
+ *
* @since 3.4.0
*
*/
@@ -701,7 +714,16 @@ public abstract class KeeperException ex
super(Code.SESSIONEXPIRED);
}
}
-
+
+ /**
+ * @see Code#UNKNOWNSESSION
+ */
+ public static class UnknownSessionException extends KeeperException {
+ public UnknownSessionException() {
+ super(Code.UNKNOWNSESSION);
+ }
+ }
+
/**
* @see Code#SESSIONMOVED
*/
@@ -721,6 +743,15 @@ public abstract class KeeperException ex
}
/**
+ * @see Code#EPHEMERALONLOCALSESSION
+ */
+ public static class EphemeralOnLocalSessionException extends KeeperException {
+ public EphemeralOnLocalSessionException() {
+ super(Code.EPHEMERALONLOCALSESSION);
+ }
+ }
+
+ /**
* @see Code#SYSTEMERROR
*/
public static class SystemErrorException extends KeeperException {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java Wed Oct 9 20:21:55 2013
@@ -32,16 +32,16 @@ public class CreateCommand extends CliCo
private static Options options = new Options();
private String[] args;
private CommandLine cl;
-
+
{
options.addOption(new Option("e", false, "ephemeral"));
options.addOption(new Option("s", false, "sequential"));
}
-
+
public CreateCommand() {
super("create", "[-s] [-e] path [data] [acl]");
}
-
+
@Override
public CliCommand parse(String[] cmdArgs) throws ParseException {
@@ -54,7 +54,7 @@ public class CreateCommand extends CliCo
return this;
}
-
+
@Override
public boolean exec() throws KeeperException, InterruptedException {
CreateMode flags = CreateMode.PERSISTENT;
@@ -74,8 +74,13 @@ public class CreateCommand extends CliCo
if (args.length > 3) {
acl = AclParser.parse(args[3]);
}
- String newPath = zk.create(path, data, acl, flags);
- err.println("Created " + newPath);
+ try {
+ String newPath = zk.create(path, data, acl, flags);
+ err.println("Created " + newPath);
+ } catch(KeeperException.EphemeralOnLocalSessionException e) {
+ err.println("Unable to create ephemeral node on a local session");
+ return false;
+ }
return true;
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Wed Oct 9 20:21:55 2013
@@ -96,6 +96,11 @@ public class FinalRequestProcessor imple
}
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) {
+ // Need to process local session requests
+ rc = zks.processTxn(request);
+
+ // request.hdr is set for write requests, which are the only ones
+ // that add to outstandingChanges.
if (request.getHdr() != null) {
TxnHeader hdr = request.getHdr();
Record txn = request.getTxn();
@@ -111,16 +116,15 @@ public class FinalRequestProcessor imple
zks.outstandingChangesForPath.remove(cr.path);
}
}
-
- rc = zks.processTxn(hdr, txn);
}
+
// do not add non quorum packets to the queue.
- if (Request.isQuorum(request.type)) {
+ if (request.isQuorum()) {
zks.getZKDatabase().addCommittedProposal(request);
}
}
- if (request.getHdr() != null && request.getHdr().getType() == OpCode.closeSession) {
+ if (request.type == OpCode.closeSession) {
ServerCnxnFactory scxn = zks.getServerCnxnFactory();
// this might be possible since
// we might just be playing diffs from the leader
@@ -145,8 +149,19 @@ public class FinalRequestProcessor imple
Record rsp = null;
try {
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
- throw KeeperException.create(KeeperException.Code.get((
- (ErrorTxn) request.getTxn()).getErr()));
+ /*
+ * When local session upgrading is disabled, the leader will
+ * reject the ephemeral node creation due to session expiry.
+ * However, if this is the follower that issued the request,
+ * it will have the correct error code, so we should use that
+ * and report it to the user.
+ */
+ if (request.getException() != null) {
+ throw request.getException();
+ } else {
+ throw KeeperException.create(KeeperException.Code
+ .get(((ErrorTxn) request.getTxn()).getErr()));
+ }
}
KeeperException ke = request.getException();
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Wed Oct 9 20:21:55 2013
@@ -349,11 +349,12 @@ public class PrepRequestProcessor extend
switch (type) {
case OpCode.create: {
- zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
CreateRequest createRequest = (CreateRequest)record;
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
}
+ CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+ validateCreateRequest(createMode, request);
String path = createRequest.getPath();
String parentPath = validatePathForCreate(path, request.sessionId);
@@ -365,7 +366,6 @@ public class PrepRequestProcessor extend
checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
int parentCVersion = parentRecord.stat.getCversion();
- CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
if (createMode.isSequential()) {
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
@@ -402,11 +402,12 @@ public class PrepRequestProcessor extend
break;
}
case OpCode.create2: {
- zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
Create2Request createRequest = (Create2Request)record;
if (deserialize) {
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
}
+ CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+ validateCreateRequest(createMode, request);
String path = createRequest.getPath();
String parentPath = validatePathForCreate(path, request.sessionId);
@@ -418,7 +419,6 @@ public class PrepRequestProcessor extend
checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
int parentCVersion = parentRecord.stat.getCversion();
- CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
if (createMode.isSequential()) {
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
}
@@ -624,7 +624,13 @@ public class PrepRequestProcessor extend
int to = request.request.getInt();
request.setTxn(new CreateSessionTxn(to));
request.request.rewind();
- zks.sessionTracker.addSession(request.sessionId, to);
+ if (request.isLocalSession()) {
+ // This will add to local session tracker if it is enabled
+ zks.sessionTracker.addSession(request.sessionId, to);
+ } else {
+ // Explicitly add to global session if the flag is not set
+ zks.sessionTracker.addGlobalSession(request.sessionId, to);
+ }
zks.setOwner(request.sessionId, request.getOwner());
break;
case OpCode.closeSession:
@@ -791,7 +797,10 @@ public class PrepRequestProcessor extend
//create/close session don't require request record
case OpCode.createSession:
case OpCode.closeSession:
- pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
+ if (!request.isLocalSession()) {
+ pRequest2Txn(request.type, zks.getNextZxid(), request,
+ null, true);
+ }
break;
//All the rest don't need to create a Txn - just verify session
@@ -855,7 +864,22 @@ public class PrepRequestProcessor extend
}
return retval;
}
-
+
+ private void validateCreateRequest(CreateMode createMode, Request request)
+ throws KeeperException {
+ if (createMode.isEphemeral()) {
+ // Exception is set when local session failed to upgrade
+ // so we just need to report the error
+ if (request.getException() != null) {
+ throw request.getException();
+ }
+ zks.sessionTracker.checkGlobalSession(request.sessionId,
+ request.getOwner());
+ } else {
+ zks.sessionTracker.checkSession(request.sessionId,
+ request.getOwner());
+ }
+ }
/**
* This method checks out the acl making sure it isn't null or empty,
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Wed Oct 9 20:21:55 2013
@@ -83,6 +83,19 @@ public class Request {
public QuorumVerifier qv = null;
+ /**
+ * If this is a create or close request for a local-only session.
+ */
+ private boolean isLocalSession = false;
+
+ public boolean isLocalSession() {
+ return isLocalSession;
+ }
+
+ public void setLocalSession(boolean isLocalSession) {
+ this.isLocalSession = isLocalSession;
+ }
+
public Object getOwner() {
return owner;
}
@@ -119,43 +132,41 @@ public class Request {
switch (type) {
case OpCode.notification:
return false;
+ case OpCode.check:
+ case OpCode.closeSession:
case OpCode.create:
case OpCode.create2:
- case OpCode.delete:
case OpCode.createSession:
+ case OpCode.delete:
case OpCode.exists:
- case OpCode.getData:
- case OpCode.check:
- case OpCode.multi:
- case OpCode.setData:
- case OpCode.sync:
case OpCode.getACL:
- case OpCode.setACL:
case OpCode.getChildren:
case OpCode.getChildren2:
+ case OpCode.getData:
+ case OpCode.multi:
case OpCode.ping:
- case OpCode.closeSession:
- case OpCode.setWatches:
case OpCode.reconfig:
+ case OpCode.setACL:
+ case OpCode.setData:
+ case OpCode.setWatches:
+ case OpCode.sync:
return true;
default:
return false;
}
}
- static boolean isQuorum(int type) {
- switch (type) {
+ public boolean isQuorum() {
+ switch (this.type) {
case OpCode.exists:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.getChildren2:
case OpCode.getData:
return false;
- case OpCode.error:
- case OpCode.closeSession:
case OpCode.create:
case OpCode.create2:
- case OpCode.createSession:
+ case OpCode.error:
case OpCode.delete:
case OpCode.setACL:
case OpCode.setData:
@@ -163,6 +174,9 @@ public class Request {
case OpCode.multi:
case OpCode.reconfig:
return true;
+ case OpCode.closeSession:
+ case OpCode.createSession:
+ return !this.isLocalSession;
default:
return false;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java Wed Oct 9 20:21:55 2013
@@ -44,7 +44,22 @@ public interface SessionTracker {
long createSession(int sessionTimeout);
- void addSession(long id, int to);
+ /**
+ * Add a global session to those being tracked.
+ * @param id sessionId
+ * @param to sessionTimeout
+ * @return whether the session was newly added (if false, already existed)
+ */
+ boolean addGlobalSession(long id, int to);
+
+ /**
+ * Add a session to those being tracked. The session is added as a local
+ * session if they are enabled, otherwise as global.
+ * @param id sessionId
+ * @param to sessionTimeout
+ * @return whether the session was newly added (if false, already existed)
+ */
+ boolean addSession(long id, int to);
/**
* @param sessionId
@@ -60,7 +75,7 @@ public interface SessionTracker {
void setSessionClosing(long sessionId);
/**
- *
+ *
*/
void shutdown();
@@ -69,7 +84,39 @@ public interface SessionTracker {
*/
void removeSession(long sessionId);
- void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, SessionMovedException;
+ /**
+ * @param sessionId
+ * @return whether or not the SessionTracker is aware of this session
+ */
+ boolean isTrackingSession(long sessionId);
+
+ /**
+ * Checks whether the SessionTracker is aware of this session, the session
+ * is still active, and the owner matches. If the owner wasn't previously
+ * set, this sets the owner of the session.
+ *
+ * UnknownSessionException should never be thrown to the client. It is
+ * only used internally to deal with a possible local session from
+ * another machine.
+ *
+ * @param sessionId
+ * @param owner
+ */
+ public void checkSession(long sessionId, Object owner)
+ throws KeeperException.SessionExpiredException,
+ KeeperException.SessionMovedException,
+ KeeperException.UnknownSessionException;
+
+ /**
+ * Strictly check whether a given session is a global session
+ * @param sessionId
+ * @param owner
+ * @throws KeeperException.SessionExpiredException
+ * @throws KeeperException.SessionMovedException
+ */
+ public void checkGlobalSession(long sessionId, Object owner)
+ throws KeeperException.SessionExpiredException,
+ KeeperException.SessionMovedException;
void setOwner(long id, Object owner) throws SessionExpiredException;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java Wed Oct 9 20:21:55 2013
@@ -20,18 +20,15 @@ package org.apache.zookeeper.server;
import java.io.PrintWriter;
import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This is a full featured SessionTracker. It tracks session in grouped by tick
@@ -47,8 +44,7 @@ public class SessionTrackerImpl extends
private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
- private final ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
- private final long serverId;
+ private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
private final AtomicLong nextSessionId = new AtomicLong();
public static class SessionImpl implements Session {
@@ -73,6 +69,10 @@ public class SessionTrackerImpl extends
}
}
+ /**
+ * Generates an initial sessionId. High order byte is serverId, next
+ * 5 bytes are from timestamp, and low order 2 bytes are 0s.
+ */
public static long initializeNextSession(long id) {
long nextSid = 0;
nextSid = (System.currentTimeMillis() << 24) >> 8;
@@ -83,15 +83,14 @@ public class SessionTrackerImpl extends
private final SessionExpirer expirer;
public SessionTrackerImpl(SessionExpirer expirer,
- ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
- long sid)
+ ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime,
+ long serverId)
{
super("SessionTracker");
this.expirer = expirer;
this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
this.sessionsWithTimeout = sessionsWithTimeout;
- this.serverId = sid;
- this.nextSessionId.set(initializeNextSession(sid));
+ this.nextSessionId.set(initializeNextSession(serverId));
for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
addSession(e.getKey(), e.getValue());
}
@@ -103,7 +102,7 @@ public class SessionTrackerImpl extends
pwriter.print("Session ");
sessionExpiryQueue.dump(pwriter);
}
-
+
@Override
public String toString() {
StringWriter sw = new StringWriter();
@@ -151,6 +150,10 @@ public class SessionTrackerImpl extends
return true;
}
+ public int getSessionTimeout(long sessionId) {
+ return sessionsWithTimeout.get(sessionId);
+ }
+
synchronized public void setSessionClosing(long sessionId) {
if (LOG.isTraceEnabled()) {
LOG.trace("Session closing: 0x" + Long.toHexString(sessionId));
@@ -192,31 +195,49 @@ public class SessionTrackerImpl extends
return sessionId;
}
- synchronized public void addSession(long id, int sessionTimeout) {
+ public boolean addGlobalSession(long id, int sessionTimeout) {
+ return addSession(id, sessionTimeout);
+ }
+
+ public synchronized boolean addSession(long id, int sessionTimeout) {
+ boolean added = false;
+
sessionsWithTimeout.put(id, sessionTimeout);
if (sessionsById.get(id) == null) {
SessionImpl s = new SessionImpl(id, sessionTimeout);
sessionsById.put(id, s);
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
- "SessionTrackerImpl --- Adding session 0x"
- + Long.toHexString(id) + " " + sessionTimeout);
- }
- } else {
- if (LOG.isTraceEnabled()) {
- ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
- "SessionTrackerImpl --- Existing session 0x"
- + Long.toHexString(id) + " " + sessionTimeout);
- }
+ added = true;
+ LOG.debug("Adding session 0x" + Long.toHexString(id));
+ }
+ if (LOG.isTraceEnabled()) {
+ String actionStr = added ? "Adding" : "Existing";
+ ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
+ "SessionTrackerImpl --- " + actionStr + " session 0x"
+ + Long.toHexString(id) + " " + sessionTimeout);
}
touchSession(id, sessionTimeout);
+ return added;
+ }
+
+ public boolean isTrackingSession(long sessionId) {
+ return sessionsById.containsKey(sessionId);
}
- synchronized public void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
+ public synchronized void checkSession(long sessionId, Object owner)
+ throws KeeperException.SessionExpiredException,
+ KeeperException.SessionMovedException,
+ KeeperException.UnknownSessionException {
+ LOG.debug("Checking session 0x" + Long.toHexString(sessionId));
SessionImpl session = sessionsById.get(sessionId);
- if (session == null || session.isClosing()) {
+
+ if (session == null) {
+ throw new KeeperException.UnknownSessionException();
+ }
+
+ if (session.isClosing()) {
throw new KeeperException.SessionExpiredException();
}
+
if (session.owner == null) {
session.owner = owner;
} else if (session.owner != owner) {
@@ -231,4 +252,14 @@ public class SessionTrackerImpl extends
}
session.owner = owner;
}
+
+ public void checkGlobalSession(long sessionId, Object owner)
+ throws KeeperException.SessionExpiredException,
+ KeeperException.SessionMovedException {
+ try {
+ checkSession(sessionId, owner);
+ } catch (KeeperException.UnknownSessionException e) {
+ throw new KeeperException.SessionExpiredException();
+ }
+ }
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java Wed Oct 9 20:21:55 2013
@@ -29,7 +29,7 @@ import org.apache.zookeeper.ZooDefs.OpCo
public class TraceFormatter {
- static String op2String(int op) {
+ public static String op2String(int op) {
switch (op) {
case OpCode.notification:
return "notification";
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Wed Oct 9 20:21:55 2013
@@ -287,6 +287,10 @@ public class ZooKeeperServer implements
return hzxid.get();
}
+ public SessionTracker getSessionTracker() {
+ return sessionTracker;
+ }
+
long getNextZxid() {
return hzxid.incrementAndGet();
}
@@ -300,7 +304,9 @@ public class ZooKeeperServer implements
}
private void close(long sessionId) {
- submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
+ Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
+ setLocalSessionFlag(si);
+ submitRequest(si);
}
public void closeSession(long sessionId) {
@@ -409,7 +415,7 @@ public class ZooKeeperServer implements
sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
tickTime, 1);
}
-
+
protected void startSessionTracker() {
((SessionTrackerImpl)sessionTracker).start();
}
@@ -518,13 +524,19 @@ public class ZooKeeperServer implements
}
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
+ if (passwd == null) {
+ // Possible since it's just deserialized from a packet on the wire.
+ passwd = new byte[0];
+ }
long sessionId = sessionTracker.createSession(timeout);
Random r = new Random(sessionId ^ superSecret);
r.nextBytes(passwd);
ByteBuffer to = ByteBuffer.allocate(4);
to.putInt(timeout);
cnxn.setSessionId(sessionId);
- submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
+ Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
+ setLocalSessionFlag(si);
+ submitRequest(si);
return sessionId;
}
@@ -554,6 +566,8 @@ public class ZooKeeperServer implements
if (checkPasswd(sessionId, passwd)) {
revalidateSession(cnxn, sessionId, sessionTimeout);
} else {
+ LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress()
+ + " for session 0x" + Long.toHexString(sessionId));
finishSessionInit(cnxn, false);
}
}
@@ -618,15 +632,13 @@ public class ZooKeeperServer implements
}
/**
- * @param cnxn
- * @param sessionId
- * @param xid
- * @param bb
+ * If the underlying ZooKeeper server supports local sessions, this method
+ * will set isLocalSession to true if a request is associated with
+ * a local session.
+ *
+ * @param si
*/
- private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
- int xid, ByteBuffer bb, List<Id> authInfo) {
- Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
- submitRequest(si);
+ protected void setLocalSessionFlag(Request si) {
}
public void submitRequest(Request si) {
@@ -919,6 +931,9 @@ public class ZooKeeperServer implements
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
+ // Always treat packet from the client as a possible
+ // local request.
+ setLocalSessionFlag(si);
submitRequest(si);
}
}
@@ -966,17 +981,36 @@ public class ZooKeeperServer implements
// wrap SASL response token to client inside a Response object.
return new SetSASLResponse(responseToken);
}
-
+
+ // entry point for quorum/Learner.java
public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+ return processTxn(null, hdr, txn);
+ }
+
+ // entry point for FinalRequestProcessor.java
+ public ProcessTxnResult processTxn(Request request) {
+ return processTxn(request, request.getHdr(), request.getTxn());
+ }
+
+ private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
+ Record txn) {
ProcessTxnResult rc;
- int opCode = hdr.getType();
- long sessionId = hdr.getClientId();
- rc = getZKDatabase().processTxn(hdr, txn);
+ int opCode = request != null ? request.type : hdr.getType();
+ long sessionId = request != null ? request.sessionId : hdr.getClientId();
+ if (hdr != null) {
+ rc = getZKDatabase().processTxn(hdr, txn);
+ } else {
+ rc = new ProcessTxnResult();
+ }
if (opCode == OpCode.createSession) {
- if (txn instanceof CreateSessionTxn) {
+ if (hdr != null && txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) txn;
- sessionTracker.addSession(sessionId, cst
- .getTimeOut());
+ sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
+ } else if (request != null && request.isLocalSession()) {
+ request.request.rewind();
+ int timeout = request.request.getInt();
+ request.request.rewind();
+ sessionTracker.addSession(request.sessionId, timeout);
} else {
LOG.warn("*****>>>>> Got "
+ txn.getClass() + " "
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Wed Oct 9 20:21:55 2013
@@ -134,11 +134,12 @@ public class CommitProcessor extends Thr
case OpCode.reconfig:
case OpCode.multi:
case OpCode.setACL:
- case OpCode.createSession:
- case OpCode.closeSession:
return true;
case OpCode.sync:
- return matchSyncs;
+ return matchSyncs;
+ case OpCode.createSession:
+ case OpCode.closeSession:
+ return !request.isLocalSession();
default:
return false;
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Wed Oct 9 20:21:55 2013
@@ -18,15 +18,17 @@
package org.apache.zookeeper.server.quorum;
+import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This RequestProcessor forwards any requests that modify the state of the
@@ -83,12 +85,17 @@ public class FollowerRequestProcessor ex
case OpCode.setData:
case OpCode.reconfig:
case OpCode.setACL:
- case OpCode.createSession:
- case OpCode.closeSession:
case OpCode.multi:
case OpCode.check:
zks.getFollower().request(request);
break;
+ case OpCode.createSession:
+ case OpCode.closeSession:
+ // Don't forward local sessions to the leader.
+ if (!request.isLocalSession()) {
+ zks.getFollower().request(request);
+ }
+ break;
}
}
} catch (Exception e) {
@@ -99,6 +106,25 @@ public class FollowerRequestProcessor ex
public void processRequest(Request request) {
if (!finished) {
+ // Before sending the request, check if the request requires a
+ // global session and what we have is a local session. If so do
+ // an upgrade.
+ Request upgradeRequest = null;
+ try {
+ upgradeRequest = zks.checkUpgradeSession(request);
+ } catch (KeeperException ke) {
+ if (request.getHdr() != null) {
+ request.getHdr().setType(OpCode.error);
+ request.setTxn(new ErrorTxn(ke.code().intValue()));
+ }
+ request.setException(ke);
+ LOG.info("Error creating upgrade request", ke);
+ } catch (IOException ie) {
+ LOG.error("Unexpected error in upgrade", ie);
+ }
+ if (upgradeRequest != null) {
+ queuedRequests.add(upgradeRequest);
+ }
queuedRequests.add(request);
}
}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for performing local session upgrade. Only requests submitted
+ * directly to the leader should go through this processor.
+ */
+public class LeaderRequestProcessor implements RequestProcessor {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(LeaderRequestProcessor.class);
+
+ private final LeaderZooKeeperServer lzks;
+
+ private final RequestProcessor nextProcessor;
+
+ public LeaderRequestProcessor(LeaderZooKeeperServer zks,
+ RequestProcessor nextProcessor) {
+ this.lzks = zks;
+ this.nextProcessor = nextProcessor;
+ }
+
+ @Override
+ public void processRequest(Request request)
+ throws RequestProcessorException {
+ // Check if this is a local session and we are trying to create
+ // an ephemeral node, in which case we upgrade the session
+ Request upgradeRequest = null;
+ try {
+ upgradeRequest = lzks.checkUpgradeSession(request);
+ } catch (KeeperException ke) {
+ if (request.getHdr() != null) {
+ LOG.debug("Updating header");
+ request.getHdr().setType(OpCode.error);
+ request.setTxn(new ErrorTxn(ke.code().intValue()));
+ }
+ request.setException(ke);
+ LOG.info("Error creating upgrade request " + ke.getMessage());
+ } catch (IOException ie) {
+ LOG.error("Unexpected error in upgrade", ie);
+ }
+ if (upgradeRequest != null) {
+ nextProcessor.processRequest(upgradeRequest);
+ }
+
+ nextProcessor.processRequest(request);
+ }
+
+ @Override
+ public void shutdown() {
+ LOG.info("Shutting down");
+ nextProcessor.shutdown();
+ }
+
+}
Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.io.PrintWriter;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.KeeperException.SessionMovedException;
+import org.apache.zookeeper.KeeperException.UnknownSessionException;
+import org.apache.zookeeper.server.SessionTrackerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The leader session tracker tracks local and global sessions on the leader.
+ */
+public class LeaderSessionTracker extends UpgradeableSessionTracker {
+ private static final Logger LOG = LoggerFactory.getLogger(LeaderSessionTracker.class);
+
+ private final boolean localSessionsEnabled;
+ private final SessionTrackerImpl globalSessionTracker;
+
+ /**
+ * Server id of the leader
+ */
+ private final long serverId;
+
+ public LeaderSessionTracker(SessionExpirer expirer,
+ ConcurrentMap<Long, Integer> sessionsWithTimeouts,
+ int tickTime, long id, boolean localSessionsEnabled) {
+
+ this.globalSessionTracker = new SessionTrackerImpl(
+ expirer, sessionsWithTimeouts, tickTime, id);
+
+ this.localSessionsEnabled = localSessionsEnabled;
+ if (this.localSessionsEnabled) {
+ createLocalSessionTracker(expirer, tickTime, id);
+ }
+ serverId = id;
+ }
+
+ public void removeSession(long sessionId) {
+ if (localSessionTracker != null) {
+ localSessionTracker.removeSession(sessionId);
+ }
+ globalSessionTracker.removeSession(sessionId);
+ }
+
+ public void start() {
+ globalSessionTracker.start();
+ if (localSessionTracker != null) {
+ localSessionTracker.start();
+ }
+ }
+
+ public void shutdown() {
+ if (localSessionTracker != null) {
+ localSessionTracker.shutdown();
+ }
+ globalSessionTracker.shutdown();
+ }
+
+ public boolean isGlobalSession(long sessionId) {
+ return globalSessionTracker.isTrackingSession(sessionId);
+ }
+
+ public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+ boolean added =
+ globalSessionTracker.addSession(sessionId, sessionTimeout);
+ if (localSessionsEnabled && added) {
+ // Only do extra logging so we know what kind of session this is
+ // if we're supporting both kinds of sessions
+ LOG.info("Adding global session 0x" + Long.toHexString(sessionId));
+ }
+ return added;
+ }
+
+ public boolean addSession(long sessionId, int sessionTimeout) {
+ boolean added;
+ if (localSessionsEnabled && !isGlobalSession(sessionId)) {
+ added = localSessionTracker.addSession(sessionId, sessionTimeout);
+ // Check for race condition with session upgrading
+ if (isGlobalSession(sessionId)) {
+ added = false;
+ localSessionTracker.removeSession(sessionId);
+ } else if (added) {
+ LOG.info("Adding local session 0x" + Long.toHexString(sessionId));
+ }
+ } else {
+ added = addGlobalSession(sessionId, sessionTimeout);
+ }
+ return added;
+ }
+
+ public boolean touchSession(long sessionId, int sessionTimeout) {
+ if (localSessionTracker != null &&
+ localSessionTracker.touchSession(sessionId, sessionTimeout)) {
+ return true;
+ }
+ return globalSessionTracker.touchSession(sessionId, sessionTimeout);
+ }
+
+ public long createSession(int sessionTimeout) {
+ if (localSessionsEnabled) {
+ return localSessionTracker.createSession(sessionTimeout);
+ }
+ return globalSessionTracker.createSession(sessionTimeout);
+ }
+
+ // Returns the serverId (high order byte) of the sessionId; >>> keeps ids >= 128 from sign-extending
+ public static long getServerIdFromSessionId(long sessionId) {
+ return sessionId >>> 56;
+ }
+
+ public void checkSession(long sessionId, Object owner)
+ throws SessionExpiredException, SessionMovedException,
+ UnknownSessionException {
+ if (localSessionTracker != null) {
+ try {
+ localSessionTracker.checkSession(sessionId, owner);
+ // A session can both be a local and global session during
+ // upgrade
+ if (!isGlobalSession(sessionId)) {
+ return;
+ }
+ } catch(UnknownSessionException e) {
+ // Ignore. We'll check instead whether it's a global session
+ }
+ }
+ try {
+ globalSessionTracker.checkSession(sessionId, owner);
+ // if we can get here, it is a valid global session
+ return;
+ } catch (UnknownSessionException e) {
+ // Ignore. This may be local session from other servers.
+ }
+
+ /*
+ * If local sessions are not enabled, or the session used to be one of
+ * our own local sessions, throw SessionExpiredException.
+ */
+ if (!localSessionsEnabled
+ || (getServerIdFromSessionId(sessionId) == serverId)) {
+ throw new SessionExpiredException();
+ }
+ }
+
+ public void checkGlobalSession(long sessionId, Object owner)
+ throws SessionExpiredException, SessionMovedException {
+ try {
+ globalSessionTracker.checkSession(sessionId, owner);
+ } catch (UnknownSessionException e) {
+ // For global session, if we don't know it, it is already expired
+ throw new SessionExpiredException();
+ }
+ }
+
+ public void setOwner(long sessionId, Object owner)
+ throws SessionExpiredException {
+ if (localSessionTracker != null) {
+ try {
+ localSessionTracker.setOwner(sessionId, owner);
+ return;
+ } catch(SessionExpiredException e) {
+ // Ignore. We'll check instead whether it's a global session
+ }
+ }
+ globalSessionTracker.setOwner(sessionId, owner);
+ }
+
+ public void dumpSessions(PrintWriter pwriter) {
+ if (localSessionTracker != null) {
+ pwriter.print("Local ");
+ localSessionTracker.dumpSessions(pwriter);
+ pwriter.print("Global ");
+ }
+ globalSessionTracker.dumpSessions(pwriter);
+ }
+
+ public void setSessionClosing(long sessionId) {
+ // call is no-op if session isn't tracked so safe to call both
+ if (localSessionTracker != null) {
+ localSessionTracker.setSessionClosing(sessionId);
+ }
+ globalSessionTracker.setSessionClosing(sessionId);
+ }
+}
Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Wed Oct 9 20:21:55 2013
@@ -25,9 +25,9 @@ import org.apache.zookeeper.jmx.MBeanReg
import org.apache.zookeeper.server.DataTreeBean;
import org.apache.zookeeper.server.FinalRequestProcessor;
import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.SessionTrackerImpl;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -39,8 +39,11 @@ import org.apache.zookeeper.server.persi
* FinalRequestProcessor
*/
public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
+
CommitProcessor commitProcessor;
+ PrepRequestProcessor prepRequestProcessor;
+
/**
* @param port
* @param dataDir
@@ -64,8 +67,9 @@ public class LeaderZooKeeperServer exten
ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor);
proposalProcessor.initialize();
- firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
- ((PrepRequestProcessor)firstProcessor).start();
+ prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
+ prepRequestProcessor.start();
+ firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
}
@Override
@@ -75,20 +79,45 @@ public class LeaderZooKeeperServer exten
@Override
public void createSessionTracker() {
- sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
- tickTime, self.getId());
- }
-
- @Override
- protected void startSessionTracker() {
- ((SessionTrackerImpl)sessionTracker).start();
+ sessionTracker = new LeaderSessionTracker(
+ this, getZKDatabase().getSessionWithTimeOuts(),
+ tickTime, self.getId(), self.areLocalSessionsEnabled());
}
-
public boolean touch(long sess, int to) {
return sessionTracker.touchSession(sess, to);
}
+ public boolean checkIfValidGlobalSession(long sess, int to) {
+ if (self.areLocalSessionsEnabled() &&
+ !upgradeableSessionTracker.isGlobalSession(sess)) {
+ return false;
+ }
+ return sessionTracker.touchSession(sess, to);
+ }
+
+ /**
+ * Requests coming from the learner should go directly to
+ * PrepRequestProcessor
+ *
+ * @param request
+ */
+ public void submitLearnerRequest(Request request) {
+ /*
+ * Requests coming from the learner should have gone through
+ * submitRequest() on each server, which already performs some request
+ * validation, so we don't need to do it again.
+ *
+ * Additionally, LearnerHandler should start submitting requests into
+ * the leader's pipeline only when the leader's server is started, so we
+ * can submit the request directly into PrepRequestProcessor.
+ *
+ * This is done so that requests from learners won't go through
+ * LeaderRequestProcessor, which performs the local session upgrade.
+ */
+ prepRequestProcessor.processRequest(request);
+ }
+
@Override
protected void registerJMX() {
// register with JMX
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Wed Oct 9 20:21:55 2013
@@ -30,6 +30,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
@@ -536,8 +537,7 @@ public class Learner {
DataInputStream dis = new DataInputStream(bis);
long sessionId = dis.readLong();
boolean valid = dis.readBoolean();
- ServerCnxn cnxn = pendingRevalidations
- .remove(sessionId);
+ ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
if (cnxn == null) {
LOG.warn("Missing session 0x"
+ Long.toHexString(sessionId)
@@ -557,8 +557,7 @@ public class Learner {
// Send back the ping with our session data
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
- HashMap<Long, Integer> touchTable = zk
- .getTouchSnapshot();
+ Map<Long, Integer> touchTable = zk.getTouchSnapshot();
for (Entry<Long, Integer> entry : touchTable.entrySet()) {
dos.writeLong(entry.getKey());
dos.writeInt(entry.getValue());
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Wed Oct 9 20:21:55 2013
@@ -525,7 +525,7 @@ public class LearnerHandler extends Thre
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutputStream dos = new DataOutputStream(bos);
dos.writeLong(id);
- boolean valid = leader.zk.touch(id, to);
+ boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
if (valid) {
try {
//set the session owner
@@ -559,7 +559,7 @@ public class LearnerHandler extends Thre
si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
}
si.setOwner(this);
- leader.zk.submitRequest(si);
+ leader.zk.submitLearnerRequest(si);
break;
default:
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java Wed Oct 9 20:21:55 2013
@@ -15,82 +15,202 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.zookeeper.server.quorum;
import java.io.PrintWriter;
-import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.zookeeper.server.SessionTracker;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.KeeperException.SessionMovedException;
+import org.apache.zookeeper.KeeperException.UnknownSessionException;
import org.apache.zookeeper.server.SessionTrackerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * This is really just a shell of a SessionTracker that tracks session activity
- * to be forwarded to the Leader using a PING.
+ * The learner session tracker is used by learners (followers and observers) to
+ * track zookeeper sessions which may or may not be echoed to the leader. When
+ * a new session is created it is saved locally in a wrapped
+ * LocalSessionTracker. It can subsequently be upgraded to a global session
+ * as required. If an upgrade is requested the session is removed from local
+ * collections while keeping the same session ID. It is up to the caller to
+ * queue a session creation request for the leader.
+ * A secondary function of the learner session tracker is to remember sessions
+ * which have been touched in this service. This information is passed along
+ * to the leader with a ping.
*/
-public class LearnerSessionTracker implements SessionTracker {
- SessionExpirer expirer;
+public class LearnerSessionTracker extends UpgradeableSessionTracker {
+ private static final Logger LOG = LoggerFactory.getLogger(LearnerSessionTracker.class);
+
+ private final SessionExpirer expirer;
+ // Touch table for the global sessions
+ private final AtomicReference<Map<Long, Integer>> touchTable =
+ new AtomicReference<Map<Long, Integer>>();
+ private final long serverId;
+ private final AtomicLong nextSessionId = new AtomicLong();
- HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
- long serverId = 1;
- long nextSessionId=0;
-
- private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+ private final boolean localSessionsEnabled;
+ private final ConcurrentMap<Long, Integer> globalSessionsWithTimeouts;
public LearnerSessionTracker(SessionExpirer expirer,
- ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, long id) {
+ ConcurrentMap<Long, Integer> sessionsWithTimeouts,
+ int tickTime, long id, boolean localSessionsEnabled) {
this.expirer = expirer;
- this.sessionsWithTimeouts = sessionsWithTimeouts;
+ this.touchTable.set(new ConcurrentHashMap<Long, Integer>());
+ this.globalSessionsWithTimeouts = sessionsWithTimeouts;
this.serverId = id;
- nextSessionId = SessionTrackerImpl.initializeNextSession(this.serverId);
-
- }
+ nextSessionId.set(SessionTrackerImpl.initializeNextSession(serverId));
- synchronized public void removeSession(long sessionId) {
- sessionsWithTimeouts.remove(sessionId);
- touchTable.remove(sessionId);
+ this.localSessionsEnabled = localSessionsEnabled;
+ if (this.localSessionsEnabled) {
+ createLocalSessionTracker(expirer, tickTime, id);
+ }
+ }
+
+ public void removeSession(long sessionId) {
+ if (localSessionTracker != null) {
+ localSessionTracker.removeSession(sessionId);
+ }
+ globalSessionsWithTimeouts.remove(sessionId);
+ touchTable.get().remove(sessionId);
+ }
+
+ public void start() {
+ if (localSessionTracker != null) {
+ localSessionTracker.start();
+ }
}
public void shutdown() {
- }
-
- synchronized public void addSession(long sessionId, int sessionTimeout) {
- sessionsWithTimeouts.put(sessionId, sessionTimeout);
- touchTable.put(sessionId, sessionTimeout);
- }
-
- synchronized public boolean touchSession(long sessionId, int sessionTimeout) {
- touchTable.put(sessionId, sessionTimeout);
+ if (localSessionTracker != null) {
+ localSessionTracker.shutdown();
+ }
+ }
+
+ public boolean isGlobalSession(long sessionId) {
+ return globalSessionsWithTimeouts.containsKey(sessionId);
+ }
+
+ public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+ boolean added =
+ globalSessionsWithTimeouts.put(sessionId, sessionTimeout) == null;
+ if (localSessionsEnabled && added) {
+ // Only do extra logging so we know what kind of session this is
+ // if we're supporting both kinds of sessions
+ LOG.info("Adding global session 0x" + Long.toHexString(sessionId));
+ }
+ touchTable.get().put(sessionId, sessionTimeout);
+ return added;
+ }
+
+ public boolean addSession(long sessionId, int sessionTimeout) {
+ boolean added;
+ if (localSessionsEnabled && !isGlobalSession(sessionId)) {
+ added = localSessionTracker.addSession(sessionId, sessionTimeout);
+ // Check for race condition with session upgrading
+ if (isGlobalSession(sessionId)) {
+ added = false;
+ localSessionTracker.removeSession(sessionId);
+ } else if (added) {
+ LOG.info("Adding local session 0x"
+ + Long.toHexString(sessionId));
+ }
+ } else {
+ added = addGlobalSession(sessionId, sessionTimeout);
+ }
+ return added;
+ }
+
+ public boolean touchSession(long sessionId, int sessionTimeout) {
+ if (localSessionsEnabled) {
+ if (localSessionTracker.touchSession(sessionId, sessionTimeout)) {
+ return true;
+ }
+ if (!isGlobalSession(sessionId)) {
+ return false;
+ }
+ }
+ touchTable.get().put(sessionId, sessionTimeout);
return true;
}
- synchronized HashMap<Long, Integer> snapshot() {
- HashMap<Long, Integer> oldTouchTable = touchTable;
- touchTable = new HashMap<Long, Integer>();
- return oldTouchTable;
- }
-
-
- synchronized public long createSession(int sessionTimeout) {
- return (nextSessionId++);
+ public Map<Long, Integer> snapshot() {
+ return touchTable.getAndSet(new ConcurrentHashMap<Long, Integer>());
}
- public void checkSession(long sessionId, Object owner) {
- // Nothing to do here. Sessions are checked at the Leader
- }
-
- public void setOwner(long sessionId, Object owner) {
- // Nothing to do here. Sessions are checked at the Leader
+ public long createSession(int sessionTimeout) {
+ if (localSessionsEnabled) {
+ return localSessionTracker.createSession(sessionTimeout);
+ }
+ return nextSessionId.getAndIncrement();
+ }
+
+ public void checkSession(long sessionId, Object owner)
+ throws SessionExpiredException, SessionMovedException {
+ if (localSessionTracker != null) {
+ try {
+ localSessionTracker.checkSession(sessionId, owner);
+ return;
+ } catch (UnknownSessionException e) {
+ // Check whether it's a global session. We can ignore those
+ // because they are handled at the leader, but if not, rethrow.
+ // We check local session status first to avoid race condition
+ // with session upgrading.
+ if (!isGlobalSession(sessionId)) {
+ throw new SessionExpiredException();
+ }
+ }
+ }
+ }
+
+ public void setOwner(long sessionId, Object owner)
+ throws SessionExpiredException {
+ if (localSessionTracker != null) {
+ try {
+ localSessionTracker.setOwner(sessionId, owner);
+ return;
+ } catch (SessionExpiredException e) {
+ // Check whether it's a global session. We can ignore those
+ // because they are handled at the leader, but if not, rethrow.
+ // We check local session status first to avoid race condition
+ // with session upgrading.
+ if (!isGlobalSession(sessionId)) {
+ throw e;
+ }
+ }
+ }
}
public void dumpSessions(PrintWriter pwriter) {
- // the original class didn't have tostring impl, so just
- // dup what we had before
- pwriter.println(toString());
+ if (localSessionTracker != null) {
+ pwriter.print("Local ");
+ localSessionTracker.dumpSessions(pwriter);
+ }
+ pwriter.print("Global Sessions(");
+ pwriter.print(globalSessionsWithTimeouts.size());
+ pwriter.println("):");
+ SortedSet<Long> sessionIds = new TreeSet<Long>(
+ globalSessionsWithTimeouts.keySet());
+ for (long sessionId : sessionIds) {
+ pwriter.print("0x");
+ pwriter.print(Long.toHexString(sessionId));
+ pwriter.print("\t");
+ pwriter.print(globalSessionsWithTimeouts.get(sessionId));
+ pwriter.println("ms");
+ }
}
public void setSessionClosing(long sessionId) {
- // Nothing to do here.
+ // Global sessions handled on the leader; this call is a no-op if
+ // not tracked as a local session so safe to call in both cases.
+ if (localSessionTracker != null) {
+ localSessionTracker.setSessionClosing(sessionId);
+ }
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Wed Oct 9 20:21:55 2013
@@ -18,19 +18,23 @@
package org.apache.zookeeper.server.quorum;
import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Map;
import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.quorum.LearnerSessionTracker;
import org.apache.zookeeper.server.ServerCnxn;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServerBean;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
/**
- * Parent class for all ZooKeeperServers for Learners
+ * Parent class for all ZooKeeperServers for Learners
*/
-public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
+public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
+
public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout,
ZKDatabase zkDb, QuorumPeer self)
@@ -42,47 +46,50 @@ public abstract class LearnerZooKeeperSe
/**
* Abstract method to return the learner associated with this server.
* Since the Learner may change under our feet (when QuorumPeer reassigns
- * it) we can't simply take a reference here. Instead, we need the
- * subclasses to implement this.
+ * it) we can't simply take a reference here. Instead, we need the
+ * subclasses to implement this.
*/
- abstract public Learner getLearner();
-
+ abstract public Learner getLearner();
+
/**
* Returns the current state of the session tracker. This is only currently
* used by a Learner to build a ping response packet.
- *
+ *
*/
- protected HashMap<Long, Integer> getTouchSnapshot() {
+ protected Map<Long, Integer> getTouchSnapshot() {
if (sessionTracker != null) {
return ((LearnerSessionTracker) sessionTracker).snapshot();
}
- return new HashMap<Long, Integer>();
+ Map<Long, Integer> map = Collections.emptyMap();
+ return map;
}
-
+
/**
* Returns the id of the associated QuorumPeer, which will do for a unique
- * id of this server.
+ * id of this server.
*/
@Override
public long getServerId() {
return self.getId();
- }
-
+ }
+
@Override
public void createSessionTracker() {
- sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
- self.getId());
+ sessionTracker = new LearnerSessionTracker(
+ this, getZKDatabase().getSessionWithTimeOuts(),
+ this.tickTime, self.getId(), self.areLocalSessionsEnabled());
}
-
- @Override
- protected void startSessionTracker() {}
-
+
@Override
protected void revalidateSession(ServerCnxn cnxn, long sessionId,
int sessionTimeout) throws IOException {
- getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+ if (upgradeableSessionTracker.isLocalSession(sessionId)) {
+ super.revalidateSession(cnxn, sessionId, sessionTimeout);
+ } else {
+ getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+ }
}
-
+
@Override
protected void registerJMX() {
// register with JMX
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.zookeeper.server.SessionTrackerImpl;
+
+/**
+ * Local session tracker.
+ */
+public class LocalSessionTracker extends SessionTrackerImpl {
+ public LocalSessionTracker(SessionExpirer expirer,
+ ConcurrentMap<Long, Integer> sessionsWithTimeouts,
+ int tickTime, long id) {
+ super(expirer, sessionsWithTimeouts, tickTime, id);
+ }
+
+ public boolean isLocalSession(long sessionId) {
+ return isTrackingSession(sessionId);
+ }
+
+ public boolean isGlobalSession(long sessionId) {
+ return false;
+ }
+
+ public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+ throw new UnsupportedOperationException();
+ }
+}
Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java Wed Oct 9 20:21:55 2013
@@ -18,15 +18,18 @@
package org.apache.zookeeper.server.quorum;
+import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.server.RequestProcessor;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.ErrorTxn;
/**
* This RequestProcessor forwards any requests that modify the state of the
@@ -91,12 +94,17 @@ public class ObserverRequestProcessor ex
case OpCode.setData:
case OpCode.reconfig:
case OpCode.setACL:
- case OpCode.createSession:
- case OpCode.closeSession:
case OpCode.multi:
case OpCode.check:
zks.getObserver().request(request);
break;
+ case OpCode.createSession:
+ case OpCode.closeSession:
+ // Don't forward local sessions to the leader.
+ if (!request.isLocalSession()) {
+ zks.getObserver().request(request);
+ }
+ break;
}
}
} catch (Exception e) {
@@ -110,6 +118,22 @@ public class ObserverRequestProcessor ex
*/
public void processRequest(Request request) {
if (!finished) {
+ Request upgradeRequest = null;
+ try {
+ upgradeRequest = zks.checkUpgradeSession(request);
+ } catch (KeeperException ke) {
+ if (request.getHdr() != null) {
+ request.getHdr().setType(OpCode.error);
+ request.setTxn(new ErrorTxn(ke.code().intValue()));
+ }
+ request.setException(ke);
+ LOG.info("Error creating upgrade request", ke);
+ } catch (IOException ie) {
+ LOG.error("Unexpected error in upgrade", ie);
+ }
+ if (upgradeRequest != null) {
+ queuedRequests.add(upgradeRequest);
+ }
queuedRequests.add(request);
}
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java Wed Oct 9 20:21:55 2013
@@ -68,10 +68,10 @@ public class ProposalRequestProcessor im
* call processRequest on the next processor.
*/
- if(request instanceof LearnerSyncRequest){
+ if (request instanceof LearnerSyncRequest){
zks.getLeader().processSync((LearnerSyncRequest)request);
} else {
- nextProcessor.processRequest(request);
+ nextProcessor.processRequest(request);
if (request.getHdr() != null) {
// We need to sync and get consensus on any transactions
try {
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Oct 9 20:21:55 2013
@@ -381,6 +381,18 @@ public class QuorumPeer extends Thread i
protected int tickTime;
/**
+ * Whether learners in this quorum should create new sessions as local.
+ * False by default to preserve existing behavior.
+ */
+ protected boolean localSessionsEnabled = false;
+
+ /**
+ * Whether learners in this quorum should upgrade local sessions to
+ * global. Only matters if local sessions are enabled.
+ */
+ protected boolean localSessionsUpgradingEnabled = true;
+
+ /**
* Minimum number of milliseconds to allow for session timeout.
* A value of -1 indicates unset, use default.
*/
@@ -1142,6 +1154,28 @@ public class QuorumPeer extends Thread i
return fac.getMaxClientCnxnsPerHost();
}
+ /** Whether local sessions are enabled */
+ public boolean areLocalSessionsEnabled() {
+ return localSessionsEnabled;
+ }
+
+ /** Whether to enable local sessions */
+ public void enableLocalSessions(boolean flag) {
+ LOG.info("Local sessions " + (flag ? "enabled" : "disabled"));
+ localSessionsEnabled = flag;
+ }
+
+ /** Whether local sessions are allowed to upgrade to global sessions */
+ public boolean isLocalSessionsUpgradingEnabled() {
+ return localSessionsUpgradingEnabled;
+ }
+
+ /** Whether to allow local sessions to upgrade to global sessions */
+ public void enableLocalSessionsUpgrading(boolean flag) {
+ LOG.info("Local session upgrading " + (flag ? "enabled" : "disabled"));
+ localSessionsUpgradingEnabled = flag;
+ }
+
/** minimum session timeout in milliseconds */
public int getMinSessionTimeout() {
return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
@@ -1158,7 +1192,7 @@ public class QuorumPeer extends Thread i
return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
}
- /** minimum session timeout in milliseconds */
+ /** maximum session timeout in milliseconds */
public void setMaxSessionTimeout(int max) {
LOG.info("maxSessionTimeout set to " + max);
this.maxSessionTimeout = max;
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Wed Oct 9 20:21:55 2013
@@ -62,6 +62,8 @@ public class QuorumPeerConfig {
protected int minSessionTimeout = -1;
/** defaults to -1 if not set explicitly */
protected int maxSessionTimeout = -1;
+ protected boolean localSessionsEnabled = false;
+ protected boolean localSessionsUpgradingEnabled = false;
protected int initLimit;
protected int syncLimit;
@@ -196,6 +198,10 @@ public class QuorumPeerConfig {
dataLogDir = vff.create(value);
} else if (key.equals("clientPort")) {
clientPort = Integer.parseInt(value);
+ } else if (key.equals("localSessionsEnabled")) {
+ localSessionsEnabled = Boolean.parseBoolean(value);
+ } else if (key.equals("localSessionsUpgradingEnabled")) {
+ localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
} else if (key.equals("clientPortAddress")) {
clientPortAddress = value.trim();
} else if (key.equals("tickTime")) {
@@ -503,12 +509,16 @@ public class QuorumPeerConfig {
public int getMaxClientCnxns() { return maxClientCnxns; }
public int getMinSessionTimeout() { return minSessionTimeout; }
public int getMaxSessionTimeout() { return maxSessionTimeout; }
+ public boolean areLocalSessionsEnabled() { return localSessionsEnabled; }
+ public boolean isLocalSessionsUpgradingEnabled() {
+ return localSessionsUpgradingEnabled;
+ }
public int getInitLimit() { return initLimit; }
public int getSyncLimit() { return syncLimit; }
public int getElectionAlg() { return electionAlg; }
- public int getElectionPort() { return electionPort; }
-
+ public int getElectionPort() { return electionPort; }
+
public int getSnapRetainCount() {
return snapRetainCount;
}
@@ -521,7 +531,7 @@ public class QuorumPeerConfig {
return syncEnabled;
}
- public QuorumVerifier getQuorumVerifier() {
+ public QuorumVerifier getQuorumVerifier() {
return quorumVerifier;
}