You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by th...@apache.org on 2013/10/09 22:21:56 UTC

svn commit: r1530781 [1/2] - in /zookeeper/trunk: ./ src/c/include/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/cli/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/o...

Author: thawan
Date: Wed Oct  9 20:21:55 2013
New Revision: 1530781

URL: http://svn.apache.org/r1530781
Log:
ZOOKEEPER-1147. Add support for local sessions (Jay Shrauner, thawan via thawan)

Added:
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java   (with props)
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java   (with props)
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java   (with props)
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java   (with props)
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java   (with props)
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java   (with props)
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java   (with props)
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java   (with props)
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java   (with props)
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java   (with props)
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/c/include/zookeeper.h
    zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Wed Oct  9 20:21:55 2013
@@ -16,6 +16,8 @@ NEW FEATURES:
   ZOOKEEPER-1400. Allow logging via callback instead of raw FILE pointer
   (Marshall McMullen via michim)
 
+  ZOOKEEPER-1147. Add support for local sessions (Jay Shrauner, thawan via thawan)
+
 BUGFIXES:
 
   ZOOKEEPER-786. Exception in ZooKeeper.toString

Modified: zookeeper/trunk/src/c/include/zookeeper.h
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/c/include/zookeeper.h?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/c/include/zookeeper.h (original)
+++ zookeeper/trunk/src/c/include/zookeeper.h Wed Oct  9 20:21:55 2013
@@ -116,8 +116,9 @@ enum ZOO_ERRORS {
   ZSESSIONMOVED = -118, /*!<session moved to another server, so operation is ignored */
   ZNEWCONFIGNOQUORUM = -120,  /*!< No quorum of new config is connected and up-to-date with the leader of last commmitted config - try
                                  invoking reconfiguration after new servers are connected and synced */
-  ZRECONFIGINPROGRESS = -121  /*!< Reconfiguration requested while another reconfiguration is currently in progress. This is currently
+  ZRECONFIGINPROGRESS = -121,  /*!< Reconfiguration requested while another reconfiguration is currently in progress. This is currently
                                        not supported. Please retry. */
+  ZEPHEMERALONLOCALSESSION = -122 /*!< Attempt to create ephemeral node on a local session */
 };
 
 #ifdef __cplusplus

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/KeeperException.java Wed Oct  9 20:21:55 2013
@@ -135,7 +135,8 @@ public abstract class KeeperException ex
                 return new SessionMovedException();
             case NOTREADONLY:
                 return new NotReadOnlyException();
-            	
+            case EPHEMERALONLOCALSESSION:
+                return new EphemeralOnLocalSessionException();
             case OK:
             default:
                 throw new IllegalArgumentException("Invalid exception code");
@@ -222,6 +223,9 @@ public abstract class KeeperException ex
         @Deprecated
         public static final int BadArguments = -8;
 
+        @Deprecated
+        public static final int UnknownSession= -12;
+
         /**
          * @deprecated deprecated in 3.1.0, use {@link Code#APIERROR} instead
          */
@@ -291,6 +295,9 @@ public abstract class KeeperException ex
         @Deprecated
         public static final int ReconfigInProgress= -121;
         
+        @Deprecated
+        public static final int EphemeralOnLocalSession = -122;
+
     }
 
     /** Codes which represent the various KeeperException
@@ -328,6 +335,8 @@ public abstract class KeeperException ex
         NEWCONFIGNOQUORUM (NewConfigNoQuorum),
         /** Another reconfiguration is in progress -- concurrent reconfigs not supported (yet) */
         RECONFIGINPROGRESS (ReconfigInProgress),
+        /** Unknown session (internal server use only) */
+        UNKNOWNSESSION (UnknownSession),
         
         /** API errors.
          * This is never thrown by the server, it shouldn't be used other than
@@ -361,7 +370,9 @@ public abstract class KeeperException ex
         /** Session moved to another server, so operation is ignored */
         SESSIONMOVED (-118),
         /** State-changing request is passed to read-only server */
-        NOTREADONLY (-119);
+        NOTREADONLY (-119),
+        /** Attempt to create ephemeral node on a local session */
+        EPHEMERALONLOCALSESSION (EphemeralOnLocalSession);
 
         private static final Map<Integer,Code> lookup
             = new HashMap<Integer,Code>();
@@ -442,6 +453,8 @@ public abstract class KeeperException ex
                 return "Session moved";
             case NOTREADONLY:
                 return "Not a read-only call";
+            case EPHEMERALONLOCALSESSION:
+                return "Ephemeral node on local session";
             default:
                 return "Unknown error " + code;
         }
@@ -502,7 +515,7 @@ public abstract class KeeperException ex
      * If this exception was thrown by a multi-request then the (partial) results
      * and error codes can be retrieved using this getter.
      * @return A copy of the list of results from the operations in the multi-request.
-     * 
+     *
      * @since 3.4.0
      *
      */
@@ -701,7 +714,16 @@ public abstract class KeeperException ex
             super(Code.SESSIONEXPIRED);
         }
     }
-    
+
+    /**
+     * @see Code#UNKNOWNSESSION
+     */
+    public static class UnknownSessionException extends KeeperException {
+        public UnknownSessionException() {
+            super(Code.UNKNOWNSESSION);
+        }
+    }
+
     /**
      * @see Code#SESSIONMOVED
      */
@@ -721,6 +743,15 @@ public abstract class KeeperException ex
     }
 
     /**
+     * @see Code#EPHEMERALONLOCALSESSION
+     */
+    public static class EphemeralOnLocalSessionException extends KeeperException {
+        public EphemeralOnLocalSessionException() {
+            super(Code.EPHEMERALONLOCALSESSION);
+        }
+    }
+
+    /**
      * @see Code#SYSTEMERROR
      */
     public static class SystemErrorException extends KeeperException {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/cli/CreateCommand.java Wed Oct  9 20:21:55 2013
@@ -32,16 +32,16 @@ public class CreateCommand extends CliCo
     private static Options options = new Options();
     private String[] args;
     private CommandLine cl;
-    
+
     {
         options.addOption(new Option("e", false, "ephemeral"));
         options.addOption(new Option("s", false, "sequential"));
     }
-    
+
     public CreateCommand() {
         super("create", "[-s] [-e] path [data] [acl]");
     }
-    
+
 
     @Override
     public CliCommand parse(String[] cmdArgs) throws ParseException {
@@ -54,7 +54,7 @@ public class CreateCommand extends CliCo
         return this;
     }
 
-    
+
     @Override
     public boolean exec() throws KeeperException, InterruptedException {
         CreateMode flags = CreateMode.PERSISTENT;
@@ -74,8 +74,13 @@ public class CreateCommand extends CliCo
         if (args.length > 3) {
             acl = AclParser.parse(args[3]);
         }
-        String newPath = zk.create(path, data, acl, flags);
-        err.println("Created " + newPath);
+        try {
+            String newPath = zk.create(path, data, acl, flags);
+            err.println("Created " + newPath);
+        } catch(KeeperException.EphemeralOnLocalSessionException e) {
+            err.println("Unable to create ephemeral node on a local session");
+            return false;
+        }
         return true;
     }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java Wed Oct  9 20:21:55 2013
@@ -96,6 +96,11 @@ public class FinalRequestProcessor imple
         }
         ProcessTxnResult rc = null;
         synchronized (zks.outstandingChanges) {
+            // Need to process local session requests
+            rc = zks.processTxn(request);
+
+            // request.hdr is set for write requests, which are the only ones
+            // that add to outstandingChanges.
             if (request.getHdr() != null) {
                 TxnHeader hdr = request.getHdr();
                 Record txn = request.getTxn();
@@ -111,16 +116,15 @@ public class FinalRequestProcessor imple
                         zks.outstandingChangesForPath.remove(cr.path);
                     }
                 }
