You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ha...@apache.org on 2017/03/21 16:50:01 UTC

zookeeper git commit: ZOOKEEPER-2725: Promote local session to global when ephemeral created in multi-op

Repository: zookeeper
Updated Branches:
  refs/heads/master 0313a0e0b -> 63577ba52


ZOOKEEPER-2725: Promote local session to global when ephemeral created in multi-op

Patch and unit test for issue.

Author: Brian Nixon <ni...@fb.com>

Reviewers: Abraham Fine <af...@apache.org>, Michael Han <ha...@apache.org>

Closes #195 from enixon/ZOOKEEPER-2725 and squashes the following commits:

0a32b9a [Brian Nixon] fill diamond operators
7f2fd37 [Brian Nixon] add direct testing of checkUpgradeSession in addition to integration style test
5cbca58 [Brian Nixon] ZOOKEEPER-2725: Promote local session to global when ephemeral created in multi-op


Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo
Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/63577ba5
Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/63577ba5
Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/63577ba5

Branch: refs/heads/master
Commit: 63577ba52a829391ded3ae11c62c0e996865b5c7
Parents: 0313a0e
Author: Brian Nixon <ni...@fb.com>
Authored: Tue Mar 21 09:49:56 2017 -0700
Committer: Michael Han <ha...@apache.org>
Committed: Tue Mar 21 09:49:56 2017 -0700

----------------------------------------------------------------------
 .../server/quorum/QuorumZooKeeperServer.java    |  41 ++++--
 .../server/MultiOpSessionUpgradeTest.java       | 136 +++++++++++++++++++
 2 files changed, 169 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zookeeper/blob/63577ba5/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
----------------------------------------------------------------------
diff --git a/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
index df7b407..f6e4f11 100644
--- a/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
+++ b/src/java/main/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
@@ -23,6 +23,8 @@ import java.nio.ByteBuffer;
 
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.MultiTransactionRecord;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.server.ByteBufferInputStream;
@@ -62,18 +64,41 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
         // This is called by the request processor thread (either follower
         // or observer request processor), which is unique to a learner.
         // So will not be called concurrently by two threads.
