You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2017/03/21 16:50:01 UTC
zookeeper git commit: ZOOKEEPER-2725: Promote local session to global
when ephemeral created in multi-op
Repository: zookeeper
Updated Branches:
refs/heads/master 0313a0e0b -> 63577ba52
ZOOKEEPER-2725: Promote local session to global when ephemeral created in multi-op
Patch and unit test for issue.
Author: Brian Nixon <ni...@fb.com>
Reviewers: Abraham Fine <af...@apache.org>, Michael Han <ha...@apache.org>
Closes #195 from enixon/ZOOKEEPER-2725 and squashes the following commits:
0a32b9a [Brian Nixon] fill diamond operators
7f2fd37 [Brian Nixon] add direct testing of checkUpgradeSession in addition to integration style test
5cbca58 [Brian Nixon] ZOOKEEPER-2725: Promote local session to global when ephemeral created in multi-op
Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/63577ba5
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/63577ba5
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/63577ba5
Branch: refs/heads/master
Commit: 63577ba52a829391ded3ae11c62c0e996865b5c7
Parents: 0313a0e
Author: Brian Nixon <ni...@fb.com>
Authored: Tue Mar 21 09:49:56 2017 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Tue Mar 21 09:49:56 2017 -0700
----------------------------------------------------------------------
.../server/quorum/QuorumZooKeeperServer.java | 41 ++++--
.../server/MultiOpSessionUpgradeTest.java | 136 +++++++++++++++++++
2 files changed, 169 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/63577ba5/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
index df7b407..f6e4f11 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MultiTransactionRecord;
+import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.ByteBufferInputStream;
@@ -62,18 +64,41 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
// This is called by the request processor thread (either follower
// or observer request processor), which is unique to a learner.
// So will not be called concurrently by two threads.
- if (request.type != OpCode.create ||
+ if ((request.type != OpCode.create && request.type != OpCode.create2 && request.type != OpCode.multi) ||
!upgradeableSessionTracker.isLocalSession(request.sessionId)) {
return null;
}
- CreateRequest createRequest = new CreateRequest();
- request.request.rewind();
- ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
- request.request.rewind();
- CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
- if (!createMode.isEphemeral()) {
- return null;
+
+ if (OpCode.multi == request.type) {
+ MultiTransactionRecord multiTransactionRecord = new MultiTransactionRecord();
+ request.request.rewind();
+ ByteBufferInputStream.byteBuffer2Record(request.request, multiTransactionRecord);
+ request.request.rewind();
+ boolean containsEphemeralCreate = false;
+ for (Op op : multiTransactionRecord) {
+ if (op.getType() == OpCode.create || op.getType() == OpCode.create2) {
+ CreateRequest createRequest = (CreateRequest)op.toRequestRecord();
+ CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+ if (createMode.isEphemeral()) {
+ containsEphemeralCreate = true;
+ break;
+ }
+ }
+ }
+ if (!containsEphemeralCreate) {
+ return null;
+ }
+ } else {
+ CreateRequest createRequest = new CreateRequest();
+ request.request.rewind();
+ ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
+ request.request.rewind();
+ CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+ if (!createMode.isEphemeral()) {
+ return null;
+ }
}
+
// Uh oh. We need to upgrade before we can proceed.
if (!self.isLocalSessionsUpgradingEnabled()) {
throw new KeeperException.EphemeralOnLocalSessionException();
http://git-wip-us.apache.org/repos/asf/zookeeper/blob/63577ba5/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
new file mode 100644
index 0000000..0426e22
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.GetDataRequest;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
+import org.apache.zookeeper.server.quorum.UpgradeableSessionTracker;
+import org.apache.zookeeper.test.QuorumBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class MultiOpSessionUpgradeTest extends QuorumBase {
+ protected static final Logger LOG = LoggerFactory.getLogger(MultiOpSessionUpgradeTest.class);
+
+ @Override
+ public void setUp() throws Exception {
+ localSessionsEnabled = true;
+ localSessionsUpgradingEnabled = true;
+ super.setUp();
+ }
+
+ @Test
+ public void ephemeralCreateMultiOpTest() throws KeeperException, InterruptedException, IOException {
+ final ZooKeeper zk = createClient();
+
+ String data = "test";
+ String path = "/ephemeralcreatemultiop";
+ zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ QuorumZooKeeperServer server = getConnectedServer(zk.getSessionId());
+ Assert.assertNotNull("unable to find server interlocutor", server);
+ UpgradeableSessionTracker sessionTracker = (UpgradeableSessionTracker)server.getSessionTracker();
+ Assert.assertFalse("session already global", sessionTracker.isGlobalSession(zk.getSessionId()));
+
+ List<OpResult> multi = null;
+ try {
+ multi = zk.multi(Arrays.asList(
+ Op.setData(path, data.getBytes(), 0),
+ Op.create(path + "/e", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL),
+ Op.create(path + "/p", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+ Op.create(path + "/q", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
+ ));
+ } catch (KeeperException.SessionExpiredException e) {
+ // the scenario that inspired this unit test
+ Assert.fail("received session expired for a session promotion in a multi-op");
+ }
+
+ Assert.assertNotNull(multi);
+ Assert.assertEquals(4, multi.size());
+ Assert.assertEquals(data, new String(zk.getData(path + "/e", false, null)));
+ Assert.assertEquals(data, new String(zk.getData(path + "/p", false, null)));
+ Assert.assertEquals(data, new String(zk.getData(path + "/q", false, null)));
+ Assert.assertTrue("session not promoted", sessionTracker.isGlobalSession(zk.getSessionId()));
+ }
+
+ @Test
+ public void directCheckUpgradeSessionTest() throws IOException, InterruptedException, KeeperException {
+ final ZooKeeper zk = createClient();
+
+ String path = "/directcheckupgradesession";
+ zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+ QuorumZooKeeperServer server = getConnectedServer(zk.getSessionId());
+ Assert.assertNotNull("unable to find server interlocutor", server);
+
+ Request readRequest = makeGetDataRequest(path, zk.getSessionId());
+ Request createRequest = makeCreateRequest(path + "/e", zk.getSessionId());
+ Assert.assertNull("tried to upgrade on a read", server.checkUpgradeSession(readRequest));
+ Assert.assertNotNull("failed to upgrade on a create", server.checkUpgradeSession(createRequest));
+ Assert.assertNull("tried to upgrade after successful promotion",
+ server.checkUpgradeSession(createRequest));
+ }
+
+ private Request makeGetDataRequest(String path, long sessionId) throws IOException {
+ ByteArrayOutputStream boas = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+ GetDataRequest getDataRequest = new GetDataRequest(path, false);
+ getDataRequest.serialize(boa, "request");
+ ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+ return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, bb, new ArrayList<Id>());
+ }
+
+ private Request makeCreateRequest(String path, long sessionId) throws IOException {
+ ByteArrayOutputStream boas = new ByteArrayOutputStream();
+ BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+ CreateRequest createRequest = new CreateRequest(path,
+ "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
+ createRequest.serialize(boa, "request");
+ ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+ return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<Id>());
+ }
+
+ private QuorumZooKeeperServer getConnectedServer(long sessionId) {
+ for (QuorumPeer peer : getPeerList()) {
+ if (peer.getActiveServer().getSessionTracker().isTrackingSession(sessionId)) {
+ return (QuorumZooKeeperServer)peer.getActiveServer();
+ }
+ }
+ return null;
+ }
+}