You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by th...@apache.org on 2013/10/09 22:21:56 UTC

svn commit: r1530781 [2/2] - in /zookeeper/trunk: ./ src/c/include/ src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/cli/ src/java/main/org/apache/zookeeper/server/ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/o...

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Wed Oct  9 20:21:55 2013
@@ -110,7 +110,7 @@ public class QuorumPeerMain {
                 .getDataDir(), config.getDataLogDir(), config
                 .getSnapRetainCount(), config.getPurgeInterval());
         purgeMgr.start();
-        
+
         if (args.length == 1 && config.isDistributed()) {
             runFromConfig(config);
         } else {
@@ -127,17 +127,20 @@ public class QuorumPeerMain {
       } catch (JMException e) {
           LOG.warn("Unable to register log4j JMX control", e);
       }
-  
+
       LOG.info("Starting quorum peer");
       try {
           ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
           cnxnFactory.configure(config.getClientPortAddress(),
                                 config.getMaxClientCnxns());
-  
-          quorumPeer = new QuorumPeer();          
+
+          quorumPeer = new QuorumPeer();
           quorumPeer.setTxnFactory(new FileTxnSnapLog(
                       config.getDataLogDir(),
                       config.getDataDir()));
+          quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
+          quorumPeer.enableLocalSessionsUpgrading(
+              config.isLocalSessionsUpgradingEnabled());
           //quorumPeer.setQuorumPeers(config.getAllMembers());
           quorumPeer.setElectionType(config.getElectionAlg());
           quorumPeer.setMyid(config.getServerId());

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java Wed Oct  9 20:21:55 2013
@@ -17,8 +17,16 @@
  */
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
 import java.io.PrintWriter;
+import java.nio.ByteBuffer;
 
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -28,7 +36,9 @@ import org.apache.zookeeper.server.persi
  * a quorum.
  */
 public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
+
     public final QuorumPeer self;
+    protected UpgradeableSessionTracker upgradeableSessionTracker;
 
     protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
             int minSessionTimeout, int maxSessionTimeout,
@@ -39,6 +49,95 @@ public abstract class QuorumZooKeeperSer
     }
 
     @Override
+    protected void startSessionTracker() {
+        upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker;
+        upgradeableSessionTracker.start();
+    }
+
+    public Request checkUpgradeSession(Request request)
+            throws IOException, KeeperException {
+        // If this is a request for a local session and it is to
+        // create an ephemeral node, then upgrade the session and return
+        // a new session request for the leader.
+        // This is called by the request processor thread (either follower
+        // or observer request processor), which is unique to a learner.
+        // So will not be called concurrently by two threads.
+        if (request.type != OpCode.create ||
+            !upgradeableSessionTracker.isLocalSession(request.sessionId)) {
+            return null;
+        }
+        CreateRequest createRequest = new CreateRequest();
+        request.request.rewind();
+        ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
+        request.request.rewind();
+        CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+        if (!createMode.isEphemeral()) {
+            return null;
+        }
+        // Uh oh.  We need to upgrade before we can proceed.
+        if (!self.isLocalSessionsUpgradingEnabled()) {
+            throw new KeeperException.EphemeralOnLocalSessionException();
+        }
+
+        return makeUpgradeRequest(request.sessionId);
+    }
+
+    private Request makeUpgradeRequest(long sessionId) {
+        // Make sure to atomically check local session status, upgrade
+        // session, and make the session creation request.  This is to
+        // avoid another thread upgrading the session in parallel.
+        synchronized (upgradeableSessionTracker) {
+            if (upgradeableSessionTracker.isLocalSession(sessionId)) {
+                int timeout = upgradeableSessionTracker.upgradeSession(sessionId);
+                ByteBuffer to = ByteBuffer.allocate(4);
+                to.putInt(timeout);
+                return new Request(
+                        null, sessionId, 0, OpCode.createSession, to, null);
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Implements the SessionUpgrader interface,
+     *
+     * @param sessionId
+     */
+    public void upgrade(long sessionId) {
+        Request request = makeUpgradeRequest(sessionId);
+        if (request != null) {
+            LOG.info("Upgrading session 0x" + Long.toHexString(sessionId));
+            // This must be a global request
+            submitRequest(request);
+        }
+    }
+
+    @Override
+    protected void setLocalSessionFlag(Request si) {
+        // We need to set isLocalSession to tree for these type of request
+        // so that the request processor can process them correctly.
+        switch (si.type) {
+        case OpCode.createSession:
+            if (self.areLocalSessionsEnabled()) {
+                // All new sessions local by default.
+                si.setLocalSession(true);
+            }
+            break;
+        case OpCode.closeSession:
+            String reqType = "global";
+            if (upgradeableSessionTracker.isLocalSession(si.sessionId)) {
+                si.setLocalSession(true);
+                reqType = "local";
+            }
+            LOG.info("Submitting " + reqType + " closeSession request"
+                    + " for session 0x" + Long.toHexString(si.sessionId));
+            break;
+        default:
+            break;
+        }
+    }
+
+    @Override
     public void dumpConf(PrintWriter pwriter) {
         super.dumpConf(pwriter);
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java Wed Oct  9 20:21:55 2013
@@ -18,6 +18,8 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.PrintWriter;
+
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.FinalRequestProcessor;
@@ -36,11 +38,16 @@ import org.apache.zookeeper.server.persi
  * The very first processor in the chain of request processors is a
  * ReadOnlyRequestProcessor which drops state-changing requests.
  */
-public class ReadOnlyZooKeeperServer extends QuorumZooKeeperServer {
+public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
 
+    protected final QuorumPeer self;
     private volatile boolean shutdown = false;
-    ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) {
-        super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
+
+    ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
+                            ZKDatabase zkDb) {
+        super(logFactory, self.tickTime, self.minSessionTimeout,
+              self.maxSessionTimeout, zkDb);
+        this.self = self;
     }
 
     @Override
@@ -141,4 +148,21 @@ public class ReadOnlyZooKeeperServer ext
         super.shutdown();
     }
 
+    @Override
+    public void dumpConf(PrintWriter pwriter) {
+        super.dumpConf(pwriter);
+
+        pwriter.print("initLimit=");
+        pwriter.println(self.getInitLimit());
+        pwriter.print("syncLimit=");
+        pwriter.println(self.getSyncLimit());
+        pwriter.print("electionAlg=");
+        pwriter.println(self.getElectionType());
+        pwriter.print("electionPort=");
+        pwriter.println(self.getElectionAddress().getPort());
+        pwriter.print("quorumPort=");
+        pwriter.println(self.getQuorumAddress().getPort());
+        pwriter.print("peerType=");
+        pwriter.println(self.getLearnerType().ordinal());
+    }
 }

Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.SessionTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A session tracker that supports upgradeable local sessions.
+ */
+public abstract class UpgradeableSessionTracker implements SessionTracker {
+    private static final Logger LOG = LoggerFactory.getLogger(UpgradeableSessionTracker.class);
+
+    private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
+    protected LocalSessionTracker localSessionTracker;
+
+    public void start() {}
+
+    public void createLocalSessionTracker(SessionExpirer expirer,
+            int tickTime, long id) {
+        this.localSessionsWithTimeouts =
+            new ConcurrentHashMap<Long, Integer>();
+        this.localSessionTracker = new LocalSessionTracker(
+            expirer, this.localSessionsWithTimeouts, tickTime, id);
+    }
+
+    public boolean isTrackingSession(long sessionId) {
+        return isLocalSession(sessionId) || isGlobalSession(sessionId);
+    }
+
+    public boolean isLocalSession(long sessionId) {
+        return localSessionTracker != null &&
+            localSessionTracker.isTrackingSession(sessionId);
+    }
+
+    abstract public boolean isGlobalSession(long sessionId);
+
+    /**
+     * Upgrades the session to a global session.
+     * This simply removes the session from the local tracker and marks
+     * it as global.  It is up to the caller to actually
+     * queue up a transaction for the session.
+     *
+     * @param sessionId
+     * @return session timeout (-1 if not a local session)
+     */
+    public int upgradeSession(long sessionId) {
+        if (localSessionsWithTimeouts == null) {
+            return -1;
+        }
+        // We won't race another upgrade attempt because only one thread
+        // will get the timeout from the map
+        Integer timeout = localSessionsWithTimeouts.remove(sessionId);
+        if (timeout != null) {
+            LOG.info("Upgrading session 0x" + Long.toHexString(sessionId));
+            // Add as global before removing as local
+            addGlobalSession(sessionId, timeout);
+            localSessionTracker.removeSession(sessionId);
+            return timeout;
+        }
+        return -1;
+    }
+
+    public void checkGlobalSession(long sessionId, Object owner)
+            throws KeeperException.SessionExpiredException,
+            KeeperException.SessionMovedException {
+        throw new UnsupportedOperationException();
+    }
+}

Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java Wed Oct  9 20:21:55 2013
@@ -76,15 +76,19 @@ public class PrepRequestProcessorTest ex
     
     private class MySessionTracker implements SessionTracker {
         @Override
-        public void addSession(long id, int to) {
+        public boolean addGlobalSession(long id, int to) {
             // TODO Auto-generated method stub
-            
+            return false;
+        }
+        @Override
+        public boolean addSession(long id, int to) {
+            // TODO Auto-generated method stub
+            return false;
         }
         @Override
         public void checkSession(long sessionId, Object owner)
                 throws SessionExpiredException, SessionMovedException {
             // TODO Auto-generated method stub
-            
         }
         @Override
         public long createSession(int sessionTimeout) {
@@ -94,23 +98,27 @@ public class PrepRequestProcessorTest ex
         @Override
         public void dumpSessions(PrintWriter pwriter) {
             // TODO Auto-generated method stub
-            
+
         }
          @Override
         public void removeSession(long sessionId) {
             // TODO Auto-generated method stub
-            
+
+        }
+        public int upgradeSession(long sessionId) {
+             // TODO Auto-generated method stub
+             return 0;
         }
         @Override
         public void setOwner(long id, Object owner)
                 throws SessionExpiredException {
             // TODO Auto-generated method stub
-            
+
         }
         @Override
         public void shutdown() {
             // TODO Auto-generated method stub
-            
+
         }
         @Override
         public boolean touchSession(long sessionId, int sessionTimeout) {
@@ -121,5 +129,15 @@ public class PrepRequestProcessorTest ex
         public void setSessionClosing(long sessionId) {
           // TODO Auto-generated method stub
         }
+        @Override
+        public boolean isTrackingSession(long sessionId) {
+            // TODO Auto-generated method stub
+            return false;
+        }
+        @Override
+        public void checkGlobalSession(long sessionId, Object owner)
+                throws SessionExpiredException, SessionMovedException {
+            // TODO Auto-generated method stub
+        }
     }
 }

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Wed Oct  9 20:21:55 2013
@@ -113,7 +113,6 @@ public class QuorumPeerMainTest extends 
     @Test
     public void testEarlyLeaderAbandonment() throws Exception {
         ClientBase.setupTestEnv();
-
         final int SERVER_COUNT = 3;
         final int clientPorts[] = new int[SERVER_COUNT];
         StringBuilder sb = new StringBuilder();
@@ -143,10 +142,12 @@ public class QuorumPeerMainTest extends 
 
         for (int i = 0; i < SERVER_COUNT; i++) {
             mt[i].start();
-        }
+            // Recreate a client session since the previous session was not persisted.
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+         }
+
+        waitForAll(zk, States.CONNECTED);
 
-        waitForAll(zk, States.CONNECTED);          
-                          
 
         // ok lets find the leader and kill everything else, we have a few
         // seconds, so it should be plenty of time
@@ -182,6 +183,8 @@ public class QuorumPeerMainTest extends 
         }
         for (int i = 0; i < SERVER_COUNT; i++) {
             if (i != leader) {
+                // Recreate a client session since the previous session was not persisted.
+                zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
                 waitForOne(zk[i], States.CONNECTED);
                 zk[i].create("/zk" + i, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
             }
@@ -306,24 +309,29 @@ public class QuorumPeerMainTest extends 
     }
 
     private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
+        int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
         while (zk.getState() != state) {
+            if (iterations-- == 0) {
+                throw new RuntimeException("Waiting too long");
+            }
             Thread.sleep(500);
         }
     }
 
     private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
-        int iterations = 10;
+        int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
         boolean someoneNotConnected = true;
-        while (someoneNotConnected) {           
+        while (someoneNotConnected) {
             if (iterations-- == 0) {
                 ClientBase.logAllStackTraces();
                 throw new RuntimeException("Waiting too long");
             }
 
             someoneNotConnected = false;
-            for (ZooKeeper zk : zks) {                
+            for (ZooKeeper zk : zks) {
                 if (zk.getState() != state) {
                     someoneNotConnected = true;
+                    break;
                 }
             }
             Thread.sleep(1000);

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * When request are route incorrectly, both follower and the leader will perform
+ * local session upgrade. So we saw CreateSession twice in txnlog This doesn't
+ * affect the correctness but cause the ensemble to see more load than
+ * necessary.
+ */
+public class DuplicateLocalSessionUpgradeTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(DuplicateLocalSessionUpgradeTest.class);
+
+    private final QuorumBase qb = new QuorumBase();
+
+    private static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = true;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionUpgradeOnFollower() throws Exception {
+        testLocalSessionUpgrade(false);
+    }
+
+    @Test
+    public void testLocalSessionUpgradeOnLeader() throws Exception {
+        testLocalSessionUpgrade(true);
+    }
+
+    private void testLocalSessionUpgrade(boolean testLeader) throws Exception {
+
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        String hostPorts[] = qb.hostPort.split(",");
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+                CONNECTION_TIMEOUT);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        final String firstPath = "/first";
+        final String secondPath = "/ephemeral";
+
+        // Just create some node so that we know the current zxid
+        zk.create(firstPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        // Now, try an ephemeral node. This will trigger session upgrade
+        // so there will be createSession request inject into the pipeline
+        // prior to this request
+        zk.create(secondPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.EPHEMERAL);
+
+        Stat firstStat = zk.exists(firstPath, null);
+        Assert.assertNotNull(firstStat);
+
+        Stat secondStat = zk.exists(secondPath, null);
+        Assert.assertNotNull(secondStat);
+
+        long zxidDiff = secondStat.getCzxid() - firstStat.getCzxid();
+
+        // If there is only one createSession request in between, zxid diff
+        // will be exactly 2. The alternative way of checking is to actually
+        // read txnlog but this should be sufficient
+        Assert.assertEquals(2L, zxidDiff);
+
+    }
+}

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to race condition or bad client code, the leader may get request from
+ * expired session. We need to make sure that we never allow ephmeral node
+ * to be created in those case, but we do allow normal node to be created.
+ */
+public class LeaderSessionTrackerTest extends ZKTestCase implements Watcher {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(LeaderSessionTrackerTest.class);
+
+    QuorumUtil qu;
+
+    @Before
+    public void setUp() throws Exception {
+        qu = new QuorumUtil(1);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        qu.shutdownAll();
+    }
+
+    @Test
+    public void testExpiredSessionWithLocalSession() throws Exception {
+        testCreateEphemeral(true);
+    }
+
+    @Test
+    public void testExpiredSessionWithoutLocalSession() throws Exception {
+        testCreateEphemeral(false);
+    }
+
+    /**
+     * When we create ephemeral node, we need to check against global
+     * session, so the leader never accept request from an expired session
+     * (that we no longer track)
+     *
+     * This is not the same as SessionInvalidationTest since session
+     * is not in closing state
+     */
+    public void testCreateEphemeral(boolean localSessionEnabled) throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        if (localSessionEnabled) {
+            qu.enableLocalSession(true);
+        }
+        qu.startAll();
+
+        QuorumPeer leader = qu.getLeaderQuorumPeer();
+
+        ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader),
+                CONNECTION_TIMEOUT, this);
+
+        CreateRequest createRequest = new CreateRequest("/impossible",
+                new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        createRequest.serialize(boa, "request");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+        // Mimic sessionId generated by follower's local session tracker
+        long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer()
+                .getServerId();
+        long fakeSessionId = (sid << 56) + 1;
+
+        LOG.info("Fake session Id: " + Long.toHexString(fakeSessionId));
+
+        Request request = new Request(null, fakeSessionId, 0, OpCode.create,
+                bb, new ArrayList<Id>());
+
+        // Submit request directly to leader
+        leader.getActiveServer().submitRequest(request);
+
+        // Make sure that previous request is finished
+        zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        Stat stat = zk.exists("/impossible", null);
+        Assert.assertEquals("Node from fake session get created", null, stat);
+
+    }
+
+    /**
+     * When local session is enabled, leader will allow persistent node
+     * to be create for unknown session
+     */
+    @Test
+    public void testCreatePersistent() throws Exception {
+        QuorumUtil qu = new QuorumUtil(1);
+        qu.enableLocalSession(true);
+        qu.startAll();
+
+        QuorumPeer leader = qu.getLeaderQuorumPeer();
+
+        ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader),
+                CONNECTION_TIMEOUT, this);
+
+        CreateRequest createRequest = new CreateRequest("/success",
+                new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag());
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        createRequest.serialize(boa, "request");
+        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+        // Mimic sessionId generated by follower's local session tracker
+        long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer()
+                .getServerId();
+        long locallSession = (sid << 56) + 1;
+
+        LOG.info("Local session Id: " + Long.toHexString(locallSession));
+
+        Request request = new Request(null, locallSession, 0, OpCode.create,
+                bb, new ArrayList<Id>());
+
+        // Submit request directly to leader
+        leader.getActiveServer().submitRequest(request);
+
+        // Make sure that previous request is finished
+        zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+
+        Stat stat = zk.exists("/success", null);
+        Assert.assertTrue("Request from local sesson failed", stat != null);
+
+    }
+
+    @Override
+    public void process(WatchedEvent event) {
+    }
+
+}

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.TraceFormatter;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Validate that open/close session request of a local session to not propagate
+ * to other machines in the quorum. We verify this by checking that
+ * these request doesn't show up in committedLog on other machines.
+ */
+public class LocalSessionRequestTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(LocalSessionRequestTest.class);
+    // Need to be short since we need to wait for session to expire
+    public static final int CONNECTION_TIMEOUT = 4000;
+
+    private final QuorumBase qb = new QuorumBase();
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = true;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionsOnFollower() throws Exception {
+        testOpenCloseSession(false);
+    }
+
+    @Test
+    public void testLocalSessionsOnLeader() throws Exception {
+        testOpenCloseSession(true);
+    }
+
+    /**
+     * Walk through the target peer commmittedLog.
+     * @param sessionId
+     * @param peerId
+     */
+    private void validateRequestLog(long sessionId, int peerId) {
+        String session = Long.toHexString(sessionId);
+        LOG.info("Searching for txn of session 0x " + session +
+                " on peer " + peerId);
+        String peerType = peerId == qb.getLeaderIndex() ? "leader" : "follower";
+        QuorumPeer peer = qb.getPeerList().get(peerId);
+        ZKDatabase db = peer.getActiveServer().getZKDatabase();
+        for (Proposal p : db.getCommittedLog()) {
+            Assert.assertFalse("Should not see " +
+                               TraceFormatter.op2String(p.request.type) +
+                               " request from local session 0x" + session +
+                               " on the " + peerType,
+                               p.request.sessionId == sessionId);
+        }
+    }
+
+    /**
+     * Test that a CloseSession request generated by both the server (client
+     * disconnect) or by the client (client explicitly issue close()) doesn't
+     * get committed by the ensemble
+     */
+    public void testOpenCloseSession(boolean onLeader) throws Exception {
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int testPeerIdx = onLeader ? leaderIdx : followerIdx;
+        int verifyPeerIdx = onLeader ? followerIdx : leaderIdx;
+
+        String hostPorts[] = qb.hostPort.split(",");
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper client = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        long localSessionId1 = client.getSessionId();
+
+        // Cut the connection, so the server will create closeSession as part
+        // of expiring the session.
+        client.dontReconnect();
+        client.disconnect();
+        watcher.reset();
+
+        // We don't validate right away, will do another session create first
+
+        ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+                CONNECTION_TIMEOUT);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        long localSessionId2 = zk.getSessionId();
+
+        // Send closeSession request.
+        zk.close();
+        watcher.reset();
+
+        // This should be enough time for the first session to expire and for
+        // the closeSession request to propagate to other machines (if there is a bug)
+        // Since it is time sensitive, we have false negative when test
+        // machine is under load
+        Thread.sleep(CONNECTION_TIMEOUT * 2);
+
+        // Validate that we don't see any txn from the first session
+        validateRequestLog(localSessionId1, verifyPeerIdx);
+
+        // Validate that we don't see any txn from the second session
+        validateRequestLog(localSessionId2, verifyPeerIdx);
+
+        qb.shutdownServers();
+
+    }
+}

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests learners configured to use local sessions only. Expected
+ * behavior is that sessions created on the learner will never be
+ * made global.  Operations requiring a global session (e.g.
+ * creation of ephemeral nodes) will fail with an error.
+ */
+public class LocalSessionsOnlyTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(LocalSessionsOnlyTest.class);
+    public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+    private final QuorumBase qb = new QuorumBase();
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = false;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionsOnFollower() throws Exception {
+        testLocalSessions(false);
+    }
+
+    @Test
+    public void testLocalSessionsOnLeader() throws Exception {
+        testLocalSessions(true);
+    }
+
+    private void testLocalSessions(boolean testLeader) throws Exception {
+        String nodePrefix = "/testLocalSessions-"
+            + (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        String hostPorts[] = qb.hostPort.split(",");
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+                                       CONNECTION_TIMEOUT);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        long localSessionId = zk.getSessionId();
+
+        // Try creating some data.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+
+        // Now, try an ephemeral node.  This should fail since we
+        // cannot create ephemeral nodes on a local session.
+        try {
+            zk.create(nodePrefix + "ephemeral", new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+            Assert.fail("Ephemeral node creation should fail.");
+        } catch (KeeperException.EphemeralOnLocalSessionException e) {
+        }
+
+        // Close the session.
+        zk.close();
+
+        // Validate data on both follower and leader
+        HashMap<String, Integer> peers = new HashMap<String, Integer>();
+        peers.put("leader", leaderIdx);
+        peers.put("follower", followerIdx);
+        for (Entry<String, Integer> entry: peers.entrySet()) {
+            watcher.reset();
+            // Try reconnecting with a new session.
+            // The data should be persisted, even though the session was not.
+            zk = qb.createClient(watcher, hostPorts[entry.getValue()],
+                                 CONNECTION_TIMEOUT);
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+            long newSessionId = zk.getSessionId();
+            Assert.assertFalse(newSessionId == localSessionId);
+
+            for (int i = 0; i < 5; i++) {
+                Assert.assertNotNull("Data not exists in " + entry.getKey(),
+                        zk.exists(nodePrefix + i, null));
+            }
+
+            // We may get the correct exception but the txn may go through
+            Assert.assertNull("Data exists in " + entry.getKey(),
+                    zk.exists(nodePrefix + "ephemeral", null));
+
+            zk.close();
+        }
+        qb.shutdownServers();
+    }
+}

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Wed Oct  9 20:21:55 2013
@@ -21,6 +21,7 @@ package org.apache.zookeeper.test;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.Set;
@@ -33,6 +34,7 @@ import org.apache.zookeeper.server.quoru
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.apache.zookeeper.server.util.OSMXBean;
 import org.junit.Assert;
 import org.junit.Test;