-        if (request.type != OpCode.create ||
+        if ((request.type != OpCode.create && request.type != OpCode.create2 && request.type != OpCode.multi) ||
             !upgradeableSessionTracker.isLocalSession(request.sessionId)) {
             return null;
         }
-        CreateRequest createRequest = new CreateRequest();
-        request.request.rewind();
-        ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
-        request.request.rewind();
-        CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
-        if (!createMode.isEphemeral()) {
-            return null;
+
+        if (OpCode.multi == request.type) {
+            MultiTransactionRecord multiTransactionRecord = new MultiTransactionRecord();
+            request.request.rewind();
+            ByteBufferInputStream.byteBuffer2Record(request.request, multiTransactionRecord);
+            request.request.rewind();
+            boolean containsEphemeralCreate = false;
+            for (Op op : multiTransactionRecord) {
+                if (op.getType() == OpCode.create || op.getType() == OpCode.create2) {
+                    CreateRequest createRequest = (CreateRequest)op.toRequestRecord();
+                    CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+                    if (createMode.isEphemeral()) {
+                        containsEphemeralCreate = true;
+                        break;
+                    }
+                }
+            }
+            if (!containsEphemeralCreate) {
+                return null;
+            }
+        } else {
+            CreateRequest createRequest = new CreateRequest();
+            request.request.rewind();
+            ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
+            request.request.rewind();
+            CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
+            if (!createMode.isEphemeral()) {
+                return null;
+            }
         }
+
         // Uh oh.  We need to upgrade before we can proceed.
         if (!self.isLocalSessionsUpgradingEnabled()) {
             throw new KeeperException.EphemeralOnLocalSessionException();

http://git-wip-us.apache.org/repos/asf/zookeeper/blob/63577ba5/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
----------------------------------------------------------------------
diff --git a/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
new file mode 100644
index 0000000..0426e22
--- /dev/null
+++ b/src/java/test/org/apache/zookeeper/server/MultiOpSessionUpgradeTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.OpResult;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.proto.CreateRequest;
+import org.apache.zookeeper.proto.GetDataRequest;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.apache.zookeeper.server.quorum.QuorumZooKeeperServer;
+import org.apache.zookeeper.server.quorum.UpgradeableSessionTracker;
+import org.apache.zookeeper.test.QuorumBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class MultiOpSessionUpgradeTest extends QuorumBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(MultiOpSessionUpgradeTest.class);
+
+    @Override
+    public void setUp() throws Exception {
+        localSessionsEnabled = true;
+        localSessionsUpgradingEnabled = true;
+        super.setUp();
+    }
+
+    @Test
+    public void ephemeralCreateMultiOpTest() throws KeeperException, InterruptedException, IOException {
+        final ZooKeeper zk = createClient();
+
+        String data = "test";
+        String path = "/ephemeralcreatemultiop";
+        zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        QuorumZooKeeperServer server = getConnectedServer(zk.getSessionId());
+        Assert.assertNotNull("unable to find server interlocutor", server);
+        UpgradeableSessionTracker sessionTracker = (UpgradeableSessionTracker)server.getSessionTracker();
+        Assert.assertFalse("session already global", sessionTracker.isGlobalSession(zk.getSessionId()));
+
+        List<OpResult> multi = null;
+        try {
+            multi = zk.multi(Arrays.asList(
+                    Op.setData(path, data.getBytes(), 0),
+                    Op.create(path + "/e", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL),
+                    Op.create(path + "/p", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT),
+                    Op.create(path + "/q", data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
+            ));
+        } catch (KeeperException.SessionExpiredException e) {
+            // the scenario that inspired this unit test
+            Assert.fail("received session expired for a session promotion in a multi-op");
+        }
+
+        Assert.assertNotNull(multi);
+        Assert.assertEquals(4, multi.size());
+        Assert.assertEquals(data, new String(zk.getData(path + "/e", false, null)));
+        Assert.assertEquals(data, new String(zk.getData(path + "/p", false, null)));
+        Assert.assertEquals(data, new String(zk.getData(path + "/q", false, null)));
+        Assert.assertTrue("session not promoted", sessionTracker.isGlobalSession(zk.getSessionId()));
+    }
+
+    @Test
+    public void directCheckUpgradeSessionTest() throws IOException, InterruptedException, KeeperException {
+        final ZooKeeper zk = createClient();
+
+        String path = "/directcheckupgradesession";
+        zk.create(path, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        QuorumZooKeeperServer server = getConnectedServer(zk.getSessionId());
+        Assert.assertNotNull("unable to find server interlocutor", server);
+
+        Request readRequest = makeGetDataRequest(path, zk.getSessionId());
+        Request createRequest = makeCreateRequest(path + "/e", zk.getSessionId());
+        Assert.assertNull("tried to upgrade on a read", server.checkUpgradeSession(readRequest));
+        Assert.assertNotNull("failed to upgrade on a create", server.checkUpgradeSession(createRequest));
+        Assert.assertNull("tried to upgrade after successful promotion",
+                server.checkUpgradeSession(createRequest));
+    }
+
+    private Request makeGetDataRequest(String path, long sessionId) throws IOException {
+        ByteArrayOutputStream boas = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+        GetDataRequest getDataRequest = new GetDataRequest(path, false);
+        getDataRequest.serialize(boa, "request");
+        ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+        return new Request(null, sessionId, 1, ZooDefs.OpCode.getData, bb, new ArrayList<Id>());
+    }
+
+    private Request makeCreateRequest(String path, long sessionId) throws IOException {
+        ByteArrayOutputStream boas = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(boas);
+        CreateRequest createRequest = new CreateRequest(path,
+                "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL.toFlag());
+        createRequest.serialize(boa, "request");
+        ByteBuffer bb = ByteBuffer.wrap(boas.toByteArray());
+        return new Request(null, sessionId, 1, ZooDefs.OpCode.create2, bb, new ArrayList<Id>());
+    }
+
+    private QuorumZooKeeperServer getConnectedServer(long sessionId) {
+        for (QuorumPeer peer : getPeerList()) {
+            if (peer.getActiveServer().getSessionTracker().isTrackingSession(sessionId)) {
+                return (QuorumZooKeeperServer)peer.getActiveServer();
+            }
+        }
+        return null;
+    }
+}