You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by an...@apache.org on 2019/10/09 14:49:09 UTC

[zookeeper] branch master updated: ZOOKEEPER-3471: Fix potential lock unavailable due to dangling ephemeral nodes left during local session upgrading

This is an automated email from the ASF dual-hosted git repository.

andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 4951a09  ZOOKEEPER-3471: Fix potential lock unavailable due to dangling ephemeral nodes left during local session upgrading
4951a09 is described below

commit 4951a090d7c946f57ac5ab09b5d48a5d7831001d
Author: Fangmin Lyu <fa...@apache.org>
AuthorDate: Wed Oct 9 16:49:02 2019 +0200

    ZOOKEEPER-3471: Fix potential lock unavailable due to dangling ephemeral nodes left during local session upgrading
    
    Author: Fangmin Lyu <fa...@apache.org>
    
    Reviewers: andor@apache.org
    
    Closes #1025 from lvfangmin/ZOOKEEPER-3471
---
 .../apache/zookeeper/server/ZooKeeperServer.java   |  6 +-
 .../server/quorum/SessionUpgradeQuorumTest.java    | 95 +++++++++++++++++++++-
 2 files changed, 95 insertions(+), 6 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 3d6c375..48e427b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -554,7 +554,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
     private void close(long sessionId) {
         Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
-        setLocalSessionFlag(si);
         submitRequest(si);
     }
 
@@ -915,7 +914,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         to.putInt(timeout);
         cnxn.setSessionId(sessionId);
         Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
-        setLocalSessionFlag(si);
         submitRequest(si);
         return sessionId;
     }
@@ -1072,6 +1070,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             touch(si.cnxn);
             boolean validpacket = Request.isValid(si.type);
             if (validpacket) {
+                setLocalSessionFlag(si);
                 firstProcessor.processRequest(si);
                 if (si.cnxn != null) {
                     incInProcess();
@@ -1574,9 +1573,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                     si.setLargeRequestSize(length);
                 }
                 si.setOwner(ServerCnxn.me);
-                // Always treat packet from the client as a possible
-                // local request.
-                setLocalSessionFlag(si);
                 submitRequest(si);
             }
         }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
index 7fc2f42..0be68ec 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.security.sasl.SaslException;
 import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
@@ -195,22 +196,114 @@ public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
         zk.close();
     }
 
+    /**
+     * Runs {@link #makeSureEphemeralIsGone(int)} against the server whose
+     * {@code quorumPeer.leader} is non-null, covering the ZOOKEEPER-3471
+     * scenario (session closed while its ephemeral create is still queued)
+     * on the leader.
+     */
+    @Test
+    public void testCloseSessionWhileUpgradeOnLeader()
+            throws IOException, KeeperException, InterruptedException {
+        int leaderId = -1;
+        for (int i = SERVER_COUNT - 1; i >= 0; i--) {
+            if (mt[i].main.quorumPeer.leader != null) {
+                leaderId = i;
+            }
+        }
+        // Index 0 is a valid server id; only the initial -1 means that no
+        // leader was found, so the check must accept 0.
+        if (leaderId >= 0) {
+            makeSureEphemeralIsGone(leaderId);
+        }
+    }
+
+    /**
+     * Runs {@link #makeSureEphemeralIsGone(int)} against a server whose
+     * {@code quorumPeer.follower} is non-null, covering the ZOOKEEPER-3471
+     * scenario (session closed while its ephemeral create is still queued)
+     * on a learner.
+     */
+    @Test
+    public void testCloseSessionWhileUpgradeOnLearner()
+            throws IOException, KeeperException, InterruptedException {
+        int learnerId = -1;
+        // The loop descends and never breaks, so the lowest-indexed follower
+        // is the one that ends up selected.
+        for (int i = SERVER_COUNT - 1; i >= 0; i--) {
+            if (mt[i].main.quorumPeer.follower != null) {
+                learnerId = i;
+            }
+        }
+        // Index 0 is a valid server id; only the initial -1 means that no
+        // follower was found, so the check must accept 0.
+        if (learnerId >= 0) {
+            makeSureEphemeralIsGone(learnerId);
+        }
+    }
+
+    /**
+     * Creates an ephemeral node with a callback-based {@code create} on
+     * server {@code sid}, closes that client right afterwards, then connects
+     * a second client to the same server and asserts the node does not exist.
+     *
+     * <p>Both clients are closed in {@code finally} blocks so that a failed
+     * wait or assertion does not leave a client open.
+     *
+     * @param sid index of the target server in {@code qpMain} and
+     *            {@code clientPorts}
+     */
+    private void makeSureEphemeralIsGone(int sid)
+            throws IOException, KeeperException, InterruptedException {
+        // Delay submit request to simulate the request queued in
+        // RequestThrottler
+        qpMain[sid].setSubmitDelayMs(200);
+
+        final String node = "/node-1";
+
+        // Create a client and an ephemeral node
+        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+        try {
+            waitForOne(zk, States.CONNECTED);
+
+            // No-op callback: the outcome of the create is deliberately not
+            // inspected before the session is closed.
+            zk.create(node, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.EPHEMERAL, new StringCallback() {
+                        @Override
+                        public void processResult(int rc, String path, Object ctx,
+                                String name) {}
+                    }, null);
+        } finally {
+            // close the client
+            zk.close();
+        }
+
+        // make sure the ephemeral is gone
+        ZooKeeper verifier = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
+                ClientBase.CONNECTION_TIMEOUT, this);
+        try {
+            waitForOne(verifier, States.CONNECTED);
+            assertNull(verifier.exists(node, false));
+        } finally {
+            verifier.close();
+        }
+    }
+
     private static class TestQPMainDropSessionUpgrading extends TestQPMain {
 
         private volatile boolean shouldDrop = false;
+        private volatile int submitDelayMs = 0;
 
         public void setDropCreateSession(boolean dropCreateSession) {
             shouldDrop = dropCreateSession;
         }
 
+        public void setSubmitDelayMs(int delay) {
+            this.submitDelayMs = delay;
+        }
+
         @Override
         protected QuorumPeer getQuorumPeer() throws SaslException {
             return new QuorumPeer() {
 
                 @Override
+                protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
+                    return new Leader(this, new LeaderZooKeeperServer(
+                              logFactory, this, this.getZkDb()) {
+
+                        @Override
+                        public void submitRequestNow(Request si) {
+                            if (submitDelayMs > 0) {
+                                try {
+                                    Thread.sleep(submitDelayMs);
+                                } catch (Exception e) {}
+                            }
+                            super.submitRequestNow(si);
+                        }
+                    });
+                }
+
+                @Override
                 protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
 
-                    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
+                    return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb()) {
+
+                        @Override
+                        public void submitRequestNow(Request si) {
+                            if (submitDelayMs > 0) {
+                                try {
+                                    Thread.sleep(submitDelayMs);
+                                } catch (Exception e) {}
+                            }
+                            super.submitRequestNow(si);
+                        }
+
+                    }) {
 
                         @Override
                         protected void request(Request request) throws IOException {