@@ -60,7 +62,10 @@ public class QuorumBase extends ClientBa
     protected int portClient3;
     protected int portClient4;
     protected int portClient5;
-    
+
+    protected boolean localSessionsEnabled = false;
+    protected boolean localSessionsUpgradingEnabled = false;
+
     @Test
     // This just avoids complaints by junit
     public void testNull() {
@@ -188,6 +193,17 @@ public class QuorumBase extends ClientBa
         LOG.info("QuorumPeer 4 voting view: " + s4.getVotingView());
         LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView());       
         
+        s1.enableLocalSessions(localSessionsEnabled);
+        s2.enableLocalSessions(localSessionsEnabled);
+        s3.enableLocalSessions(localSessionsEnabled);
+        s4.enableLocalSessions(localSessionsEnabled);
+        s5.enableLocalSessions(localSessionsEnabled);
+        s1.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s2.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s3.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s4.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+        s5.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+
         LOG.info("start QuorumPeer 1");
         s1.start();
         LOG.info("start QuorumPeer 2");
@@ -230,9 +246,33 @@ public class QuorumBase extends ClientBa
         }
         JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
     }
-    
-    
-    public void setupServers() throws IOException {        
+
+    public int getLeaderIndex() {
+      if (s1.getPeerState() == ServerState.LEADING) {
+        return 0;
+      } else if (s2.getPeerState() == ServerState.LEADING) {
+        return 1;
+      } else if (s3.getPeerState() == ServerState.LEADING) {
+        return 2;
+      } else if (s4.getPeerState() == ServerState.LEADING) {
+        return 3;
+      } else if (s5.getPeerState() == ServerState.LEADING) {
+        return 4;
+      }
+      return -1;
+    }
+
+    public ArrayList<QuorumPeer> getPeerList() {
+        ArrayList<QuorumPeer> peers = new ArrayList<QuorumPeer>();
+        peers.add(s1);
+        peers.add(s2);
+        peers.add(s3);
+        peers.add(s4);
+        peers.add(s5);
+        return peers;
+    }
+
+    public void setupServers() throws IOException {
         setupServer(1);
         setupServer(2);
         setupServer(3);
@@ -303,7 +343,7 @@ public class QuorumBase extends ClientBa
             Assert.assertEquals(portClient5, s5.getClientPort());
         }
     }
