You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by th...@apache.org on 2013/10/09 22:21:56 UTC
svn commit: r1530781 [2/2] - in /zookeeper/trunk: ./ src/c/include/
src/java/main/org/apache/zookeeper/ src/java/main/org/apache/zookeeper/cli/
src/java/main/org/apache/zookeeper/server/
src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/o...
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeerMain.java Wed Oct 9 20:21:55 2013
@@ -110,7 +110,7 @@ public class QuorumPeerMain {
.getDataDir(), config.getDataLogDir(), config
.getSnapRetainCount(), config.getPurgeInterval());
purgeMgr.start();
-
+
if (args.length == 1 && config.isDistributed()) {
runFromConfig(config);
} else {
@@ -127,17 +127,20 @@ public class QuorumPeerMain {
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
-
+
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns());
-
- quorumPeer = new QuorumPeer();
+
+ quorumPeer = new QuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
+ quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
+ quorumPeer.enableLocalSessionsUpgrading(
+ config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java Wed Oct 9 20:21:55 2013
@@ -17,8 +17,16 @@
*/
package org.apache.zookeeper.server.quorum;
+import java.io.IOException;
import java.io.PrintWriter;
+import java.nio.ByteBuffer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.ByteBufferInputStream;
+import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
@@ -28,7 +36,9 @@ import org.apache.zookeeper.server.persi
* a quorum.
*/
public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
+
public final QuorumPeer self;
+ protected UpgradeableSessionTracker upgradeableSessionTracker;
protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime,
int minSessionTimeout, int maxSessionTimeout,
@@ -39,6 +49,95 @@ public abstract class QuorumZooKeeperSer
}
@Override
+ protected void startSessionTracker() {
+ upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker;
+ upgradeableSessionTracker.start();
+ }
+
+ public Request checkUpgradeSession(Request request)
+ throws IOException, KeeperException {
+ // If this is a request for a local session and it is to
+ // create an ephemeral node, then upgrade the session and return
+ // a new session request for the leader.
+ // This is called by the request processor thread (either follower
+ // or observer request processor), which is unique to a learner.
+ // So it will not be called concurrently by two threads.
+ if (request.type != OpCode.create ||
+ !upgradeableSessionTracker.isLocalSession(request.sessionId)) {
+ return null;
+ }
+ CreateRequest createRequest = new CreateRequest();
+ request.request.rewind();
+ ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
+ request.request.rewind();
+ CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+ if (!createMode.isEphemeral()) {
+ return null;
+ }
+ // Uh oh. We need to upgrade before we can proceed.
+ if (!self.isLocalSessionsUpgradingEnabled()) {
+ throw new KeeperException.EphemeralOnLocalSessionException();
+ }
+
+ return makeUpgradeRequest(request.sessionId);
+ }
+
+ private Request makeUpgradeRequest(long sessionId) {
+ // Make sure to atomically check local session status, upgrade
+ // session, and make the session creation request. This is to
+ // avoid another thread upgrading the session in parallel.
+ synchronized (upgradeableSessionTracker) {
+ if (upgradeableSessionTracker.isLocalSession(sessionId)) {
+ int timeout = upgradeableSessionTracker.upgradeSession(sessionId);
+ ByteBuffer to = ByteBuffer.allocate(4);
+ to.putInt(timeout);
+ return new Request(
+ null, sessionId, 0, OpCode.createSession, to, null);
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Implements the SessionUpgrader interface.
+ *
+ * @param sessionId
+ */
+ public void upgrade(long sessionId) {
+ Request request = makeUpgradeRequest(sessionId);
+ if (request != null) {
+ LOG.info("Upgrading session 0x" + Long.toHexString(sessionId));
+ // This must be a global request
+ submitRequest(request);
+ }
+ }
+
+ @Override
+ protected void setLocalSessionFlag(Request si) {
+ // We need to set isLocalSession to true for these types of requests
+ // so that the request processor can process them correctly.
+ switch (si.type) {
+ case OpCode.createSession:
+ if (self.areLocalSessionsEnabled()) {
+ // All new sessions local by default.
+ si.setLocalSession(true);
+ }
+ break;
+ case OpCode.closeSession:
+ String reqType = "global";
+ if (upgradeableSessionTracker.isLocalSession(si.sessionId)) {
+ si.setLocalSession(true);
+ reqType = "local";
+ }
+ LOG.info("Submitting " + reqType + " closeSession request"
+ + " for session 0x" + Long.toHexString(si.sessionId));
+ break;
+ default:
+ break;
+ }
+ }
+
+ @Override
public void dumpConf(PrintWriter pwriter) {
super.dumpConf(pwriter);
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java Wed Oct 9 20:21:55 2013
@@ -18,6 +18,8 @@
package org.apache.zookeeper.server.quorum;
+import java.io.PrintWriter;
+
import org.apache.zookeeper.jmx.MBeanRegistry;
import org.apache.zookeeper.server.DataTreeBean;
import org.apache.zookeeper.server.FinalRequestProcessor;
@@ -36,11 +38,16 @@ import org.apache.zookeeper.server.persi
* The very first processor in the chain of request processors is a
* ReadOnlyRequestProcessor which drops state-changing requests.
*/
-public class ReadOnlyZooKeeperServer extends QuorumZooKeeperServer {
+public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
+ protected final QuorumPeer self;
private volatile boolean shutdown = false;
- ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self, ZKDatabase zkDb) {
- super(logFactory, self.tickTime, self.minSessionTimeout, self.maxSessionTimeout, zkDb, self);
+
+ ReadOnlyZooKeeperServer(FileTxnSnapLog logFactory, QuorumPeer self,
+ ZKDatabase zkDb) {
+ super(logFactory, self.tickTime, self.minSessionTimeout,
+ self.maxSessionTimeout, zkDb);
+ this.self = self;
}
@Override
@@ -141,4 +148,21 @@ public class ReadOnlyZooKeeperServer ext
super.shutdown();
}
+ @Override
+ public void dumpConf(PrintWriter pwriter) {
+ super.dumpConf(pwriter);
+
+ pwriter.print("initLimit=");
+ pwriter.println(self.getInitLimit());
+ pwriter.print("syncLimit=");
+ pwriter.println(self.getSyncLimit());
+ pwriter.print("electionAlg=");
+ pwriter.println(self.getElectionType());
+ pwriter.print("electionPort=");
+ pwriter.println(self.getElectionAddress().getPort());
+ pwriter.print("quorumPort=");
+ pwriter.println(self.getQuorumAddress().getPort());
+ pwriter.print("peerType=");
+ pwriter.println(self.getLearnerType().ordinal());
+ }
}
Added: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java (added)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.server.SessionTracker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A session tracker that supports upgradeable local sessions.
+ */
+public abstract class UpgradeableSessionTracker implements SessionTracker {
+ private static final Logger LOG = LoggerFactory.getLogger(UpgradeableSessionTracker.class);
+
+ private ConcurrentMap<Long, Integer> localSessionsWithTimeouts;
+ protected LocalSessionTracker localSessionTracker;
+
+ public void start() {}
+
+ public void createLocalSessionTracker(SessionExpirer expirer,
+ int tickTime, long id) {
+ this.localSessionsWithTimeouts =
+ new ConcurrentHashMap<Long, Integer>();
+ this.localSessionTracker = new LocalSessionTracker(
+ expirer, this.localSessionsWithTimeouts, tickTime, id);
+ }
+
+ public boolean isTrackingSession(long sessionId) {
+ return isLocalSession(sessionId) || isGlobalSession(sessionId);
+ }
+
+ public boolean isLocalSession(long sessionId) {
+ return localSessionTracker != null &&
+ localSessionTracker.isTrackingSession(sessionId);
+ }
+
+ abstract public boolean isGlobalSession(long sessionId);
+
+ /**
+ * Upgrades the session to a global session.
+ * This simply removes the session from the local tracker and marks
+ * it as global. It is up to the caller to actually
+ * queue up a transaction for the session.
+ *
+ * @param sessionId
+ * @return session timeout (-1 if not a local session)
+ */
+ public int upgradeSession(long sessionId) {
+ if (localSessionsWithTimeouts == null) {
+ return -1;
+ }
+ // We won't race another upgrade attempt because only one thread
+ // will get the timeout from the map
+ Integer timeout = localSessionsWithTimeouts.remove(sessionId);
+ if (timeout != null) {
+ LOG.info("Upgrading session 0x" + Long.toHexString(sessionId));
+ // Add as global before removing as local
+ addGlobalSession(sessionId, timeout);
+ localSessionTracker.removeSession(sessionId);
+ return timeout;
+ }
+ return -1;
+ }
+
+ public void checkGlobalSession(long sessionId, Object owner)
+ throws KeeperException.SessionExpiredException,
+ KeeperException.SessionMovedException {
+ throw new UnsupportedOperationException();
+ }
+}
Propchange: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/PrepRequestProcessorTest.java Wed Oct 9 20:21:55 2013
@@ -76,15 +76,19 @@ public class PrepRequestProcessorTest ex
private class MySessionTracker implements SessionTracker {
@Override
- public void addSession(long id, int to) {
+ public boolean addGlobalSession(long id, int to) {
// TODO Auto-generated method stub
-
+ return false;
+ }
+ @Override
+ public boolean addSession(long id, int to) {
+ // TODO Auto-generated method stub
+ return false;
}
@Override
public void checkSession(long sessionId, Object owner)
throws SessionExpiredException, SessionMovedException {
// TODO Auto-generated method stub
-
}
@Override
public long createSession(int sessionTimeout) {
@@ -94,23 +98,27 @@ public class PrepRequestProcessorTest ex
@Override
public void dumpSessions(PrintWriter pwriter) {
// TODO Auto-generated method stub
-
+
}
@Override
public void removeSession(long sessionId) {
// TODO Auto-generated method stub
-
+
+ }
+ public int upgradeSession(long sessionId) {
+ // TODO Auto-generated method stub
+ return 0;
}
@Override
public void setOwner(long id, Object owner)
throws SessionExpiredException {
// TODO Auto-generated method stub
-
+
}
@Override
public void shutdown() {
// TODO Auto-generated method stub
-
+
}
@Override
public boolean touchSession(long sessionId, int sessionTimeout) {
@@ -121,5 +129,15 @@ public class PrepRequestProcessorTest ex
public void setSessionClosing(long sessionId) {
// TODO Auto-generated method stub
}
+ @Override
+ public boolean isTrackingSession(long sessionId) {
+ // TODO Auto-generated method stub
+ return false;
+ }
+ @Override
+ public void checkGlobalSession(long sessionId, Object owner)
+ throws SessionExpiredException, SessionMovedException {
+ // TODO Auto-generated method stub
+ }
}
}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java Wed Oct 9 20:21:55 2013
@@ -113,7 +113,6 @@ public class QuorumPeerMainTest extends
@Test
public void testEarlyLeaderAbandonment() throws Exception {
ClientBase.setupTestEnv();
-
final int SERVER_COUNT = 3;
final int clientPorts[] = new int[SERVER_COUNT];
StringBuilder sb = new StringBuilder();
@@ -143,10 +142,12 @@ public class QuorumPeerMainTest extends
for (int i = 0; i < SERVER_COUNT; i++) {
mt[i].start();
- }
+ // Recreate a client session since the previous session was not persisted.
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
+ }
+
+ waitForAll(zk, States.CONNECTED);
- waitForAll(zk, States.CONNECTED);
-
// ok lets find the leader and kill everything else, we have a few
// seconds, so it should be plenty of time
@@ -182,6 +183,8 @@ public class QuorumPeerMainTest extends
}
for (int i = 0; i < SERVER_COUNT; i++) {
if (i != leader) {
+ // Recreate a client session since the previous session was not persisted.
+ zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i], ClientBase.CONNECTION_TIMEOUT, this);
waitForOne(zk[i], States.CONNECTED);
zk[i].create("/zk" + i, "zk".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
@@ -306,24 +309,29 @@ public class QuorumPeerMainTest extends
}
private void waitForOne(ZooKeeper zk, States state) throws InterruptedException {
+ int iterations = ClientBase.CONNECTION_TIMEOUT / 500;
while (zk.getState() != state) {
+ if (iterations-- == 0) {
+ throw new RuntimeException("Waiting too long");
+ }
Thread.sleep(500);
}
}
private void waitForAll(ZooKeeper[] zks, States state) throws InterruptedException {
- int iterations = 10;
+ int iterations = ClientBase.CONNECTION_TIMEOUT / 1000;
boolean someoneNotConnected = true;
- while (someoneNotConnected) {
+ while (someoneNotConnected) {
if (iterations-- == 0) {
ClientBase.logAllStackTraces();
throw new RuntimeException("Waiting too long");
}
someoneNotConnected = false;
- for (ZooKeeper zk : zks) {
+ for (ZooKeeper zk : zks) {
if (zk.getState() != state) {
someoneNotConnected = true;
+ break;
}
}
Thread.sleep(1000);
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * When requests are routed incorrectly, both the follower and the leader will
+ * perform a local session upgrade, so CreateSession appears twice in the txnlog.
+ * This doesn't affect correctness but causes the ensemble to see more load than
+ * necessary.
+ */
+public class DuplicateLocalSessionUpgradeTest extends ZKTestCase {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(DuplicateLocalSessionUpgradeTest.class);
+
+ private final QuorumBase qb = new QuorumBase();
+
+ private static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("STARTING quorum " + getClass().getName());
+ qb.localSessionsEnabled = true;
+ qb.localSessionsUpgradingEnabled = true;
+ qb.setUp();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("STOPPING quorum " + getClass().getName());
+ qb.tearDown();
+ }
+
+ @Test
+ public void testLocalSessionUpgradeOnFollower() throws Exception {
+ testLocalSessionUpgrade(false);
+ }
+
+ @Test
+ public void testLocalSessionUpgradeOnLeader() throws Exception {
+ testLocalSessionUpgrade(true);
+ }
+
+ private void testLocalSessionUpgrade(boolean testLeader) throws Exception {
+
+ int leaderIdx = qb.getLeaderIndex();
+ Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+ int followerIdx = (leaderIdx + 1) % 5;
+ int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+ String hostPorts[] = qb.hostPort.split(",");
+
+ CountdownWatcher watcher = new CountdownWatcher();
+ ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+ CONNECTION_TIMEOUT);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+ final String firstPath = "/first";
+ final String secondPath = "/ephemeral";
+
+ // Just create some node so that we know the current zxid
+ zk.create(firstPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ // Now, try an ephemeral node. This will trigger session upgrade
+ // so there will be a createSession request injected into the pipeline
+ // prior to this request
+ zk.create(secondPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL);
+
+ Stat firstStat = zk.exists(firstPath, null);
+ Assert.assertNotNull(firstStat);
+
+ Stat secondStat = zk.exists(secondPath, null);
+ Assert.assertNotNull(secondStat);
+
+ long zxidDiff = secondStat.getCzxid() - firstStat.getCzxid();
+
+ // If there is only one createSession request in between, zxid diff
+ // will be exactly 2. The alternative way of checking is to actually
+ // read txnlog but this should be sufficient
+ Assert.assertEquals(2L, zxidDiff);
+
+ }
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/DuplicateLocalSessionUpgradeTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Due to a race condition or bad client code, the leader may get a request from
+ * an expired session. We need to make sure that we never allow an ephemeral node
+ * to be created in those cases, but we do allow a normal node to be created.
+ */
+public class LeaderSessionTrackerTest extends ZKTestCase implements Watcher {
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(LeaderSessionTrackerTest.class);
+
+ QuorumUtil qu;
+
+ @Before
+ public void setUp() throws Exception {
+ qu = new QuorumUtil(1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ qu.shutdownAll();
+ }
+
+ @Test
+ public void testExpiredSessionWithLocalSession() throws Exception {
+ testCreateEphemeral(true);
+ }
+
+ @Test
+ public void testExpiredSessionWithoutLocalSession() throws Exception {
+ testCreateEphemeral(false);
+ }
+
+ /**
+ * When we create an ephemeral node, we need to check against the global
+ * session, so the leader never accepts a request from an expired session
+ * (that we no longer track).
+ *
+ * This is not the same as SessionInvalidationTest since session
+ * is not in closing state
+ */
+ public void testCreateEphemeral(boolean localSessionEnabled) throws Exception {
+ QuorumUtil qu = new QuorumUtil(1);
+ if (localSessionEnabled) {
+ qu.enableLocalSession(true);
+ }
+ qu.startAll();
+
+ QuorumPeer leader = qu.getLeaderQuorumPeer();
+
+ ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader),
+ CONNECTION_TIMEOUT, this);
+
+ CreateRequest createRequest = new CreateRequest("/impossible",
+ new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ createRequest.serialize(boa, "request");
+ ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+ // Mimic sessionId generated by follower's local session tracker
+ long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer()
+ .getServerId();
+ long fakeSessionId = (sid << 56) + 1;
+
+ LOG.info("Fake session Id: " + Long.toHexString(fakeSessionId));
+
+ Request request = new Request(null, fakeSessionId, 0, OpCode.create,
+ bb, new ArrayList<Id>());
+
+ // Submit request directly to leader
+ leader.getActiveServer().submitRequest(request);
+
+ // Make sure that previous request is finished
+ zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ Stat stat = zk.exists("/impossible", null);
+ Assert.assertEquals("Node from fake session get created", null, stat);
+
+ }
+
+ /**
+ * When local sessions are enabled, the leader will allow a persistent node
+ * to be created for an unknown session.
+ */
+ @Test
+ public void testCreatePersistent() throws Exception {
+ QuorumUtil qu = new QuorumUtil(1);
+ qu.enableLocalSession(true);
+ qu.startAll();
+
+ QuorumPeer leader = qu.getLeaderQuorumPeer();
+
+ ZooKeeper zk = new ZooKeeper(qu.getConnectString(leader),
+ CONNECTION_TIMEOUT, this);
+
+ CreateRequest createRequest = new CreateRequest("/success",
+ new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT.toFlag());
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+ createRequest.serialize(boa, "request");
+ ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
+
+ // Mimic sessionId generated by follower's local session tracker
+ long sid = qu.getFollowerQuorumPeers().get(0).getActiveServer()
+ .getServerId();
+ long locallSession = (sid << 56) + 1;
+
+ LOG.info("Local session Id: " + Long.toHexString(locallSession));
+
+ Request request = new Request(null, locallSession, 0, OpCode.create,
+ bb, new ArrayList<Id>());
+
+ // Submit request directly to leader
+ leader.getActiveServer().submitRequest(request);
+
+ // Make sure that previous request is finished
+ zk.create("/ok", new byte[0], Ids.OPEN_ACL_UNSAFE,
+ CreateMode.PERSISTENT);
+
+ Stat stat = zk.exists("/success", null);
+ Assert.assertTrue("Request from local sesson failed", stat != null);
+
+ }
+
+ @Override
+ public void process(WatchedEvent event) {
+ }
+
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LeaderSessionTrackerTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.TraceFormatter;
+import org.apache.zookeeper.server.ZKDatabase;
+import org.apache.zookeeper.server.quorum.Leader.Proposal;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Validate that open/close session requests of a local session do not propagate
+ * to other machines in the quorum. We verify this by checking that
+ * these requests don't show up in the committedLog on other machines.
+ */
+public class LocalSessionRequestTest extends ZKTestCase {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(LocalSessionRequestTest.class);
+ // Needs to be short since we need to wait for the session to expire
+ public static final int CONNECTION_TIMEOUT = 4000;
+
+ private final QuorumBase qb = new QuorumBase();
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("STARTING quorum " + getClass().getName());
+ qb.localSessionsEnabled = true;
+ qb.localSessionsUpgradingEnabled = true;
+ qb.setUp();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("STOPPING quorum " + getClass().getName());
+ qb.tearDown();
+ }
+
+ @Test
+ public void testLocalSessionsOnFollower() throws Exception {
+ testOpenCloseSession(false);
+ }
+
+ @Test
+ public void testLocalSessionsOnLeader() throws Exception {
+ testOpenCloseSession(true);
+ }
+
+ /**
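+ * Walk through the target peer's committedLog and fail on any request from the session.
+ * @param sessionId id of the local session whose requests must not appear in the log
+ * @param peerId index (into qb.getPeerList()) of the peer whose committedLog is inspected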
+ */
+ private void validateRequestLog(long sessionId, int peerId) {
+ String session = Long.toHexString(sessionId);
+ LOG.info("Searching for txn of session 0x " + session +
+ " on peer " + peerId);
+ String peerType = peerId == qb.getLeaderIndex() ? "leader" : "follower";
+ QuorumPeer peer = qb.getPeerList().get(peerId);
+ ZKDatabase db = peer.getActiveServer().getZKDatabase();
+ for (Proposal p : db.getCommittedLog()) {
+ Assert.assertFalse("Should not see " +
+ TraceFormatter.op2String(p.request.type) +
+ " request from local session 0x" + session +
+ " on the " + peerType,
+ p.request.sessionId == sessionId);
+ }
+ }
+
+ /**
+ * Test that a CloseSession request generated either by the server (client
+ * disconnect) or by the client (client explicitly issues close()) doesn't
+ * get committed by the ensemble.
+ */
+ public void testOpenCloseSession(boolean onLeader) throws Exception {
+ int leaderIdx = qb.getLeaderIndex();
+ Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+ int followerIdx = (leaderIdx + 1) % 5;
+ int testPeerIdx = onLeader ? leaderIdx : followerIdx;
+ int verifyPeerIdx = onLeader ? followerIdx : leaderIdx;
+
+ String hostPorts[] = qb.hostPort.split(",");
+
+ CountdownWatcher watcher = new CountdownWatcher();
+ DisconnectableZooKeeper client = new DisconnectableZooKeeper(
+ hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+ long localSessionId1 = client.getSessionId();
+
+ // Cut the connection, so the server will create closeSession as part
+ // of expiring the session.
+ client.dontReconnect();
+ client.disconnect();
+ watcher.reset();
+
+ // We don't validate right away, will do another session create first
+
+ ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+ CONNECTION_TIMEOUT);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+ long localSessionId2 = zk.getSessionId();
+
+ // Send closeSession request.
+ zk.close();
+ watcher.reset();
+
+ // This should be enough time for the first session to expire and for
+ // the closeSession request to propagate to other machines (if there is a bug)
+ // Since this is time sensitive, we may get a false negative when the test
+ // machine is under load
+ Thread.sleep(CONNECTION_TIMEOUT * 2);
+
+ // Validate that we don't see any txn from the first session
+ validateRequestLog(localSessionId1, verifyPeerIdx);
+
+ // Validate that we don't see any txn from the second session
+ validateRequestLog(localSessionId2, verifyPeerIdx);
+
+ qb.shutdownServers();
+
+ }
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionRequestTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests learners configured to use local sessions only. Expected
+ * behavior is that sessions created on the learner will never be
+ * made global. Operations requiring a global session (e.g.
+ * creation of ephemeral nodes) will fail with an error.
+ */
+public class LocalSessionsOnlyTest extends ZKTestCase {
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(LocalSessionsOnlyTest.class);
+ public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+ private final QuorumBase qb = new QuorumBase();
+
+ @Before
+ public void setUp() throws Exception {
+ LOG.info("STARTING quorum " + getClass().getName());
+ qb.localSessionsEnabled = true;
+ qb.localSessionsUpgradingEnabled = false;
+ qb.setUp();
+ ClientBase.waitForServerUp(qb.hostPort, 10000);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ LOG.info("STOPPING quorum " + getClass().getName());
+ qb.tearDown();
+ }
+
+ @Test
+ public void testLocalSessionsOnFollower() throws Exception {
+ testLocalSessions(false);
+ }
+
+ @Test
+ public void testLocalSessionsOnLeader() throws Exception {
+ testLocalSessions(true);
+ }
+
+ private void testLocalSessions(boolean testLeader) throws Exception {
+ String nodePrefix = "/testLocalSessions-"
+ + (testLeader ? "leaderTest-" : "followerTest-");
+ int leaderIdx = qb.getLeaderIndex();
+ Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+ int followerIdx = (leaderIdx + 1) % 5;
+ int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+ String hostPorts[] = qb.hostPort.split(",");
+
+ CountdownWatcher watcher = new CountdownWatcher();
+ ZooKeeper zk = qb.createClient(watcher, hostPorts[testPeerIdx],
+ CONNECTION_TIMEOUT);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+ long localSessionId = zk.getSessionId();
+
+ // Try creating some data.
+ for (int i = 0; i < 5; i++) {
+ zk.create(nodePrefix + i, new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ }
+
+ // Now, try an ephemeral node. This should fail since we
+ // cannot create ephemeral nodes on a local session.
+ try {
+ zk.create(nodePrefix + "ephemeral", new byte[0],
+ ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+ Assert.fail("Ephemeral node creation should fail.");
+ } catch (KeeperException.EphemeralOnLocalSessionException e) {
+ }
+
+ // Close the session.
+ zk.close();
+
+ // Validate data on both follower and leader
+ HashMap<String, Integer> peers = new HashMap<String, Integer>();
+ peers.put("leader", leaderIdx);
+ peers.put("follower", followerIdx);
+ for (Entry<String, Integer> entry: peers.entrySet()) {
+ watcher.reset();
+ // Try reconnecting with a new session.
+ // The data should be persisted, even though the session was not.
+ zk = qb.createClient(watcher, hostPorts[entry.getValue()],
+ CONNECTION_TIMEOUT);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+ long newSessionId = zk.getSessionId();
+ Assert.assertFalse(newSessionId == localSessionId);
+
+ for (int i = 0; i < 5; i++) {
+ Assert.assertNotNull("Data not exists in " + entry.getKey(),
+ zk.exists(nodePrefix + i, null));
+ }
+
+ // We may get the correct exception but the txn may go through
+ Assert.assertNull("Data exists in " + entry.getKey(),
+ zk.exists(nodePrefix + "ephemeral", null));
+
+ zk.close();
+ }
+ qb.shutdownServers();
+ }
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/LocalSessionsOnlyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumBase.java Wed Oct 9 20:21:55 2013
@@ -21,6 +21,7 @@ package org.apache.zookeeper.test;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Set;
@@ -33,6 +34,7 @@ import org.apache.zookeeper.server.quoru
import org.apache.zookeeper.server.quorum.QuorumPeer;
import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
import org.apache.zookeeper.server.util.OSMXBean;
import org.junit.Assert;
import org.junit.Test;
@@ -60,7 +62,10 @@ public class QuorumBase extends ClientBa
protected int portClient3;
protected int portClient4;
protected int portClient5;
-
+
+ protected boolean localSessionsEnabled = false;
+ protected boolean localSessionsUpgradingEnabled = false;
+
@Test
// This just avoids complaints by junit
public void testNull() {
@@ -188,6 +193,17 @@ public class QuorumBase extends ClientBa
LOG.info("QuorumPeer 4 voting view: " + s4.getVotingView());
LOG.info("QuorumPeer 5 voting view: " + s5.getVotingView());
+ s1.enableLocalSessions(localSessionsEnabled);
+ s2.enableLocalSessions(localSessionsEnabled);
+ s3.enableLocalSessions(localSessionsEnabled);
+ s4.enableLocalSessions(localSessionsEnabled);
+ s5.enableLocalSessions(localSessionsEnabled);
+ s1.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+ s2.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+ s3.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+ s4.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+ s5.enableLocalSessionsUpgrading(localSessionsUpgradingEnabled);
+
LOG.info("start QuorumPeer 1");
s1.start();
LOG.info("start QuorumPeer 2");
@@ -230,9 +246,33 @@ public class QuorumBase extends ClientBa
}
JMXEnv.ensureAll(ensureNames.toArray(new String[ensureNames.size()]));
}
-
-
- public void setupServers() throws IOException {
+
+ public int getLeaderIndex() {
+ if (s1.getPeerState() == ServerState.LEADING) {
+ return 0;
+ } else if (s2.getPeerState() == ServerState.LEADING) {
+ return 1;
+ } else if (s3.getPeerState() == ServerState.LEADING) {
+ return 2;
+ } else if (s4.getPeerState() == ServerState.LEADING) {
+ return 3;
+ } else if (s5.getPeerState() == ServerState.LEADING) {
+ return 4;
+ }
+ return -1;
+ }
+
+ public ArrayList<QuorumPeer> getPeerList() {
+ ArrayList<QuorumPeer> peers = new ArrayList<QuorumPeer>();
+ peers.add(s1);
+ peers.add(s2);
+ peers.add(s3);
+ peers.add(s4);
+ peers.add(s5);
+ return peers;
+ }
+
+ public void setupServers() throws IOException {
setupServer(1);
setupServer(2);
setupServer(3);
@@ -303,7 +343,7 @@ public class QuorumBase extends ClientBa
Assert.assertEquals(portClient5, s5.getClientPort());
}
}
-
+
@Override
public void tearDown() throws Exception {
LOG.info("TearDown started");
@@ -334,6 +374,9 @@ public class QuorumBase extends ClientBa
}
public static void shutdown(QuorumPeer qp) {
+ if (qp == null) {
+ return;
+ }
try {
LOG.info("Shutting down quorum peer " + qp.getName());
qp.shutdown();
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumTest.java Wed Oct 9 20:21:55 2013
@@ -304,22 +304,20 @@ public class QuorumTest extends ZKTestCa
while(qu.getPeer(index).peer.leader == null)
index++;
- ZooKeeper zk = new ZooKeeper(
- "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
- ClientBase.CONNECTION_TIMEOUT, watcher);
- watcher.waitForConnected(CONNECTION_TIMEOUT);
-
// break the quorum
qu.shutdown(index);
-
- // Wait until we disconnect to proceed
- watcher.waitForDisconnected(CONNECTION_TIMEOUT);
// try to reestablish the quorum
qu.start(index);
+
+ // Connect the client after services are restarted (otherwise we would get
+ // SessionExpiredException as the previous local session was not persisted).
+ ZooKeeper zk = new ZooKeeper(
+ "127.0.0.1:" + qu.getPeer((index == 1)?2:1).peer.getClientPort(),
+ ClientBase.CONNECTION_TIMEOUT, watcher);
try{
- watcher.waitForConnected(30000);
+ watcher.waitForConnected(CONNECTION_TIMEOUT);
} catch(TimeoutException e) {
Assert.fail("client could not connect to reestablished quorum: giving up after 30+ seconds.");
}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/QuorumUtil.java Wed Oct 9 20:21:55 2013
@@ -21,13 +21,14 @@ package org.apache.zookeeper.test;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.server.quorum.Election;
import org.apache.zookeeper.server.quorum.QuorumPeer;
@@ -35,6 +36,8 @@ import org.apache.zookeeper.server.quoru
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
import org.apache.zookeeper.server.util.OSMXBean;
import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Utility for quorum testing. Setups 2n+1 peers and allows to start/stop all
@@ -73,6 +76,8 @@ public class QuorumUtil {
private int electionAlg;
+ private boolean localSessionEnabled;
+
/**
* Initializes 2n+1 quorum peers which will form a ZooKeeper ensemble.
*
@@ -129,6 +134,11 @@ public class QuorumUtil {
// This was added to avoid running into the problem of ZOOKEEPER-1539
public boolean disableJMXTest = false;
+
+ public void enableLocalSession(boolean localSessionEnabled) {
+ this.localSessionEnabled = localSessionEnabled;
+ }
+
public void startAll() throws IOException {
shutdownAll();
for (int i = 1; i <= ALL; ++i) {
@@ -191,6 +201,9 @@ public class QuorumUtil {
LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
ps.id, tickTime, initLimit, syncLimit);
+ if (localSessionEnabled) {
+ ps.peer.enableLocalSessions(true);
+ }
Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
ps.peer.start();
@@ -207,6 +220,9 @@ public class QuorumUtil {
LOG.info("Creating QuorumPeer " + ps.id + "; public port " + ps.clientPort);
ps.peer = new QuorumPeer(peersView, ps.dataDir, ps.dataDir, ps.clientPort, electionAlg,
ps.id, tickTime, initLimit, syncLimit);
+ if (localSessionEnabled) {
+ ps.peer.enableLocalSessions(true);
+ }
Assert.assertEquals(ps.clientPort, ps.peer.getClientPort());
ps.peer.start();
@@ -252,6 +268,31 @@ public class QuorumUtil {
return hostPort;
}
+ public String getConnectString(QuorumPeer peer) {
+ return "127.0.0.1:" + peer.getClientPort();
+ }
+
+ public QuorumPeer getLeaderQuorumPeer() {
+ for (PeerStruct ps: peers.values()) {
+ if (ps.peer.leader != null) {
+ return ps.peer;
+ }
+ }
+ throw new RuntimeException("Unable to find a leader peer");
+ }
+
+ public List<QuorumPeer> getFollowerQuorumPeers() {
+ List<QuorumPeer> peerList = new ArrayList<QuorumPeer>(ALL - 1);
+
+ for (PeerStruct ps: peers.values()) {
+ if (ps.peer.leader == null) {
+ peerList.add(ps.peer);
+ }
+ }
+
+ return Collections.unmodifiableList(peerList);
+ }
+
public void tearDown() throws Exception {
LOG.info("TearDown started");
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java?rev=1530781&r1=1530780&r2=1530781&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ReadOnlyModeTest.java Wed Oct 9 20:21:55 2013
@@ -123,6 +123,12 @@ public class ReadOnlyModeTest extends ZK
watcher.reset();
qu.shutdown(2);
+ zk.close();
+
+ // Re-connect the client (in case we were connected to the shut down
+ // server and the local session was not persisted).
+ zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+ watcher, true);
watcher.waitForConnected(CONNECTION_TIMEOUT);
// read operation during r/o mode
@@ -140,6 +146,13 @@ public class ReadOnlyModeTest extends ZK
qu.start(2);
Assert.assertTrue("waiting for server up", ClientBase.waitForServerUp(
"127.0.0.1:" + qu.getPeer(2).clientPort, CONNECTION_TIMEOUT));
+ zk.close();
+ watcher.reset();
+
+ // Re-connect the client (in case we were connected to the shut down
+ // server and the local session was not persisted).
+ zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+ watcher, true);
watcher.waitForConnected(CONNECTION_TIMEOUT);
zk.setData(node, "We're in the quorum now".getBytes(), -1);
@@ -175,6 +188,15 @@ public class ReadOnlyModeTest extends ZK
// kill peer and wait no more than 5 seconds for read-only server
// to be started (which should take one tickTime (2 seconds))
qu.shutdown(2);
+
+ // Re-connect the client (in case we were connected to the shut down
+ // server and the local session was not persisted).
+ zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT,
+ new Watcher() {
+ public void process(WatchedEvent event) {
+ states.add(event.getState());
+ }
+ }, true);
long start = System.currentTimeMillis();
while (!(zk.getState() == States.CONNECTEDREADONLY)) {
Thread.sleep(200);
@@ -228,7 +250,6 @@ public class ReadOnlyModeTest extends ZK
@SuppressWarnings("deprecation")
@Test(timeout = 90000)
public void testSeekForRwServer() throws Exception {
-
// setup the logger to capture all logs
Layout layout = Logger.getRootLogger().getAppender("CONSOLE")
.getLayout();
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.SessionTracker.Session;
+import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
+import org.apache.zookeeper.server.quorum.LeaderSessionTracker;
+import org.apache.zookeeper.server.quorum.LearnerSessionTracker;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Validate various types of sessions against the leader session tracker and
+ * the learner session tracker.
+ */
+public class SessionTrackerCheckTest extends ZKTestCase {
+
+ protected static final Logger LOG = LoggerFactory
+ .getLogger(SessionTrackerCheckTest.class);
+ public static final int TICK_TIME = 1000;
+ public static final int CONNECTION_TIMEOUT = TICK_TIME * 10;
+
+ private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts =
+ new ConcurrentHashMap<Long, Integer>();
+
+ private class Expirer implements SessionExpirer {
+ long sid;
+
+ public Expirer(long sid) {
+ this.sid = sid;
+ }
+
+ public void expire(Session session) {
+ }
+
+ public long getServerId() {
+ return sid;
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ sessionsWithTimeouts.clear();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @Test
+ public void testLearnerSessionTracker() throws Exception {
+ Expirer expirer = new Expirer(1);
+ // With local session on
+ LearnerSessionTracker tracker = new LearnerSessionTracker(expirer,
+ sessionsWithTimeouts, TICK_TIME, expirer.sid, true);
+
+ // Unknown session
+ long sessionId = 0xb100ded;
+ try {
+ tracker.checkSession(sessionId, null);
+ Assert.fail("Unknown session should have failed");
+ } catch (SessionExpiredException e) {
+ // Got expected exception
+ }
+
+ // Global session
+ sessionsWithTimeouts.put(sessionId, CONNECTION_TIMEOUT);
+ try {
+ tracker.checkSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Global session should not fail");
+ }
+
+ // Local session
+ sessionId = 0xf005ba11;
+ tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+ try {
+ tracker.checkSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Local session should not fail");
+ }
+
+ // During session upgrade
+ sessionsWithTimeouts.put(sessionId, CONNECTION_TIMEOUT);
+ try {
+ tracker.checkSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Session during upgrade should not fail");
+ }
+
+ // With local session off
+ tracker = new LearnerSessionTracker(expirer, sessionsWithTimeouts,
+ TICK_TIME, expirer.sid, false);
+
+ // Should be noop
+ sessionId = 0xdeadbeef;
+ try {
+ tracker.checkSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Should not get any exception");
+ }
+
+ }
+
+ @Test
+ public void testLeaderSessionTracker() throws Exception {
+ Expirer expirer = new Expirer(2);
+ // With local session on
+ LeaderSessionTracker tracker = new LeaderSessionTracker(expirer,
+ sessionsWithTimeouts, TICK_TIME, expirer.sid, true);
+
+ // Local session from other server
+ long sessionId = ((expirer.sid + 1) << 56) + 1;
+ try {
+ tracker.checkSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("local session from other server should not fail");
+ }
+
+ // Global session
+ tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT);
+ try {
+ tracker.checkSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Global session should not fail");
+ }
+ try {
+ tracker.checkGlobalSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Global session should not fail " + e);
+ }
+
+ // Local session from the leader
+ sessionId = (expirer.sid << 56) + 1;
+ ;
+ tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+ try {
+ tracker.checkSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Local session on the leader should not fail");
+ }
+
+ // During session upgrade
+ tracker.addGlobalSession(sessionId, CONNECTION_TIMEOUT);
+ try {
+ tracker.checkSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Session during upgrade should not fail");
+ }
+ try {
+ tracker.checkGlobalSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Global session should not fail " + e);
+ }
+
+ // With local session off
+ tracker = new LeaderSessionTracker(expirer, sessionsWithTimeouts,
+ TICK_TIME, expirer.sid, false);
+
+ // Global session
+ sessionId = 0xdeadbeef;
+ tracker.addSession(sessionId, CONNECTION_TIMEOUT);
+ try {
+ tracker.checkSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Global session should not fail");
+ }
+ try {
+ tracker.checkGlobalSession(sessionId, null);
+ } catch (Exception e) {
+ Assert.fail("Global session should not fail");
+ }
+
+ // Local session from other server
+ sessionId = ((expirer.sid + 1) << 56) + 2;
+ try {
+ tracker.checkSession(sessionId, null);
+ Assert.fail("local session from other server should fail");
+ } catch (SessionExpiredException e) {
+ // Got expected exception
+ }
+
+ // Local session from the leader
+ sessionId = ((expirer.sid) << 56) + 2;
+ try {
+ tracker.checkSession(sessionId, null);
+ Assert.fail("local session from the leader should fail");
+ } catch (SessionExpiredException e) {
+ // Got expected exception
+ }
+
+ }
+
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionTrackerCheckTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java?rev=1530781&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java Wed Oct 9 20:21:55 2013
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests that session upgrade works from local to global sessions.
+ * Expected behavior is that if global-only sessions are unset,
+ * and no upgrade interval is specified, then sessions will be
+ * created locally to the host. They will be upgraded to global
+ * sessions iff an operation is done on that session which requires
+ * persistence, i.e. creating an ephemeral node.
+ *
+ * <p>Clients that are only opened to probe a session are released with
+ * {@code disconnect()}, which the tests rely on to drop a connection while
+ * leaving the session resumable; {@code close()} is used where the session
+ * itself is meant to end.
+ */
+public class SessionUpgradeTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(SessionUpgradeTest.class);
+    public static final int CONNECTION_TIMEOUT = ClientBase.CONNECTION_TIMEOUT;
+
+    private final QuorumBase qb = new QuorumBase();
+
+    /** Starts the quorum with local sessions and session upgrading enabled. */
+    @Before
+    public void setUp() throws Exception {
+        LOG.info("STARTING quorum " + getClass().getName());
+        qb.localSessionsEnabled = true;
+        qb.localSessionsUpgradingEnabled = true;
+        qb.setUp();
+        ClientBase.waitForServerUp(qb.hostPort, 10000);
+    }
+
+    /** Tears the quorum down after each test. */
+    @After
+    public void tearDown() throws Exception {
+        LOG.info("STOPPING quorum " + getClass().getName());
+        qb.tearDown();
+    }
+
+    /**
+     * Drops a probe client's connection without closing its session, so the
+     * client does not linger after the check that created it. Failures are
+     * only logged so they cannot mask the outcome of the check itself.
+     *
+     * @param client the client to release; may be null if construction failed
+     */
+    private static void disconnectQuietly(DisconnectableZooKeeper client) {
+        if (client == null) {
+            return;
+        }
+        try {
+            client.disconnect();
+        } catch (Exception e) {
+            LOG.warn("Unable to disconnect test client", e);
+        }
+    }
+
+    @Test
+    public void testLocalSessionsWithoutEphemeralOnFollower() throws Exception {
+        testLocalSessionsWithoutEphemeral(false);
+    }
+
+    @Test
+    public void testLocalSessionsWithoutEphemeralOnLeader() throws Exception {
+        testLocalSessionsWithoutEphemeral(true);
+    }
+
+    /**
+     * Checks that a session which only creates persistent nodes is rejected
+     * on other servers, can be resumed on the server it was created on, and
+     * is no longer usable after an explicit close.
+     *
+     * @param testLeader true to connect the client to the leader, false to
+     *                   connect it to a follower
+     */
+    private void testLocalSessionsWithoutEphemeral(boolean testLeader)
+            throws Exception {
+        String nodePrefix = "/testLocalSessions-"
+            + (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        String[] hostPorts = qb.hostPort.split(",");
+        // Wrap around the number of listed servers instead of a hard-coded 5,
+        // so the peer indices are always valid for hostPorts.
+        int followerIdx = (leaderIdx + 1) % hostPorts.length;
+        int otherFollowerIdx = (leaderIdx + 2) % hostPorts.length;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // Try creating some data.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        }
+
+        long localSessionId = zk.getSessionId();
+        byte[] localSessionPwd = zk.getSessionPasswd().clone();
+
+        // Try connecting with the same session id on a different
+        // server. This should fail since it is a local session.
+        DisconnectableZooKeeper zknew = null;
+        try {
+            watcher.reset();
+            zknew = new DisconnectableZooKeeper(
+                    hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+
+            zknew.create(nodePrefix + "5", new byte[0],
+                         ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.fail("Connection on the same session ID should fail.");
+        } catch (KeeperException.SessionExpiredException expected) {
+            // Expected: the other server does not accept the session.
+        } catch (KeeperException.ConnectionLossException expected) {
+            // Also accepted as a rejection of the session.
+        } finally {
+            // Disconnect rather than close: the session is reused below.
+            disconnectQuietly(zknew);
+        }
+
+        // If we're testing a follower, also check the session id on the
+        // leader. This should also fail
+        if (!testLeader) {
+            zknew = null;
+            try {
+                watcher.reset();
+                zknew = new DisconnectableZooKeeper(
+                        hostPorts[leaderIdx], CONNECTION_TIMEOUT,
+                        watcher, localSessionId, localSessionPwd);
+
+                zknew.create(nodePrefix + "5", new byte[0],
+                             ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                             CreateMode.PERSISTENT);
+                Assert.fail("Connection on the same session ID should fail.");
+            } catch (KeeperException.SessionExpiredException expected) {
+                // Expected: the leader does not accept the session.
+            } catch (KeeperException.ConnectionLossException expected) {
+                // Also accepted as a rejection of the session.
+            } finally {
+                disconnectQuietly(zknew);
+            }
+        }
+
+        // However, we should be able to disconnect and reconnect to the same
+        // server with the same session id (as long as we do it quickly
+        // before expiration).
+        zk.disconnect();
+
+        watcher.reset();
+        zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher,
+                localSessionId, localSessionPwd);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        zk.create(nodePrefix + "6", new byte[0],
+                  ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        // If we explicitly close the session, then the session id should no
+        // longer be valid.
+        zk.close();
+        DisconnectableZooKeeper reopened = null;
+        try {
+            watcher.reset();
+            reopened = new DisconnectableZooKeeper(
+                    hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+
+            reopened.create(nodePrefix + "7", new byte[0],
+                            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.fail("Reconnecting to a closed session ID should fail.");
+        } catch (KeeperException.SessionExpiredException expected) {
+            // Expected: the closed session cannot be resumed.
+        } finally {
+            disconnectQuietly(reopened);
+        }
+    }
+
+    @Test
+    public void testUpgradeWithEphemeralOnFollower() throws Exception {
+        testUpgradeWithEphemeral(false);
+    }
+
+    @Test
+    public void testUpgradeWithEphemeralOnLeader() throws Exception {
+        testUpgradeWithEphemeral(true);
+    }
+
+    /**
+     * Checks that creating ephemeral nodes makes the session usable from
+     * another server, and that closing the session invalidates it and
+     * removes the ephemeral nodes.
+     *
+     * @param testLeader true to connect the client to the leader, false to
+     *                   connect it to a follower
+     */
+    private void testUpgradeWithEphemeral(boolean testLeader)
+            throws Exception {
+        String nodePrefix = "/testUpgrade-"
+            + (testLeader ? "leaderTest-" : "followerTest-");
+        int leaderIdx = qb.getLeaderIndex();
+        Assert.assertFalse("No leader in quorum?", leaderIdx == -1);
+        String[] hostPorts = qb.hostPort.split(",");
+        // Wrap around the number of listed servers instead of a hard-coded 5,
+        // so the peer indices are always valid for hostPorts.
+        int followerIdx = (leaderIdx + 1) % hostPorts.length;
+        int otherFollowerIdx = (leaderIdx + 2) % hostPorts.length;
+        int testPeerIdx = testLeader ? leaderIdx : followerIdx;
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // Create some ephemeral nodes. This should force the session to
+        // be propagated to the other servers in the ensemble.
+        for (int i = 0; i < 5; i++) {
+            zk.create(nodePrefix + i, new byte[0],
+                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
+        }
+
+        // We should be able to reconnect with the same session id on a
+        // different server, since it has been propagated.
+        long localSessionId = zk.getSessionId();
+        byte[] localSessionPwd = zk.getSessionPasswd().clone();
+
+        zk.disconnect();
+        watcher.reset();
+        zk = new DisconnectableZooKeeper(
+                hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                localSessionId, localSessionPwd);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+
+        // The created ephemeral nodes are still around.
+        for (int i = 0; i < 5; i++) {
+            Assert.assertNotNull(zk.exists(nodePrefix + i, null));
+        }
+
+        // When we explicitly close the session, we should not be able to
+        // reconnect with the same session id
+        zk.close();
+
+        DisconnectableZooKeeper reopened = null;
+        try {
+            watcher.reset();
+            reopened = new DisconnectableZooKeeper(
+                    hostPorts[otherFollowerIdx], CONNECTION_TIMEOUT, watcher,
+                    localSessionId, localSessionPwd);
+            reopened.exists(nodePrefix + "0", null);
+            Assert.fail("Reconnecting to a closed session ID should fail.");
+        } catch (KeeperException.SessionExpiredException expected) {
+            // Expected: the closed session cannot be resumed.
+        } finally {
+            disconnectQuietly(reopened);
+        }
+
+        watcher.reset();
+        // And the ephemeral nodes will be gone since the session died.
+        zk = new DisconnectableZooKeeper(
+                hostPorts[testPeerIdx], CONNECTION_TIMEOUT, watcher);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            for (int i = 0; i < 5; i++) {
+                Assert.assertNull(zk.exists(nodePrefix + i, null));
+            }
+        } finally {
+            // This session is not needed any more, so end it for good.
+            zk.close();
+        }
+    }
+}
Propchange: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/SessionUpgradeTest.java
------------------------------------------------------------------------------
svn:eol-style = native