-
-                rc = zks.processTxn(hdr, txn);
             }
+
             // do not add non quorum packets to the queue.
-            if (Request.isQuorum(request.type)) {
+            if (request.isQuorum()) {
                 zks.getZKDatabase().addCommittedProposal(request);
             }
         }
 
-        if (request.getHdr() != null && request.getHdr().getType() == OpCode.closeSession) {
+        if (request.type == OpCode.closeSession) {
             ServerCnxnFactory scxn = zks.getServerCnxnFactory();
             // this might be possible since
             // we might just be playing diffs from the leader
@@ -145,8 +149,19 @@ public class FinalRequestProcessor imple
         Record rsp = null;
         try {
             if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
-                throw KeeperException.create(KeeperException.Code.get((
-                        (ErrorTxn) request.getTxn()).getErr()));
+                /*
+                 * When local session upgrading is disabled, leader will
+                 * reject the ephemeral node creation due to session expire.
+                 * However, if this is the follower that issue the request,
+                 * it will have the correct error code, so we should use that
+                 * and report to user
+                 */
+                if (request.getException() != null) {
+                    throw request.getException();
+                } else {
+                    throw KeeperException.create(KeeperException.Code
+                            .get(((ErrorTxn) request.getTxn()).getErr()));
+                }
             }
 
             KeeperException ke = request.getException();

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java Wed Oct  9 20:21:55 2013
@@ -349,11 +349,12 @@ public class PrepRequestProcessor extend
 
         switch (type) {
             case OpCode.create: {
-                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                 CreateRequest createRequest = (CreateRequest)record;
                 if (deserialize) {
                     ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                 }
+                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+                validateCreateRequest(createMode, request);
                 String path = createRequest.getPath();
                 String parentPath = validatePathForCreate(path, request.sessionId);
 
@@ -365,7 +366,6 @@ public class PrepRequestProcessor extend
 
                 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
                 int parentCVersion = parentRecord.stat.getCversion();
-                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
                 if (createMode.isSequential()) {
                     path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
                 }
@@ -402,11 +402,12 @@ public class PrepRequestProcessor extend
                 break;
             }
             case OpCode.create2: {
-                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                 Create2Request createRequest = (Create2Request)record;
                 if (deserialize) {
                     ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
                 }
+                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+                validateCreateRequest(createMode, request);
                 String path = createRequest.getPath();
                 String parentPath = validatePathForCreate(path, request.sessionId);
 
@@ -418,7 +419,6 @@ public class PrepRequestProcessor extend
 
                 checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
                 int parentCVersion = parentRecord.stat.getCversion();
-                CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
                 if (createMode.isSequential()) {
                     path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
                 }
@@ -624,7 +624,13 @@ public class PrepRequestProcessor extend
                 int to = request.request.getInt();
                 request.setTxn(new CreateSessionTxn(to));
                 request.request.rewind();
-                zks.sessionTracker.addSession(request.sessionId, to);
+                if (request.isLocalSession()) {
+                    // This will add to local session tracker if it is enabled
+                    zks.sessionTracker.addSession(request.sessionId, to);
+                } else {
+                    // Explicitly add to global session if the flag is not set
+                    zks.sessionTracker.addGlobalSession(request.sessionId, to);
+                }
                 zks.setOwner(request.sessionId, request.getOwner());
                 break;
             case OpCode.closeSession:
@@ -791,7 +797,10 @@ public class PrepRequestProcessor extend
             //create/close session don't require request record
             case OpCode.createSession:
             case OpCode.closeSession:
-                pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
+                if (!request.isLocalSession()) {
+                    pRequest2Txn(request.type, zks.getNextZxid(), request,
+                                 null, true);
+                }
                 break;
 
             //All the rest don't need to create a Txn - just verify session
@@ -855,7 +864,22 @@ public class PrepRequestProcessor extend
         }
         return retval;
     }
-
+    
+    private void validateCreateRequest(CreateMode createMode, Request request)
+            throws KeeperException {
+        if (createMode.isEphemeral()) {
+            // Exception is set when local session failed to upgrade
+            // so we just need to report the error
+            if (request.getException() != null) {
+                throw request.getException();
+            }
+            zks.sessionTracker.checkGlobalSession(request.sessionId,
+                    request.getOwner());
+        } else {
+            zks.sessionTracker.checkSession(request.sessionId,
+                    request.getOwner());
+        }
+    }
 
     /**
      * This method checks out the acl making sure it isn't null or empty,

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/Request.java Wed Oct  9 20:21:55 2013
@@ -83,6 +83,19 @@ public class Request {
 
     public QuorumVerifier qv = null;
     
+    /**
+     * If this is a create or close request for a local-only session.
+     */
+    private boolean isLocalSession = false;
+
+    public boolean isLocalSession() {
+        return isLocalSession;
+    }
+
+    public void setLocalSession(boolean isLocalSession) {
+        this.isLocalSession = isLocalSession;
+    }
+
     public Object getOwner() {
         return owner;
     }
@@ -119,43 +132,41 @@ public class Request {
         switch (type) {
         case OpCode.notification:
             return false;
+        case OpCode.check:
+        case OpCode.closeSession:
         case OpCode.create:
         case OpCode.create2:
-        case OpCode.delete:
         case OpCode.createSession:
+        case OpCode.delete:
         case OpCode.exists:
-        case OpCode.getData:
-        case OpCode.check:
-        case OpCode.multi:
-        case OpCode.setData:
-        case OpCode.sync:
         case OpCode.getACL:
-        case OpCode.setACL:
         case OpCode.getChildren:
         case OpCode.getChildren2:
+        case OpCode.getData:
+        case OpCode.multi:
         case OpCode.ping:
-        case OpCode.closeSession:
-        case OpCode.setWatches:
         case OpCode.reconfig:
+        case OpCode.setACL:
+        case OpCode.setData:
+        case OpCode.setWatches:
+        case OpCode.sync:
             return true;
         default:
             return false;
         }
     }
 
-    static boolean isQuorum(int type) {
-        switch (type) {
+    public boolean isQuorum() {
+        switch (this.type) {
         case OpCode.exists:
         case OpCode.getACL:
         case OpCode.getChildren:
         case OpCode.getChildren2:
         case OpCode.getData:
             return false;
-        case OpCode.error:
-        case OpCode.closeSession:
         case OpCode.create:
         case OpCode.create2:
-        case OpCode.createSession:
+        case OpCode.error:
         case OpCode.delete:
         case OpCode.setACL:
         case OpCode.setData:
@@ -163,6 +174,9 @@ public class Request {
         case OpCode.multi:
         case OpCode.reconfig:
             return true;
+        case OpCode.closeSession:
+        case OpCode.createSession:
+            return !this.isLocalSession;
         default:
             return false;
         }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTracker.java Wed Oct  9 20:21:55 2013
@@ -44,7 +44,22 @@ public interface SessionTracker {
 
     long createSession(int sessionTimeout);
 
-    void addSession(long id, int to);
+    /**
+     * Add a global session to those being tracked.
+     * @param id sessionId
+     * @param to sessionTimeout
+     * @return whether the session was newly added (if false, already existed)
+     */
+    boolean addGlobalSession(long id, int to);
+
+    /**
+     * Add a session to those being tracked. The session is added as a local
+     * session if they are enabled, otherwise as global.
+     * @param id sessionId
+     * @param to sessionTimeout
+     * @return whether the session was newly added (if false, already existed)
+     */
+    boolean addSession(long id, int to);
 
     /**
      * @param sessionId
@@ -60,7 +75,7 @@ public interface SessionTracker {
     void setSessionClosing(long sessionId);
 
     /**
-     * 
+     *
      */
     void shutdown();
 
@@ -69,7 +84,39 @@ public interface SessionTracker {
      */
     void removeSession(long sessionId);
 
-    void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, SessionMovedException;
+    /**
+     * @param sessionId
+     * @return whether or not the SessionTracker is aware of this session
+     */
+    boolean isTrackingSession(long sessionId);
+
+    /**
+     * Checks whether the SessionTracker is aware of this session, the session
+     * is still active, and the owner matches. If the owner wasn't previously
+     * set, this sets the owner of the session.
+     *
+     * UnknownSessionException should never been thrown to the client. It is
+     * only used internally to deal with possible local session from other
+     * machine
+     *
+     * @param sessionId
+     * @param owner
+     */
+    public void checkSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException,
+            KeeperException.UnknownSessionException;
+
+    /**
+     * Strictly check that a given session is a global session or not
+     * @param sessionId
+     * @param owner
+     * @throws KeeperException.SessionExpiredException
+     * @throws KeeperException.SessionMovedException
+     */
+    public void checkGlobalSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException;
 
     void setOwner(long id, Object owner) throws SessionExpiredException;
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java Wed Oct  9 20:21:55 2013
@@ -20,18 +20,15 @@ package org.apache.zookeeper.server;
 
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Date;
 import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This is a full featured SessionTracker. It tracks session in grouped by tick
@@ -47,8 +44,7 @@ public class SessionTrackerImpl extends 
 
     private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
 
-    private final ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
-    private final long serverId;
+    private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
     private final AtomicLong nextSessionId = new AtomicLong();
 
     public static class SessionImpl implements Session {
@@ -73,6 +69,10 @@ public class SessionTrackerImpl extends 
         }
     }
 
+    /**
+     * Generates an initial sessionId. High order byte is serverId, next 5
+     * 5 bytes are from timestamp, and low order 2 bytes are 0s.
+     */
     public static long initializeNextSession(long id) {
         long nextSid = 0;
         nextSid = (System.currentTimeMillis() << 24) >> 8;
@@ -83,15 +83,14 @@ public class SessionTrackerImpl extends 
     private final SessionExpirer expirer;
 
     public SessionTrackerImpl(SessionExpirer expirer,
-            ConcurrentHashMap<Long, Integer> sessionsWithTimeout, int tickTime,
-            long sid)
+            ConcurrentMap<Long, Integer> sessionsWithTimeout, int tickTime,
+            long serverId)
     {
         super("SessionTracker");
         this.expirer = expirer;
         this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
         this.sessionsWithTimeout = sessionsWithTimeout;
-        this.serverId = sid;
-        this.nextSessionId.set(initializeNextSession(sid));
+        this.nextSessionId.set(initializeNextSession(serverId));
         for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
             addSession(e.getKey(), e.getValue());
         }
@@ -103,7 +102,7 @@ public class SessionTrackerImpl extends 
         pwriter.print("Session ");
         sessionExpiryQueue.dump(pwriter);
     }
-
+    
     @Override
     public String toString() {
         StringWriter sw = new StringWriter();
@@ -151,6 +150,10 @@ public class SessionTrackerImpl extends 
         return true;
     }
 
+    public int getSessionTimeout(long sessionId) {
+        return sessionsWithTimeout.get(sessionId);
+    }
+
     synchronized public void setSessionClosing(long sessionId) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Session closing: 0x" + Long.toHexString(sessionId));
@@ -192,31 +195,49 @@ public class SessionTrackerImpl extends 
         return sessionId;
     }
 
-    synchronized public void addSession(long id, int sessionTimeout) {
+    public boolean addGlobalSession(long id, int sessionTimeout) {
+        return addSession(id, sessionTimeout);
+    }
+
+    public synchronized boolean addSession(long id, int sessionTimeout) {
+        boolean added = false;
+
         sessionsWithTimeout.put(id, sessionTimeout);
         if (sessionsById.get(id) == null) {
             SessionImpl s = new SessionImpl(id, sessionTimeout);
             sessionsById.put(id, s);
-            if (LOG.isTraceEnabled()) {
-                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
-                        "SessionTrackerImpl --- Adding session 0x"
-                        + Long.toHexString(id) + " " + sessionTimeout);
-            }
-        } else {
-            if (LOG.isTraceEnabled()) {
-                ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
-                        "SessionTrackerImpl --- Existing session 0x"
-                        + Long.toHexString(id) + " " + sessionTimeout);
-            }
+            added = true;
+            LOG.debug("Adding session 0x" + Long.toHexString(id));
+        }
+        if (LOG.isTraceEnabled()) {
+            String actionStr = added ? "Adding" : "Existing";
+            ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
+                    "SessionTrackerImpl --- " + actionStr + " session 0x"
+                    + Long.toHexString(id) + " " + sessionTimeout);
         }
         touchSession(id, sessionTimeout);
+        return added;
+    }
+
+    public boolean isTrackingSession(long sessionId) {
+        return sessionsById.containsKey(sessionId);
     }
 
-    synchronized public void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
+    public synchronized void checkSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException,
+            KeeperException.UnknownSessionException {
+        LOG.debug("Checking session 0x" + Long.toHexString(sessionId));
         SessionImpl session = sessionsById.get(sessionId);
-        if (session == null || session.isClosing()) {
+
+        if (session == null) {
+            throw new KeeperException.UnknownSessionException();
+        }
+
+        if (session.isClosing()) {
             throw new KeeperException.SessionExpiredException();
         }
+
         if (session.owner == null) {
             session.owner = owner;
         } else if (session.owner != owner) {
@@ -231,4 +252,14 @@ public class SessionTrackerImpl extends 
         }
         session.owner = owner;
     }
+
+    public void checkGlobalSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException {
+        try {
+            checkSession(sessionId, owner);
+        } catch (KeeperException.UnknownSessionException e) {
+            throw new KeeperException.SessionExpiredException();
+        }
+    }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/TraceFormatter.java Wed Oct  9 20:21:55 2013
@@ -29,7 +29,7 @@ import org.apache.zookeeper.ZooDefs.OpCo
 
 public class TraceFormatter {
 
-    static String op2String(int op) {
+    public static String op2String(int op) {
         switch (op) {
         case OpCode.notification:
             return "notification";

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Wed Oct  9 20:21:55 2013
@@ -287,6 +287,10 @@ public class ZooKeeperServer implements 
         return hzxid.get();
     }
 
+    public SessionTracker getSessionTracker() {
+        return sessionTracker;
+    }
+    
     long getNextZxid() {
         return hzxid.incrementAndGet();
     }
@@ -300,7 +304,9 @@ public class ZooKeeperServer implements 
     }
 
     private void close(long sessionId) {
-        submitRequest(null, sessionId, OpCode.closeSession, 0, null, null);
+        Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
+        setLocalSessionFlag(si);
+        submitRequest(si);
     }
 
     public void closeSession(long sessionId) {
@@ -409,7 +415,7 @@ public class ZooKeeperServer implements 
         sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                 tickTime, 1);
     }
-    
+
     protected void startSessionTracker() {
         ((SessionTrackerImpl)sessionTracker).start();
     }
@@ -518,13 +524,19 @@ public class ZooKeeperServer implements 
     }
 
     long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
+        if (passwd == null) {
+            // Possible since it's just deserialized from a packet on the wire.
+            passwd = new byte[0];
+        }
         long sessionId = sessionTracker.createSession(timeout);
         Random r = new Random(sessionId ^ superSecret);
         r.nextBytes(passwd);
         ByteBuffer to = ByteBuffer.allocate(4);
         to.putInt(timeout);
         cnxn.setSessionId(sessionId);
-        submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
+        Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
+        setLocalSessionFlag(si);
+        submitRequest(si);
         return sessionId;
     }
 
@@ -554,6 +566,8 @@ public class ZooKeeperServer implements 
         if (checkPasswd(sessionId, passwd)) {
             revalidateSession(cnxn, sessionId, sessionTimeout);
         } else {
+            LOG.warn("Incorrect password from " + cnxn.getRemoteSocketAddress()
+                    + " for session 0x" + Long.toHexString(sessionId));
             finishSessionInit(cnxn, false);
         }
     }
@@ -618,15 +632,13 @@ public class ZooKeeperServer implements 
     }
 
     /**
-     * @param cnxn
-     * @param sessionId
-     * @param xid
-     * @param bb
+     * If the underlying Zookeeper server support local session, this method
+     * will set a isLocalSession to true if a request is associated with
+     * a local session.
+     *
+     * @param si
      */
-    private void submitRequest(ServerCnxn cnxn, long sessionId, int type,
-            int xid, ByteBuffer bb, List<Id> authInfo) {
-        Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo);
-        submitRequest(si);
+    protected void setLocalSessionFlag(Request si) {
     }
 
     public void submitRequest(Request si) {
@@ -919,6 +931,9 @@ public class ZooKeeperServer implements 
                 Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                   h.getType(), incomingBuffer, cnxn.getAuthInfo());
                 si.setOwner(ServerCnxn.me);
+                // Always treat packet from the client as a possible
+                // local request.
+                setLocalSessionFlag(si);
                 submitRequest(si);
             }
         }