-    
+
     @Override
     public void tearDown() throws Exception {
         LOG.info("TearDown started");
@@ -334,6 +374,9 @@ public class QuorumBase extends ClientBa
     }
 
     public static void shutdown(QuorumPeer qp) {
+        if (qp == null) {
+            return;
+        }
         try {
             LOG.info("Shutting down quorum peer " + qp.getName());
             qp.shutdown();

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Wed Oct  9 20:21:55 2013
@@ -304,22 +304,20 @@ public class QuorumTest extends ZKTestCa
         while(qu.getPeer(index).peer.leader == null)
             index++;
 
-        ZooKeeper zk = new ZooKeeper(
-                "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
-                ClientBase.CONNECTION_TIMEOUT, watcher);
-        watcher.waitForConnected(CONNECTION_TIMEOUT);
-
         // break the quorum
         qu.shutdown(index);
-
-        // Wait until we disconnect to proceed
-        watcher.waitForDisconnected(CONNECTION_TIMEOUT);
         
         // try to reestablish the quorum
         qu.start(index);
+        
+        // Connect the client after services are restarted (otherwise we would get
+        // SessionExpiredException as the previous local session was not persisted).
+        ZooKeeper zk = new ZooKeeper(
+                "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
+                ClientBase.CONNECTION_TIMEOUT, watcher);
 
         try{
-            watcher.waitForConnected(30000);      
+            watcher.waitForConnected(CONNECTION_TIMEOUT);      
         } catch(TimeoutException e) {
             Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
         }

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Wed Oct  9 20:21:55 2013
@@ -21,13 +21,14 @@ package org.apache.zookeeper.test;
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.server.quorum.Election;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -35,6 +36,8 @@ import org.apache.zookeeper.server.quoru
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.util.OSMXBean;
 import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Utility for quorum testing. Setups 2n+1 peers and allows to start/stop all
@@ -73,6 +76,8 @@ public class QuorumUtil {
 
     private int electionAlg;
 
+    private boolean localSessionEnabled;
+
     /**
      * Initializes 2n+1 quorum peers which will form a ZooKeeper ensemble.
      *
@@ -129,6 +134,11 @@ public class QuorumUtil {
     // This was added to avoid running into the problem of ZOOKEEPER-1539
     public boolean disableJMXTest = false;
     
+
+    public void enableLocalSession(boolean localSessionEnabled) {
+        this.localSessionEnabled = localSessionEnabled;
+    }
+
     public void startAll() throws IOException {
         shutdownAll();
         for (int i = 1; i <= ALL; ++i) {
@@ -191,6 +201,9 @@ public class QuorumUtil {
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
                 ps.id, tickTime, initLimit, syncLimit);
+        if (localSessionEnabled) {
+            ps.peer.enableLocalSessions(true);
+        }
         Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
 
         ps.peer.start();
@@ -207,6 +220,9 @@ public class QuorumUtil {
         LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
         ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
                 ps.id, tickTime, initLimit, syncLimit);
+        if (localSessionEnabled) {
+            ps.peer.enableLocalSessions(true);
+        }
         Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
 
         ps.peer.start();
@@ -252,6 +268,31 @@ public class QuorumUtil {
         return hostPort;
     }
 
+    public String getConnectString(QuorumPeer peer) {
+        return "127.0.0.1:" + peer.getClientPort();
+    }
+
+    public QuorumPeer getLeaderQuorumPeer() {
+        for (PeerStruct ps: peers.values()) {
+            if (ps.peer.leader != null) {
+               return ps.peer;
+            }
+        }
+        throw new RuntimeException("Unable to find a leader peer");
+    }
+
+    public List<QuorumPeer> getFollowerQuorumPeers() {
+        List<QuorumPeer> peerList = new ArrayList<QuorumPeer>(ALL - 1); 
+
+        for (PeerStruct ps: peers.values()) {
+            if (ps.peer.leader == null) {
+               peerList.add(ps.peer);      
+            }
+        }
+
+        return Collections.unmodifiableList(peerList);
+    }
+
     public void tearDown() throws Exception {
         LOG.info("TearDown started");
 

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java Wed Oct  9 20:21:55 2013
@@ -123,6 +123,12 @@ public class ReadOnlyModeTest extends ZK
 
         watcher.reset();
         qu.shutdown(2);
+        zk.close();
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT);
 
         // read operation during r/o mode
@@ -140,6 +146,13 @@ public class ReadOnlyModeTest extends ZK
         qu.start(2);
         Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(
                 "127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT));
+        zk.close();
+        watcher.reset();
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT);
         zk.setData(node, "We're in the quorum now".getBytes(), -1);
 
@@ -175,6 +188,15 @@ public class ReadOnlyModeTest extends ZK
         // kill peer and wait no more than 5 seconds for read-only server
         // to be started (which should take one tickTime (2 seconds))
         qu.shutdown(2);
+
+        // Re-connect the client (in case we were connected to the shut down
+        // server and the local session was not persisted).
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+                new Watcher() {
+                    public void process(WatchedEvent event) {
+                        states.add(event.getState());
+                    }
+                }, true);
         long start = System.currentTimeMillis();
         while (!(zk.getState() == States.CONNECTEDREADONLY)) {
             Thread.sleep(200);
@@ -228,7 +250,6 @@ public class ReadOnlyModeTest extends ZK
     @SuppressWarnings("deprecation")
     @Test(timeout = 90000)
     public void testSeekForRwServer() throws Exception {
-
         // setup the logger to capture all logs
         Layout layout = Logger.getRootLogger().getAppender("CONSOLE")
                 .getLayout();

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.SessionTracker.Session;
+import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.quorum.LeaderSessionTracker;
+import org.apache.zookeeper.server.quorum.LearnerSessionTracker;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Validate various type of sessions against leader session tracker and learner
+ * session tracker
+ */
+public class SessionTrackerCheckTest extends ZKTestCase {
+
+    protected static final Logger LOG = LoggerFactory
+            .getLogger(SessionTrackerCheckTest.class);
+    public static final int TICK_TIME = 1000;
+    public static final int CONNECTION_TIMEOUT = TICK_TIME * 10;
+
+    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts =
+            new ConcurrentHashMap<Long, Integer>();
+
+    private class Expirer implements SessionExpirer {
+        long sid;
+
+        public Expirer(long sid) {
+            this.sid = sid;
+        }
+
+        public void expire(Session session) {
+        }
+
+        public long getServerId() {
+            return sid;
+        }
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        sessionsWithTimeouts.clear();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+    }
+
+    @Test
+    public void testLearnerSessionTracker() throws Exception {
+        Expirer expirer = new Expirer(1);
+        // With local session on
+        LearnerSessionTracker tracker = new LearnerSessionTracker(expirer,
+                sessionsWithTimeouts, TICK_TIME, expirer.sid, true);
+
+        // Unknown session
+        long sessionId = 0xb100ded;
+        try {
+            tracker.checkSession(sessionId, null);
+            Assert.fail("Unknown session should have failed");
+        } catch (SessionExpiredException e) {
+            // Get expected exception
+        }
+
+        // Global session
+        sessionsWithTimeouts.put(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail");
+        }
+
+        // Local session
+        sessionId = 0xf005ba11;
+        tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Local session should not fail");
+        }
+
+        // During session upgrade
+        sessionsWithTimeouts.put(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Session during upgrade should not fail");
+        }
+
+        // With local session off
+        tracker = new LearnerSessionTracker(expirer, sessionsWithTimeouts,
+                TICK_TIME, expirer.sid, false);
+
+        // Should be noop
+        sessionId = 0xdeadbeef;
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Should not get any exception");
+        }
+
+    }
+
+    @Test
+    public void testLeaderSessionTracker() throws Exception {
+        Expirer expirer = new Expirer(2);
+        // With local session on
+        LeaderSessionTracker tracker = new LeaderSessionTracker(expirer,
+                sessionsWithTimeouts, TICK_TIME, expirer.sid, true);
+
+        // Local session from other server
+        long sessionId = ((expirer.sid + 1) << 56) + 1;
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("local session from other server should not fail");
+        }
+
+        // Global session
+        tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail");
+        }
+        try {
+            tracker.checkGlobalSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+
+        // Local session from the leader
+        sessionId = (expirer.sid << 56) + 1;
+        ;
+        tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Local session on the leader should not fail");
+        }
+
+        // During session upgrade
+        tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Session during upgrade should not fail");
+        }
+        try {
+            tracker.checkGlobalSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail " + e);
+        }
+
+        // With local session off
+        tracker = new LeaderSessionTracker(expirer, sessionsWithTimeouts,
+                TICK_TIME, expirer.sid, false);
+
+        // Global session
+        sessionId = 0xdeadbeef;
+        tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+        try {
+            tracker.checkSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail");
+        }
+        try {
+            tracker.checkGlobalSession(sessionId, null);
+        } catch (Exception e) {
+            Assert.fail("Global session should not fail");
+        }
+
+        // Local session from other server
+        sessionId = ((expirer.sid + 1) << 56) + 2;
+        try {
+            tracker.checkSession(sessionId, null);
+            Assert.fail("local session from other server should fail");
+        } catch (SessionExpiredException e) {
+            // Got expected exception
+        }
+
+        // Local session from the leader
+        sessionId = ((expirer.sid) << 56) + 2;
+        try {
+            tracker.checkSession(sessionId, null);
+            Assert.fail("local session from the leader should fail");
+        } catch (SessionExpiredException e) {
+            // Got expected exception
+        }
+
+    }
+
+}

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java Wed Oct  9 20:21:55 2013
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that session upgrade works from local to global sessions.
+ * Expected behavior is that if global-only sessions are unset,
+ * and no upgrade interval is specified, then sessions will be
+ * created locally to the host.  They will be upgraded to global
+ * sessions iff an operation is done on that session which requires
+ * persistence, i.e. creating an ephemeral node.
+ */
+public class SessionUpgradeTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(SessionUpgradeTest.class);
+    public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+    private final QuorumBase qb = new QuorumBase();
+
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = true;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    @Test
+    public void testLocalSessionsWithoutEphemeralOnFollower() throws Exception {
+        testLocalSessionsWithoutEphemeral(false);
+    }
+
+    @Test
+    public void testLocalSessionsWithoutEphemeralOnLeader() throws Exception {
+        testLocalSessionsWithoutEphemeral(true);
+    }
+
+    private void testLocalSessionsWithoutEphemeral(boolean testLeader)
+            throws Exception {
+        String nodePrefix = "/testLocalSessions-"
+            + (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int otherFollowerIdx = (leaderIdx + 2) % 5;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        String hostPorts[] = qb.hostPort.split(",");
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // Try creating some data.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+
+        long localSessionId = zk.getSessionId();
+        byte[] localSessionPwd = zk.getSessionPasswd().clone();
+
+        // Try connecting with the same session id on a different
+        // server.  This should fail since it is a local sesion.
+        try {
+            watcher.reset();
+            DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(
+                    hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+
+            zknew.create(nodePrefix + "5", new byte[0],
+                         ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.fail("Connection on the same session ID should fail.");
+        } catch (KeeperException.SessionExpiredException e) {
+        } catch (KeeperException.ConnectionLossException e) {
+        }
+
+        // If we're testing a follower, also check the session id on the
+        // leader. This should also fail
+        if (!testLeader) {
+            try {
+                watcher.reset();
+                DisconnectableZooKeeper zknew = new DisconnectableZooKeeper(
+                        hostPorts[leaderIdx], CONNECTION_TIMEOUT,
+                        watcher, localSessionId, localSessionPwd);
+
+                zknew.create(nodePrefix + "5", new byte[0],
+                             ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                             CreateMode.PERSISTENT);
+                Assert.fail("Connection on the same session ID should fail.");
+            } catch (KeeperException.SessionExpiredException e) {
+            } catch (KeeperException.ConnectionLossException e) {
+            }
+        }
+
+        // However, we should be able to disconnect and reconnect to the same
+        // server with the same session id (as long as we do it quickly
+        // before expiration).
+        zk.disconnect();
+
+        watcher.reset();
+        zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher,
+                localSessionId, localSessionPwd);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        zk.create(nodePrefix + "6", new byte[0],
+                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // If we explicitly close the session, then the session id should no
+        // longer be valid.
+        zk.close();
+        try {
+            watcher.reset();
+            zk = new DisconnectableZooKeeper(
+                    hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+
+            zk.create(nodePrefix + "7", new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.fail("Reconnecting to a closed session ID should fail.");
+        } catch (KeeperException.SessionExpiredException e) {
+        }
+    }
+
+    @Test
+    public void testUpgradeWithEphemeralOnFollower() throws Exception {
+        testUpgradeWithEphemeral(false);
+    }
+
+    @Test
+    public void testUpgradeWithEphemeralOnLeader() throws Exception {
+        testUpgradeWithEphemeral(true);
+    }
+
+    private void testUpgradeWithEphemeral(boolean testLeader)
+            throws Exception {
+        String nodePrefix = "/testUpgrade-"
+            + (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        int followerIdx = (leaderIdx + 1) % 5;
+        int otherFollowerIdx = (leaderIdx + 2) % 5;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        String hostPorts[] = qb.hostPort.split(",");
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // Create some ephemeral nodes.  This should force the session to
+        // be propagated to the other servers in the ensemble.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+
+        // We should be able to reconnect with the same session id on a
+        // different server, since it has been propagated.
+        long localSessionId = zk.getSessionId();
+        byte[] localSessionPwd = zk.getSessionPasswd().clone();
+
+        zk.disconnect();
+        watcher.reset();
+        zk = new DisconnectableZooKeeper(
+                hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                localSessionId, localSessionPwd);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // The created ephemeral nodes are still around.
+        for (int i = 0; i < 5; i++) {
+            Assert.assertNotNull(zk.exists(nodePrefix + i, null));
+        }
+
+        // When we explicitly close the session, we should not be able to
+        // reconnect with the same session id
+        zk.close();
+
+        try {
+            watcher.reset();
+            zk = new DisconnectableZooKeeper(
+                    hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+            zk.exists(nodePrefix + "0", null);
+            Assert.fail("Reconnecting to a closed session ID should fail.");
+        } catch (KeeperException.SessionExpiredException e) {
+        }
+
+        watcher.reset();
+        // And the ephemeral nodes will be gone since the session died.
+        zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+        for (int i = 0; i < 5; i++) {
+            Assert.assertNull(zk.exists(nodePrefix + i, null));
+        }
+    }
+}

Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java
------------------------------------------------------------------------------
    svn:eol-style = native