@@ -966,17 +981,36 @@ public class ZooKeeperServer implements 
         // wrap SASL response token to client inside a Response object.
         return new SetSASLResponse(responseToken);
     }
-    
+
+    // entry point for quorum/Learner.java
     public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+        return processTxn(null, hdr, txn);
+    }
+
+    // entry point for FinalRequestProcessor.java
+    public ProcessTxnResult processTxn(Request request) {
+        return processTxn(request, request.getHdr(), request.getTxn());
+    }
+
+    private ProcessTxnResult processTxn(Request request, TxnHeader hdr,
+                                        Record txn) {
         ProcessTxnResult rc;
-        int opCode = hdr.getType();
-        long sessionId = hdr.getClientId();
-        rc = getZKDatabase().processTxn(hdr, txn);
+        int opCode = request != null ? request.type : hdr.getType();
+        long sessionId = request != null ? request.sessionId : hdr.getClientId();
+        if (hdr != null) {
+            rc = getZKDatabase().processTxn(hdr, txn);
+        } else {
+            rc = new ProcessTxnResult();
+        }
         if (opCode == OpCode.createSession) {
-            if (txn instanceof CreateSessionTxn) {
+            if (hdr != null && txn instanceof CreateSessionTxn) {
                 CreateSessionTxn cst = (CreateSessionTxn) txn;
-                sessionTracker.addSession(sessionId, cst
-                        .getTimeOut());
+                sessionTracker.addGlobalSession(sessionId, cst.getTimeOut());
+            } else if (request != null && request.isLocalSession()) {
+                request.request.rewind();
+                int timeout = request.request.getInt();
+                request.request.rewind();
+                sessionTracker.addSession(request.sessionId, timeout);
             } else {
                 LOG.warn("*****>>>>> Got "
                         + txn.getClass() + " "

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java Wed Oct  9 20:21:55 2013
@@ -134,11 +134,12 @@ public class CommitProcessor extends Thr
             case OpCode.reconfig:
             case OpCode.multi:
             case OpCode.setACL:
-            case OpCode.createSession:
-            case OpCode.closeSession:
                 return true;
             case OpCode.sync:
-                return matchSyncs;
+                return matchSyncs;    
+            case OpCode.createSession:
+            case OpCode.closeSession:
+                return !request.isLocalSession();
             default:
                 return false;
         }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java Wed Oct  9 20:21:55 2013
@@ -18,15 +18,17 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
-import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This RequestProcessor forwards any requests that modify the state of the
@@ -83,12 +85,17 @@ public class FollowerRequestProcessor ex
                 case OpCode.setData:
                 case OpCode.reconfig:
                 case OpCode.setACL:
-                case OpCode.createSession:
-                case OpCode.closeSession:
                 case OpCode.multi:
                 case OpCode.check:
                     zks.getFollower().request(request);
                     break;
+                case OpCode.createSession:
+                case OpCode.closeSession:
+                    // Don't forward local sessions to the leader.
+                    if (!request.isLocalSession()) {
+                        zks.getFollower().request(request);
+                    }
+                    break;
                 }
             }
         } catch (Exception e) {
@@ -99,6 +106,25 @@ public class FollowerRequestProcessor ex
 
     public void processRequest(Request request) {
         if (!finished) {
+            // Before sending the request, check if the request requires a
+            // global session and what we have is a local session. If so do
+            // an upgrade.
+            Request upgradeRequest = null;
+            try {
+                upgradeRequest = zks.checkUpgradeSession(request);
+            } catch (KeeperException ke) {
+                if (request.getHdr() != null) {
+                    request.getHdr().setType(OpCode.error);
+                    request.setTxn(new ErrorTxn(ke.code().intValue()));
+                }
+                request.setException(ke);
+                LOG.info("Error creating upgrade request",  ke);
+            } catch (IOException ie) {
+                LOG.error("Unexpected error in upgrade", ie);
+            }
+            if (upgradeRequest != null) {
+                queuedRequests.add(upgradeRequest);
+            }
             queuedRequests.add(request);
         }
     }

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import java.io.IOException;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.txn.ErrorTxn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Responsible for performing local session upgrade. Only request submitted
+ * directly to the leader should go through this processor.
+ */
+public class LeaderRequestProcessor implements RequestProcessor {
+    private static final Logger LOG = LoggerFactory
+            .getLogger(LeaderRequestProcessor.class);
+
+    private final LeaderZooKeeperServer lzks;
+
+    private final RequestProcessor nextProcessor;
+
+    public LeaderRequestProcessor(LeaderZooKeeperServer zks,
+            RequestProcessor nextProcessor) {
+        this.lzks = zks;
+        this.nextProcessor = nextProcessor;
+    }
+
+    @Override
+    public void processRequest(Request request)
+            throws RequestProcessorException {
+        // Check if this is a local session and we are trying to create
+        // an ephemeral node, in which case we upgrade the session
+        Request upgradeRequest = null;
+        try {
+            upgradeRequest = lzks.checkUpgradeSession(request);
+        } catch (KeeperException ke) {
+            if (request.getHdr() != null) {
+                LOG.debug("Updating header");
+                request.getHdr().setType(OpCode.error);
+                request.setTxn(new ErrorTxn(ke.code().intValue()));
+            }
+            request.setException(ke);
+            LOG.info("Error creating upgrade request " + ke.getMessage());
+        } catch (IOException ie) {
+            LOG.error("Unexpected error in upgrade", ie);
+        }
+        if (upgradeRequest != null) {
+            nextProcessor.processRequest(upgradeRequest);
+        }
+
+        nextProcessor.processRequest(request);
+    }
+
+    @Override
+    public void shutdown() {
+        LOG.info("Shutting down");
+        nextProcessor.shutdown();
+    }
+
+}

Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderRequestProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.io.PrintWriter;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.KeeperException.SessionMovedException;
+import org.apache.zookeeper.KeeperException.UnknownSessionException;
+import org.apache.zookeeper.server.SessionTrackerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The leader session tracker tracks local and global sessions on the leader.
+ */
+public class LeaderSessionTracker extends UpgradeableSessionTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(LeaderSessionTracker.class);
+
+    private final boolean localSessionsEnabled;
+    private final SessionTrackerImpl globalSessionTracker;
+
+    /**
+     * Server id of the leader
+     */
+    private final long serverId;
+
+    public LeaderSessionTracker(SessionExpirer expirer,
+            ConcurrentMap<Long, Integer> sessionsWithTimeouts,
+            int tickTime, long id, boolean localSessionsEnabled) {
+
+        this.globalSessionTracker = new SessionTrackerImpl(
+            expirer, sessionsWithTimeouts, tickTime, id);
+
+        this.localSessionsEnabled = localSessionsEnabled;
+        if (this.localSessionsEnabled) {
+            createLocalSessionTracker(expirer, tickTime, id);
+        }
+        serverId = id;
+    }
+
+    public void removeSession(long sessionId) {
+        if (localSessionTracker != null) {
+            localSessionTracker.removeSession(sessionId);
+        }
+        globalSessionTracker.removeSession(sessionId);
+    }
+
+    public void start() {
+        globalSessionTracker.start();
+        if (localSessionTracker != null) {
+            localSessionTracker.start();
+        }
+    }
+
+    public void shutdown() {
+        if (localSessionTracker != null) {
+            localSessionTracker.shutdown();
+        }
+        globalSessionTracker.shutdown();
+    }
+
+    public boolean isGlobalSession(long sessionId) {
+        return globalSessionTracker.isTrackingSession(sessionId);
+    }
+
+    public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+        boolean added =
+            globalSessionTracker.addSession(sessionId, sessionTimeout);
+        if (localSessionsEnabled && added) {
+            // Only do extra logging so we know what kind of session this is
+            // if we're supporting both kinds of sessions
+            LOG.info("Adding global session 0x" + Long.toHexString(sessionId));
+        }
+        return added;
+    }
+
+    public boolean addSession(long sessionId, int sessionTimeout) {
+        boolean added;
+        if (localSessionsEnabled && !isGlobalSession(sessionId)) {
+            added = localSessionTracker.addSession(sessionId, sessionTimeout);
+            // Check for race condition with session upgrading
+            if (isGlobalSession(sessionId)) {
+                added = false;
+                localSessionTracker.removeSession(sessionId);
+            } else if (added) {
+              LOG.info("Adding local session 0x" + Long.toHexString(sessionId));
+            }
+        } else {
+            added = addGlobalSession(sessionId, sessionTimeout);
+        }
+        return added;
+    }
+
+    public boolean touchSession(long sessionId, int sessionTimeout) {
+        if (localSessionTracker != null &&
+            localSessionTracker.touchSession(sessionId, sessionTimeout)) {
+            return true;
+        }
+        return globalSessionTracker.touchSession(sessionId, sessionTimeout);
+    }
+
+    public long createSession(int sessionTimeout) {
+        if (localSessionsEnabled) {
+            return localSessionTracker.createSession(sessionTimeout);
+        }
+        return globalSessionTracker.createSession(sessionTimeout);
+    }
+
+    // Returns the serverId from the sessionId (the high order byte)
+    public static long getServerIdFromSessionId(long sessionId) {
+        return sessionId >> 56;
+    }
+
+    public void checkSession(long sessionId, Object owner)
+            throws SessionExpiredException, SessionMovedException,
+            UnknownSessionException {
+        if (localSessionTracker != null) {
+            try {
+                localSessionTracker.checkSession(sessionId, owner);
+                // A session can both be a local and global session during
+                // upgrade
+                if (!isGlobalSession(sessionId)) {
+                    return;
+                }
+            } catch(UnknownSessionException e) {
+                // Ignore. We'll check instead whether it's a global session
+            }
+        }
+        try {
+            globalSessionTracker.checkSession(sessionId, owner);
+            // if we can get here, it is a valid global session
+            return;
+        } catch (UnknownSessionException e) {
+            // Ignore. This may be local session from other servers.
+        }
+
+        /*
+         * if local session is not enabled or it used to be our local session
+         * throw sessions expires
+         */
+        if (!localSessionsEnabled
+                || (getServerIdFromSessionId(sessionId) == serverId)) {
+            throw new SessionExpiredException();
+        }
+    }
+
+    public void checkGlobalSession(long sessionId, Object owner)
+            throws SessionExpiredException, SessionMovedException {
+        try {
+            globalSessionTracker.checkSession(sessionId, owner);
+        } catch (UnknownSessionException e) {
+            // For global session, if we don't know it, it is already expired
+            throw new SessionExpiredException();
+        }
+    }
+
+    public void setOwner(long sessionId, Object owner)
+            throws SessionExpiredException {
+        if (localSessionTracker != null) {
+            try {
+                localSessionTracker.setOwner(sessionId, owner);
+                return;
+            } catch(SessionExpiredException e) {
+                // Ignore. We'll check instead whether it's a global session
+            }
+        }
+        globalSessionTracker.setOwner(sessionId, owner);
+    }
+
+    public void dumpSessions(PrintWriter pwriter) {
+      if (localSessionTracker != null) {
+          pwriter.print("Local ");
+          localSessionTracker.dumpSessions(pwriter);
+          pwriter.print("Global ");
+      }
+      globalSessionTracker.dumpSessions(pwriter);
+    }
+
+    public void setSessionClosing(long sessionId) {
+        // call is no-op if session isn't tracked so safe to call both
+        if (localSessionTracker != null) {
+            localSessionTracker.setSessionClosing(sessionId);
+        }
+        globalSessionTracker.setSessionClosing(sessionId);
+    }
+}

Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java Wed Oct  9 20:21:55 2013
@@ -25,9 +25,9 @@ import org.apache.zookeeper.jmx.MBeanReg
 import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.ServerCnxn;
-import org.apache.zookeeper.server.SessionTrackerImpl;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
@@ -39,8 +39,11 @@ import org.apache.zookeeper.server.persi
  * FinalRequestProcessor
  */
 public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
+
     CommitProcessor commitProcessor;
 
+    PrepRequestProcessor prepRequestProcessor;
+
     /**
      * @param port
      * @param dataDir
@@ -64,8 +67,9 @@ public class LeaderZooKeeperServer exten
         ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
                 commitProcessor);
         proposalProcessor.initialize();
-        firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
-        ((PrepRequestProcessor)firstProcessor).start();
+        prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
+        prepRequestProcessor.start();
+        firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
     }
 
     @Override
@@ -75,20 +79,45 @@ public class LeaderZooKeeperServer exten
 
     @Override
     public void createSessionTracker() {
-        sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
-                tickTime, self.getId());
-    }
-    
-    @Override
-    protected void startSessionTracker() {
-        ((SessionTrackerImpl)sessionTracker).start();
+        sessionTracker = new LeaderSessionTracker(
+                this, getZKDatabase().getSessionWithTimeOuts(),
+                tickTime, self.getId(), self.areLocalSessionsEnabled());
     }
 
-
     public boolean touch(long sess, int to) {
         return sessionTracker.touchSession(sess, to);
     }
 
+    public boolean checkIfValidGlobalSession(long sess, int to) {
+        if (self.areLocalSessionsEnabled() &&
+            !upgradeableSessionTracker.isGlobalSession(sess)) {
+            return false;
+        }
+        return sessionTracker.touchSession(sess, to);
+    }
+
+    /**
+     * Requests coming from the learner should go directly to
+     * PrepRequestProcessor
+     *
+     * @param request
+     */
+    public void submitLearnerRequest(Request request) {
+        /*
+         * Requests coming from the learner should have gone through
+         * submitRequest() on each server which already perform some request
+         * validation, so we don't need to do it again.
+         *
+         * Additionally, LearnerHandler should start submitting requests into
+         * the leader's pipeline only when the leader's server is started, so we
+         * can submit the request directly into PrepRequestProcessor.
+         *
+         * This is done so that requests from learners won't go through
+         * LeaderRequestProcessor which perform local session upgrade.
+         */
+        prepRequestProcessor.processRequest(request);
+    }
+
     @Override
     protected void registerJMX() {
         // register with JMX

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Learner.java Wed Oct  9 20:21:55 2013
@@ -30,6 +30,7 @@ import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.LinkedList;
+import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -536,8 +537,7 @@ public class Learner {       
         DataInputStream dis = new DataInputStream(bis);
         long sessionId = dis.readLong();
         boolean valid = dis.readBoolean();
-        ServerCnxn cnxn = pendingRevalidations
-        .remove(sessionId);
+        ServerCnxn cnxn = pendingRevalidations.remove(sessionId);
         if (cnxn == null) {
             LOG.warn("Missing session 0x"
                     + Long.toHexString(sessionId)
@@ -557,8 +557,7 @@ public class Learner {       
         // Send back the ping with our session data
         ByteArrayOutputStream bos = new ByteArrayOutputStream();
         DataOutputStream dos = new DataOutputStream(bos);
-        HashMap<Long, Integer> touchTable = zk
-                .getTouchSnapshot();
+        Map<Long, Integer> touchTable = zk.getTouchSnapshot();
         for (Entry<Long, Integer> entry : touchTable.entrySet()) {
             dos.writeLong(entry.getKey());
             dos.writeInt(entry.getValue());

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java Wed Oct  9 20:21:55 2013
@@ -525,7 +525,7 @@ public class LearnerHandler extends Thre
                     ByteArrayOutputStream bos = new ByteArrayOutputStream();
                     DataOutputStream dos = new DataOutputStream(bos);
                     dos.writeLong(id);
-                    boolean valid = leader.zk.touch(id, to);
+                    boolean valid = leader.zk.checkIfValidGlobalSession(id, to);
                     if (valid) {
                         try {
                             //set the session owner
@@ -559,7 +559,7 @@ public class LearnerHandler extends Thre
                         si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                     }
                     si.setOwner(this);
-                    leader.zk.submitRequest(si);
+                    leader.zk.submitLearnerRequest(si);
                     break;
                 default:
                 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java Wed Oct  9 20:21:55 2013
@@ -15,82 +15,202 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.zookeeper.server.quorum;
 
 import java.io.PrintWriter;
-import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.zookeeper.server.SessionTracker;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.KeeperException.SessionMovedException;
+import org.apache.zookeeper.KeeperException.UnknownSessionException;
 import org.apache.zookeeper.server.SessionTrackerImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This is really just a shell of a SessionTracker that tracks session activity
- * to be forwarded to the Leader using a PING.
+ * The learner session tracker is used by learners (followers and observers) to
+ * track zookeeper sessions which may or may not be echoed to the leader.  When
+ * a new session is created it is saved locally in a wrapped
+ * LocalSessionTracker.  It can subsequently be upgraded to a global session
+ * as required.  If an upgrade is requested the session is removed from local
+ * collections while keeping the same session ID.  It is up to the caller to
+ * queue a session creation request for the leader.
+ * A secondary function of the learner session tracker is to remember sessions
+ * which have been touched in this service.  This information is passed along
+ * to the leader with a ping.
  */
-public class LearnerSessionTracker implements SessionTracker {
-    SessionExpirer expirer;
+public class LearnerSessionTracker extends UpgradeableSessionTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(LearnerSessionTracker.class);
+
+    private final SessionExpirer expirer;
+    // Touch table for the global sessions
+    private final AtomicReference<Map<Long, Integer>> touchTable =
+        new AtomicReference<Map<Long, Integer>>();
+    private final long serverId;
+    private final AtomicLong nextSessionId = new AtomicLong();
 
-    HashMap<Long, Integer> touchTable = new HashMap<Long, Integer>();
-    long serverId = 1;
-    long nextSessionId=0;
-    
-    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
+    private final boolean localSessionsEnabled;
+    private final ConcurrentMap<Long, Integer> globalSessionsWithTimeouts;
 
     public LearnerSessionTracker(SessionExpirer expirer,
-            ConcurrentHashMap<Long, Integer> sessionsWithTimeouts, long id) {
+            ConcurrentMap<Long, Integer> sessionsWithTimeouts,
+            int tickTime, long id, boolean localSessionsEnabled) {
         this.expirer = expirer;
-        this.sessionsWithTimeouts = sessionsWithTimeouts;
+        this.touchTable.set(new ConcurrentHashMap<Long, Integer>());
+        this.globalSessionsWithTimeouts = sessionsWithTimeouts;
         this.serverId = id;
-        nextSessionId = SessionTrackerImpl.initializeNextSession(this.serverId);
-        
-    }
+        nextSessionId.set(SessionTrackerImpl.initializeNextSession(serverId));
 
-    synchronized public void removeSession(long sessionId) {
-        sessionsWithTimeouts.remove(sessionId);
-        touchTable.remove(sessionId);
+        this.localSessionsEnabled = localSessionsEnabled;
+        if (this.localSessionsEnabled) {
+            createLocalSessionTracker(expirer, tickTime, id);
+        }
+    }
+
+    public void removeSession(long sessionId) {
+        if (localSessionTracker != null) {
+            localSessionTracker.removeSession(sessionId);
+        }
+        globalSessionsWithTimeouts.remove(sessionId);
+        touchTable.get().remove(sessionId);
+    }
+
+    public void start() {
+        if (localSessionTracker != null) {
+            localSessionTracker.start();
+        }
     }
 
     public void shutdown() {
-    }
-
-    synchronized public void addSession(long sessionId, int sessionTimeout) {
-        sessionsWithTimeouts.put(sessionId, sessionTimeout);
-        touchTable.put(sessionId, sessionTimeout);
-    }
-
-    synchronized public boolean touchSession(long sessionId, int sessionTimeout) {
-        touchTable.put(sessionId, sessionTimeout);
+        if (localSessionTracker != null) {
+            localSessionTracker.shutdown();
+        }
+    }
+
+    public boolean isGlobalSession(long sessionId) {
+        return globalSessionsWithTimeouts.containsKey(sessionId);
+    }
+
+    public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+        boolean added =
+            globalSessionsWithTimeouts.put(sessionId, sessionTimeout) == null;
+        if (localSessionsEnabled && added) {
+            // Only do extra logging so we know what kind of session this is
+            // if we're supporting both kinds of sessions
+            LOG.info("Adding global session 0x" + Long.toHexString(sessionId));
+        }
+        touchTable.get().put(sessionId, sessionTimeout);
+        return added;
+    }
+
+    public boolean addSession(long sessionId, int sessionTimeout) {
+        boolean added;
+        if (localSessionsEnabled && !isGlobalSession(sessionId)) {
+            added = localSessionTracker.addSession(sessionId, sessionTimeout);
+            // Check for race condition with session upgrading
+            if (isGlobalSession(sessionId)) {
+                added = false;
+                localSessionTracker.removeSession(sessionId);
+            } else if (added) {
+                LOG.info("Adding local session 0x"
+                         + Long.toHexString(sessionId));
+            }
+        } else {
+            added = addGlobalSession(sessionId, sessionTimeout);
+        }
+        return added;
+    }
+
+    public boolean touchSession(long sessionId, int sessionTimeout) {
+        if (localSessionsEnabled) {
+            if (localSessionTracker.touchSession(sessionId, sessionTimeout)) {
+                return true;
+            }
+            if (!isGlobalSession(sessionId)) {
+                return false;
+            }
+        }
+        touchTable.get().put(sessionId, sessionTimeout);
         return true;
     }
 
-    synchronized HashMap<Long, Integer> snapshot() {
-        HashMap<Long, Integer> oldTouchTable = touchTable;
-        touchTable = new HashMap<Long, Integer>();
-        return oldTouchTable;
-    }
-
-
-    synchronized public long createSession(int sessionTimeout) {
-        return (nextSessionId++);
+    public Map<Long, Integer> snapshot() {
+        return touchTable.getAndSet(new ConcurrentHashMap<Long, Integer>());
     }
 
-    public void checkSession(long sessionId, Object owner)  {
-        // Nothing to do here. Sessions are checked at the Leader
-    }
-    
-    public void setOwner(long sessionId, Object owner) {
-        // Nothing to do here. Sessions are checked at the Leader
+    public long createSession(int sessionTimeout) {
+        if (localSessionsEnabled) {
+            return localSessionTracker.createSession(sessionTimeout);
+        }
+        return nextSessionId.getAndIncrement();
+    }
+
+    public void checkSession(long sessionId, Object owner)
+            throws SessionExpiredException, SessionMovedException  {
+        if (localSessionTracker != null) {
+            try {
+                localSessionTracker.checkSession(sessionId, owner);
+                return;
+            } catch (UnknownSessionException e) {
+                // Check whether it's a global session. We can ignore those
+                // because they are handled at the leader, but if not, rethrow.
+                // We check local session status first to avoid race condition
+                // with session upgrading.
+                if (!isGlobalSession(sessionId)) {
+                    throw new SessionExpiredException();
+                }
+            }
+        }
+    }
+
+    public void setOwner(long sessionId, Object owner)
+            throws SessionExpiredException {
+        if (localSessionTracker != null) {
+            try {
+                localSessionTracker.setOwner(sessionId, owner);
+                return;
+            } catch (SessionExpiredException e) {
+                // Check whether it's a global session. We can ignore those
+                // because they are handled at the leader, but if not, rethrow.
+                // We check local session status first to avoid race condition
+                // with session upgrading.
+                if (!isGlobalSession(sessionId)) {
+                    throw e;
+                }
+            }
+        }
     }
 
     public void dumpSessions(PrintWriter pwriter) {
-    	// the original class didn't have tostring impl, so just
-    	// dup what we had before
-    	pwriter.println(toString());
+        if (localSessionTracker != null) {
+            pwriter.print("Local ");
+            localSessionTracker.dumpSessions(pwriter);
+        }
+        pwriter.print("Global Sessions(");
+        pwriter.print(globalSessionsWithTimeouts.size());
+        pwriter.println("):");
+        SortedSet<Long> sessionIds = new TreeSet<Long>(
+                globalSessionsWithTimeouts.keySet());
+        for (long sessionId : sessionIds) {
+            pwriter.print("0x");
+            pwriter.print(Long.toHexString(sessionId));
+            pwriter.print("\t");
+            pwriter.print(globalSessionsWithTimeouts.get(sessionId));
+            pwriter.println("ms");
+        }
     }
 
     public void setSessionClosing(long sessionId) {
-        // Nothing to do here.
+        // Global sessions handled on the leader; this call is a no-op if
+        // not tracked as a local session so safe to call in both cases.
+        if (localSessionTracker != null) {
+            localSessionTracker.setSessionClosing(sessionId);
+        }
     }
 }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java Wed Oct  9 20:21:55 2013
@@ -18,19 +18,23 @@
 package org.apache.zookeeper.server.quorum;
 
 import java.io.IOException;
-import java.util.HashMap;
+import java.util.Collections;
+import java.util.Map;
 
 import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
 import org.apache.zookeeper.server.DataTreeBean;
+import org.apache.zookeeper.server.quorum.LearnerSessionTracker;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServerBean;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 
 /**
- * Parent class for all ZooKeeperServers for Learners 
+ * Parent class for all ZooKeeperServers for Learners
  */
-public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {    
+public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
+
     public LearnerZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
             int minSessionTimeout, int maxSessionTimeout,
             ZKDatabase zkDb, QuorumPeer self)
@@ -42,47 +46,50 @@ public abstract class LearnerZooKeeperSe
     /**
      * Abstract method to return the learner associated with this server.
      * Since the Learner may change under our feet (when QuorumPeer reassigns
-     * it) we can't simply take a reference here. Instead, we need the 
-     * subclasses to implement this.     
+     * it) we can't simply take a reference here. Instead, we need the
+     * subclasses to implement this.
      */
-    abstract public Learner getLearner();        
-    
+    abstract public Learner getLearner();
+
     /**
      * Returns the current state of the session tracker. This is only currently
      * used by a Learner to build a ping response packet.
-     * 
+     *
      */
-    protected HashMap<Long, Integer> getTouchSnapshot() {
+    protected Map<Long, Integer> getTouchSnapshot() {
         if (sessionTracker != null) {
             return ((LearnerSessionTracker) sessionTracker).snapshot();
         }
-        return new HashMap<Long, Integer>();
+        Map<Long, Integer> map = Collections.emptyMap();
+        return map;
     }
-    
+
     /**
      * Returns the id of the associated QuorumPeer, which will do for a unique
-     * id of this server. 
+     * id of this server.
      */
     @Override
     public long getServerId() {
         return self.getId();
-    }    
-    
+    }
+
     @Override
     public void createSessionTracker() {
-        sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
-                self.getId());
+        sessionTracker = new LearnerSessionTracker(
+                this, getZKDatabase().getSessionWithTimeOuts(),
+                this.tickTime, self.getId(), self.areLocalSessionsEnabled());
     }
-    
-    @Override
-    protected void startSessionTracker() {}
-    
+
     @Override
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
             int sessionTimeout) throws IOException {
-        getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+        if (upgradeableSessionTracker.isLocalSession(sessionId)) {
+            super.revalidateSession(cnxn, sessionId, sessionTimeout);
+        } else {
+            getLearner().validateSession(cnxn, sessionId, sessionTimeout);
+        }
     }
-    
+
     @Override
     protected void registerJMX() {
         // register with JMX

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.zookeeper.server.SessionTrackerImpl;
+
+/**
+ * Local session tracker.
+ */
+public class LocalSessionTracker extends SessionTrackerImpl {
+    public LocalSessionTracker(SessionExpirer expirer,
+            ConcurrentMap<Long, Integer> sessionsWithTimeouts,
+            int tickTime, long id) {
+        super(expirer, sessionsWithTimeouts, tickTime, id);
+    }
+
+    public boolean isLocalSession(long sessionId) {
+        return isTrackingSession(sessionId);
+    }
+
+    public boolean isGlobalSession(long sessionId) {
+        return false;
+    }
+
+    public boolean addGlobalSession(long sessionId, int sessionTimeout) {
+        throw new UnsupportedOperationException();
+    }
+}

Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/LocalSessionTracker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java Wed Oct  9 20:21:55 2013
@@ -18,15 +18,18 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.txn.ErrorTxn;
 
 /**
  * This RequestProcessor forwards any requests that modify the state of the
@@ -91,12 +94,17 @@ public class ObserverRequestProcessor ex
                 case OpCode.setData:
                 case OpCode.reconfig:
                 case OpCode.setACL:
-                case OpCode.createSession:
-                case OpCode.closeSession:
                 case OpCode.multi:
                 case OpCode.check:
                     zks.getObserver().request(request);
                     break;
+                case OpCode.createSession:
+                case OpCode.closeSession:
+                    // Don't forward local sessions to the leader.
+                    if (!request.isLocalSession()) {
+                        zks.getObserver().request(request);
+                    }
+                    break;
                 }
             }
         } catch (Exception e) {
@@ -110,6 +118,22 @@ public class ObserverRequestProcessor ex
      */
     public void processRequest(Request request) {
         if (!finished) {
+            Request upgradeRequest = null;
+            try {
+                upgradeRequest = zks.checkUpgradeSession(request);
+            } catch (KeeperException ke) {
+                if (request.getHdr() != null) {
+                    request.getHdr().setType(OpCode.error);
+                    request.setTxn(new ErrorTxn(ke.code().intValue()));
+                }
+                request.setException(ke);
+                LOG.info("Error creating upgrade request",  ke);
+            } catch (IOException ie) {
+                LOG.error("Unexpected error in upgrade", ie);
+            }
+            if (upgradeRequest != null) {
+                queuedRequests.add(upgradeRequest);
+            }
             queuedRequests.add(request);
         }
     }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java Wed Oct  9 20:21:55 2013
@@ -68,10 +68,10 @@ public class ProposalRequestProcessor im
          * call processRequest on the next processor.
          */
 
-        if(request instanceof LearnerSyncRequest){
+        if (request instanceof LearnerSyncRequest){
             zks.getLeader().processSync((LearnerSyncRequest)request);
         } else {
-                nextProcessor.processRequest(request);
+            nextProcessor.processRequest(request);
             if (request.getHdr() != null) {
                 // We need to sync and get consensus on any transactions
                 try {

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Wed Oct  9 20:21:55 2013
@@ -381,6 +381,18 @@ public class QuorumPeer extends Thread i
     protected int tickTime;
 
     /**
+     * Whether learners in this quorum should create new sessions as local.
+     * False by default to preserve existing behavior.
+     */
+    protected boolean localSessionsEnabled = false;
+
+    /**
+     * Whether learners in this quorum should upgrade local sessions to
+     * global. Only matters if local sessions are enabled.
+     */
+    protected boolean localSessionsUpgradingEnabled = true;
+
+    /**
      * Minimum number of milliseconds to allow for session timeout.
      * A value of -1 indicates unset, use default.
      */
@@ -1142,6 +1154,28 @@ public class QuorumPeer extends Thread i
         return fac.getMaxClientCnxnsPerHost();
     }
 
+    /** Whether local sessions are enabled */
+    public boolean areLocalSessionsEnabled() {
+        return localSessionsEnabled;
+    }
+
+    /** Whether to enable local sessions */
+    public void enableLocalSessions(boolean flag) {
+        LOG.info("Local sessions " + (flag ? "enabled" : "disabled"));
+        localSessionsEnabled = flag;
+    }
+
+    /** Whether local sessions are allowed to upgrade to global sessions */
+    public boolean isLocalSessionsUpgradingEnabled() {
+        return localSessionsUpgradingEnabled;
+    }
+
+    /** Whether to allow local sessions to upgrade to global sessions */
+    public void enableLocalSessionsUpgrading(boolean flag) {
+        LOG.info("Local session upgrading " + (flag ? "enabled" : "disabled"));
+        localSessionsUpgradingEnabled = flag;
+    }
+
     /** minimum session timeout in milliseconds */
     public int getMinSessionTimeout() {
         return minSessionTimeout == -1 ? tickTime * 2 : minSessionTimeout;
@@ -1158,7 +1192,7 @@ public class QuorumPeer extends Thread i
         return maxSessionTimeout == -1 ? tickTime * 20 : maxSessionTimeout;
     }
 
-    /** minimum session timeout in milliseconds */
+    /** maximum session timeout in milliseconds */
     public void setMaxSessionTimeout(int max) {
         LOG.info("maxSessionTimeout set to " + max);
         this.maxSessionTimeout = max;

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java Wed Oct  9 20:21:55 2013
@@ -62,6 +62,8 @@ public class QuorumPeerConfig {
     protected int minSessionTimeout = -1;
     /** defaults to -1 if not set explicitly */
     protected int maxSessionTimeout = -1;
+    protected boolean localSessionsEnabled = false;
+    protected boolean localSessionsUpgradingEnabled = false;
 
     protected int initLimit;
     protected int syncLimit;
@@ -196,6 +198,10 @@ public class QuorumPeerConfig {
                 dataLogDir = vff.create(value);
             } else if (key.equals("clientPort")) {
                 clientPort = Integer.parseInt(value);
+            } else if (key.equals("localSessionsEnabled")) {
+                localSessionsEnabled = Boolean.parseBoolean(value);
+            } else if (key.equals("localSessionsUpgradingEnabled")) {
+                localSessionsUpgradingEnabled = Boolean.parseBoolean(value);
             } else if (key.equals("clientPortAddress")) {
                 clientPortAddress = value.trim();
             } else if (key.equals("tickTime")) {
@@ -503,12 +509,16 @@ public class QuorumPeerConfig {
     public int getMaxClientCnxns() { return maxClientCnxns; }
     public int getMinSessionTimeout() { return minSessionTimeout; }
     public int getMaxSessionTimeout() { return maxSessionTimeout; }
+    public boolean areLocalSessionsEnabled() { return localSessionsEnabled; }
+    public boolean isLocalSessionsUpgradingEnabled() {
+        return localSessionsUpgradingEnabled;
+    }
 
     public int getInitLimit() { return initLimit; }
     public int getSyncLimit() { return syncLimit; }
     public int getElectionAlg() { return electionAlg; }
-    public int getElectionPort() { return electionPort; }    
-    
+    public int getElectionPort() { return electionPort; }
+
     public int getSnapRetainCount() {
         return snapRetainCount;
     }
@@ -521,7 +531,7 @@ public class QuorumPeerConfig {
         return syncEnabled;
     }
 
-    public QuorumVerifier getQuorumVerifier() {   
+    public QuorumVerifier getQuorumVerifier() {
         return quorumVerifier;
     }