You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ar...@apache.org on 2021/03/09 07:04:21 UTC

[zookeeper] branch branch-3.6 updated: ZOOKEEPER-4223: Backport ZOOKEEPER-3706 to branch-3.6

This is an automated email from the ASF dual-hosted git repository.

arshad pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.6 by this push:
     new d46689e  ZOOKEEPER-4223: Backport ZOOKEEPER-3706 to branch-3.6
d46689e is described below

commit d46689e5861516b92605bcb911f7ffce9ad6fba4
Author: Mukti Krishnan <mu...@gmail.com>
AuthorDate: Tue Mar 9 12:36:35 2021 +0530

    ZOOKEEPER-4223: Backport ZOOKEEPER-3706 to branch-3.6
    
    Author: Mukti <mu...@gmail.com>
    
    Reviewers: Mohammad Arshad <ar...@apache.org>
    
    Closes #1617 from MuktiKrishnan/ZOOKEEPER-4223
---
 .../main/java/org/apache/zookeeper/ClientCnxn.java |  43 ++-
 .../zookeeper/ClientCnxnSocketFragilityTest.java   | 362 +++++++++++++++++++++
 2 files changed, 394 insertions(+), 11 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index b2c5a57..a727082 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -418,6 +418,7 @@ public class ClientCnxn {
      * @param canBeReadOnly
      *                whether the connection is allowed to go to read-only
      *                mode in case of partitioning
+     * @throws IOException in cases of broken network
      */
     public ClientCnxn(
         String chrootPath,
@@ -428,7 +429,7 @@ public class ClientCnxn {
         ClientCnxnSocket clientCnxnSocket,
         long sessionId,
         byte[] sessionPasswd,
-        boolean canBeReadOnly) {
+        boolean canBeReadOnly) throws IOException {
         this.zooKeeper = zooKeeper;
         this.watcher = watcher;
         this.sessionId = sessionId;
@@ -795,6 +796,11 @@ public class ClientCnxn {
         eventThread.queueCallback(cb, rc, path, ctx);
     }
 
+    // For test only: hook called with the selected server address right before startConnect(addr).
+    protected void onConnecting(InetSocketAddress addr) {
+        // Intentionally empty; a test subclass may override this to observe or delay a connect attempt.
+    }
+
     private void conLossPacket(Packet p) {
         if (p.replyHeader == null) {
             return;
@@ -888,7 +894,7 @@ public class ClientCnxn {
               case AUTHPACKET_XID:
                 LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
                 if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
-                    state = States.AUTH_FAILED;
+                    changeZkState(States.AUTH_FAILED);
                     eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                         Watcher.Event.KeeperState.AuthFailed, null));
                     eventThread.queueEventOfDeath();
@@ -967,9 +973,9 @@ public class ClientCnxn {
             }
         }
 
-        SendThread(ClientCnxnSocket clientCnxnSocket) {
+        SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
             super(makeThreadName("-SendThread()"));
-            state = States.CONNECTING;
+            changeZkState(States.CONNECTING);
             this.clientCnxnSocket = clientCnxnSocket;
             setDaemon(true);
         }
@@ -983,10 +989,19 @@ public class ClientCnxn {
          *
          * @return
          */
-        ZooKeeper.States getZkState() {
+        synchronized ZooKeeper.States getZkState() {
             return state;
         }
 
+        synchronized void changeZkState(ZooKeeper.States newState) throws IOException {
+            if (!state.isAlive() && newState == States.CONNECTING) {
+                throw new IOException(
+                        "Connection has already been closed and reconnection is not allowed");
+            }
+            // It's safer to place state modification at the end.
+            state = newState;
+        }
+
         ClientCnxnSocket getClientCnxnSocket() {
             return clientCnxnSocket;
         }
@@ -1133,7 +1148,7 @@ public class ClientCnxn {
                     LOG.warn("Unexpected exception", e);
                 }
             }
-            state = States.CONNECTING;
+            changeZkState(States.CONNECTING);
 
             String hostPort = addr.getHostString() + ":" + addr.getPort();
             MDC.put("myid", hostPort);
@@ -1191,6 +1206,7 @@ public class ClientCnxn {
                         } else {
                             serverAddress = hostProvider.next(1000);
                         }
+                        onConnecting(serverAddress);
                         startConnect(serverAddress);
                         clientCnxnSocket.updateLastSendAndHeard();
                     }
@@ -1204,7 +1220,7 @@ public class ClientCnxn {
                                     zooKeeperSaslClient.initialize(ClientCnxn.this);
                                 } catch (SaslException e) {
                                     LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
-                                    state = States.AUTH_FAILED;
+                                    changeZkState(States.AUTH_FAILED);
                                     sendAuthEvent = true;
                                 }
                             }
@@ -1212,7 +1228,7 @@ public class ClientCnxn {
                             if (authState != null) {
                                 if (authState == KeeperState.AuthFailed) {
                                     // An authentication error occurred during authentication with the Zookeeper Server.
-                                    state = States.AUTH_FAILED;
+                                    changeZkState(States.AUTH_FAILED);
                                     sendAuthEvent = true;
                                 } else {
                                     if (authState == KeeperState.SaslAuthenticated) {
@@ -1406,7 +1422,7 @@ public class ClientCnxn {
             boolean isRO) throws IOException {
             negotiatedSessionTimeout = _negotiatedSessionTimeout;
             if (negotiatedSessionTimeout <= 0) {
-                state = States.CLOSED;
+                changeZkState(States.CLOSED);
 
                 eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
                 eventThread.queueEventOfDeath();
@@ -1427,7 +1443,7 @@ public class ClientCnxn {
             hostProvider.onConnected();
             sessionId = _sessionId;
             sessionPasswd = _sessionPasswd;
-            state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED;
+            changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
             seenRwServerBefore |= !isRO;
             LOG.info(
                 "Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
@@ -1440,7 +1456,12 @@ public class ClientCnxn {
         }
 
         void close() {
-            state = States.CLOSED;
+            try {
+                changeZkState(States.CLOSED);
+            } catch (IOException e) {
+                LOG.warn("Connection close fails when migrates state from {} to CLOSED",
+                        getZkState());
+            }
             clientCnxnSocket.onClosing();
         }
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
new file mode 100644
index 0000000..07b7d62
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase {
+
+    private static final int SERVER_COUNT = 3;
+
+    private static final int SESSION_TIMEOUT = 40000;
+
+    public static final int CONNECTION_TIMEOUT = 30000;
+
+    private final UnsafeCoordinator unsafeCoordinator = new UnsafeCoordinator();
+
+    private volatile CustomZooKeeper zk = null;
+
+    private volatile FragileClientCnxnSocketNIO socket = null;
+
+    private volatile CustomClientCnxn cnxn = null;
+
+    /**
+     * Builds a connect string of the form
+     * {@code 127.0.0.1:port1,127.0.0.1:port2,...} from the given client ports.
+     *
+     * @param clientPorts client ports to list, in order
+     * @return comma separated host:port pairs; empty when no port is given
+     */
+    private String getCxnString(int[] clientPorts) {
+        // Only one thread touches the buffer, so the unsynchronized StringBuilder is sufficient.
+        StringBuilder hostPorts = new StringBuilder();
+        for (int port : clientPorts) {
+            if (hostPorts.length() > 0) {
+                hostPorts.append(',');
+            }
+            hostPorts.append("127.0.0.1:").append(port);
+        }
+        return hostPorts.toString();
+    }
+
+    /**
+     * Closes the given client on a separate thread so that the calling thread
+     * is not blocked by {@code close()}.
+     *
+     * @param zk the client to close
+     */
+    private void closeZookeeper(ZooKeeper zk) {
+        ExecutorService closer = Executors.newSingleThreadExecutor();
+        closer.submit(() -> {
+            try {
+                LOG.info("closeZookeeper is fired");
+                zk.close();
+            } catch (InterruptedException e) {
+                // Keep the interrupt status instead of silently dropping it.
+                Thread.currentThread().interrupt();
+                LOG.warn("Interrupted while closing zookeeper", e);
+            }
+        });
+        // The submitted task still runs; shutdown only lets the worker thread exit afterwards
+        // so the executor does not leak a non-daemon thread.
+        closer.shutdown();
+    }
+
+    /**
+     * Starts a three-server quorum, breaks the client socket on purpose and
+     * closes the client while the send thread is about to reconnect. The
+     * client must end up not alive and must never report session expiry.
+     *
+     * @throws Exception if setup, a ZooKeeper call or cleanup fails
+     */
+    @Test
+    public void testClientCnxnSocketFragility() throws Exception {
+        System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+                FragileClientCnxnSocketNIO.class.getName());
+        System.setProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, "1000");
+        final int[] clientPorts = new int[SERVER_COUNT];
+        final MainThread[] mt = new MainThread[SERVER_COUNT];
+        GetDataRetryForeverBackgroundTask retryForeverGetData = null;
+        try {
+            StringBuilder sb = new StringBuilder();
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                clientPorts[i] = PortAssignment.unique();
+                String server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":"
+                        + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i];
+                sb.append(server).append('\n');
+            }
+            String currentQuorumCfgSection = sb.toString();
+
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
+                mt[i].start();
+            }
+
+            // Ensure server started
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                Assert.assertTrue("waiting for server " + i + " being up",
+                        ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT));
+            }
+            String path = "/testClientCnxnSocketFragility";
+            String data = "balabala";
+            ClientWatcher watcher = new ClientWatcher();
+            zk = new CustomZooKeeper(getCxnString(clientPorts), SESSION_TIMEOUT, watcher);
+            watcher.watchFor(zk);
+
+            // Let's see some successful operations
+            zk.create(path, data.getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            Assert.assertEquals(data,
+                    new String(zk.getData(path, false, new Stat()), StandardCharsets.UTF_8));
+            Assert.assertFalse(watcher.isSessionExpired());
+
+            // Let's make a broken operation
+            socket.mute();
+            boolean catchKeeperException = false;
+            try {
+                zk.getData(path, false, new Stat());
+            } catch (KeeperException e) {
+                catchKeeperException = true;
+                Assert.assertFalse(e instanceof KeeperException.SessionExpiredException);
+            }
+            socket.unmute();
+            Assert.assertTrue(catchKeeperException);
+            Assert.assertFalse(watcher.isSessionExpired());
+
+            retryForeverGetData = new GetDataRetryForeverBackgroundTask(zk, path);
+            retryForeverGetData.startTask();
+            // Let's make a broken network
+            socket.mute();
+
+            // Let's attempt to close ZooKeeper
+            cnxn.attemptClose();
+
+            // Wait some time to expect continuous reconnecting.
+            // We try to make reconnecting hit the unsafe region.
+            cnxn.waitUntilHitUnsafeRegion();
+
+            // Close zk asynchronously (the request timeout is 1000 ms) and give it time to finish.
+            closeZookeeper(zk);
+            TimeUnit.MILLISECONDS.sleep(3000);
+
+            // Since we already close zookeeper, we expect that the zk should not be alive.
+            Assert.assertFalse(zk.isAlive());
+            Assert.assertFalse(watcher.isSessionExpired());
+        } finally {
+            // Do not leak the client settings into tests that run later in the same JVM.
+            System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+            System.clearProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
+            try {
+                if (retryForeverGetData != null) {
+                    retryForeverGetData.syncCloseTask();
+                }
+            } finally {
+                // Servers that were never created stay null when setup failed half way.
+                for (MainThread server : mt) {
+                    if (server != null) {
+                        server.shutdown();
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * Background thread that keeps reading one znode until it is told to stop,
+     * logging (and otherwise ignoring) every failed read.
+     */
+    class GetDataRetryForeverBackgroundTask extends Thread {
+        private volatile boolean alive;
+        private final CustomZooKeeper zk;
+        private final String path;
+
+        /**
+         * @param zk   client used for the reads
+         * @param path znode to read
+         */
+        GetDataRetryForeverBackgroundTask(CustomZooKeeper zk, String path) {
+            this.alive = false;
+            this.zk = zk;
+            this.path = path;
+            // Daemon thread: a task that is still looping must not keep the JVM alive.
+            setDaemon(true);
+        }
+
+        /** Marks the task as running and starts the thread. */
+        void startTask() {
+            alive = true;
+            start();
+        }
+
+        /**
+         * Asks the loop to stop and waits for the thread to finish.
+         *
+         * @throws InterruptedException if interrupted while waiting
+         */
+        void syncCloseTask() throws InterruptedException {
+            alive = false;
+            join();
+        }
+
+        @Override
+        public void run() {
+            while (alive) {
+                try {
+                    zk.getData(path, false, new Stat());
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return;
+                } catch (Exception e) {
+                    LOG.info("zookeeper getData failed on path {}", path);
+                }
+                try {
+                    // Pause after failures as well; otherwise a read that fails
+                    // immediately turns this loop into a busy spin.
+                    TimeUnit.MILLISECONDS.sleep(500);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
+     * Client socket that can be "muted": while muted, every connect attempt
+     * and every transport round fails with an {@link IOException}.
+     */
+    public static class FragileClientCnxnSocketNIO extends ClientCnxnSocketNIO {
+
+        private volatile boolean mute = false;
+
+        public FragileClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException {
+            super(clientConfig);
+        }
+
+        /** Starts failing connect and transport calls; does nothing if already muted. */
+        synchronized void mute() {
+            if (mute) {
+                return;
+            }
+            LOG.info("Fire socket mute");
+            mute = true;
+        }
+
+        /** Lets connect and transport calls through again; does nothing if not muted. */
+        synchronized void unmute() {
+            if (!mute) {
+                return;
+            }
+            LOG.info("Fire socket unmute");
+            mute = false;
+        }
+
+        @Override
+        void doTransport(int waitTimeOut, Queue<Packet> pendingQueue, ClientCnxn cnxn)
+                throws IOException, InterruptedException {
+            failIfMuted();
+            super.doTransport(waitTimeOut, pendingQueue, cnxn);
+        }
+
+        @Override
+        void connect(InetSocketAddress addr) throws IOException {
+            failIfMuted();
+            super.connect(addr);
+        }
+
+        // Shared check for both overridden entry points.
+        private void failIfMuted() throws IOException {
+            if (mute) {
+                throw new IOException("Socket is mute");
+            }
+        }
+    }
+
+    /**
+     * Watcher that logs every event and remembers whether an
+     * {@code Expired} state has ever been delivered.
+     */
+    class ClientWatcher implements Watcher {
+
+        private ZooKeeper zk;
+
+        // Written by the thread delivering events and read by the test thread,
+        // so it must be volatile like the other cross-thread flags in this test.
+        private volatile boolean sessionExpired = false;
+
+        /**
+         * Records the client this watcher belongs to.
+         *
+         * @param zk the client being watched
+         */
+        void watchFor(ZooKeeper zk) {
+            this.zk = zk;
+        }
+
+        @Override
+        public void process(WatchedEvent event) {
+            LOG.info("Watcher got {}", event);
+            if (event.getState() == KeeperState.Expired) {
+                sessionExpired = true;
+            }
+        }
+
+        /**
+         * @return true once an event with state {@code Expired} has been seen
+         */
+        boolean isSessionExpired() {
+            return sessionExpired;
+        }
+    }
+
+    /**
+     * Two-party rendezvous used to construct the risky scenario: once closing
+     * has begun, each caller of {@link #sync(boolean)} counts the latch down and
+     * then waits until the latch reaches zero.
+     */
+    class UnsafeCoordinator {
+
+        private final CountDownLatch syncLatch = new CountDownLatch(2);
+
+        /**
+         * Logs the attempt and, when {@code closing} is true, counts down and
+         * blocks until the latch is released.
+         *
+         * @param closing whether the client is being closed; false returns right after logging
+         */
+        void sync(boolean closing) {
+            LOG.info("Attempt to sync with {}", closing);
+            if (!closing) {
+                return;
+            }
+            syncLatch.countDown();
+            try {
+                syncLatch.await();
+            } catch (InterruptedException interrupted) {
+                throw new RuntimeException(interrupted);
+            }
+        }
+    }
+
+    /**
+     * Connection that, once {@link #attemptClose()} has been called, makes the
+     * next connect attempt and {@link #disconnect()} meet at the
+     * {@code UnsafeCoordinator}, so that the connect attempt proceeds only
+     * after {@code sendThread.close()} has been called.
+     */
+    class CustomClientCnxn extends ClientCnxn {
+
+        private volatile boolean closing = false;
+
+        private volatile boolean hitUnsafeRegion = false;
+
+        public CustomClientCnxn(
+                String chrootPath,
+                HostProvider hostProvider,
+                int sessionTimeout,
+                ZooKeeper zooKeeper,
+                ClientWatchManager watcher,
+                ClientCnxnSocket clientCnxnSocket,
+                boolean canBeReadOnly) throws IOException {
+            super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+        }
+
+        /** Arms the coordination: later connect attempts and disconnect will synchronize. */
+        void attemptClose() {
+            closing = true;
+        }
+
+        /**
+         * Polls every 100 ms until a connect attempt has entered the unsafe region.
+         *
+         * @throws RuntimeException if the waiting thread is interrupted
+         */
+        void waitUntilHitUnsafeRegion() {
+            while (!hitUnsafeRegion) {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(100);
+                } catch (InterruptedException e) {
+                    // Propagate instead of spinning on; same policy as UnsafeCoordinator.sync.
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
+        @Override
+        protected void onConnecting(InetSocketAddress addr) {
+            if (closing) {
+                LOG.info("Attempt to connnecting {} {} {}", addr, closing, state);
+                ///////// Unsafe Region ////////
+                // Hold the connect attempt here to widen the race window.
+                // The unsafe point is that startConnect happens after sendThread.close
+                hitUnsafeRegion = true;
+                unsafeCoordinator.sync(closing);
+                ////////////////////////////////
+            }
+        }
+
+        @Override
+        public void disconnect() {
+            Assert.assertTrue(closing);
+            LOG.info("Attempt to disconnecting client for session: 0x{} {} {}", Long.toHexString(getSessionId()), closing, state);
+            sendThread.close();
+            ///////// Unsafe Region ////////
+            // Release the pending connect attempt only now that the send thread has been closed.
+            unsafeCoordinator.sync(closing);
+            ////////////////////////////////
+            try {
+                sendThread.join();
+            } catch (InterruptedException ex) {
+                LOG.warn("Got interrupted while waiting for the sender thread to close", ex);
+            }
+            eventThread.queueEventOfDeath();
+            if (zooKeeperSaslClient != null) {
+                zooKeeperSaslClient.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Client that creates a {@code CustomClientCnxn} as its connection and
+     * publishes both the connection and its fragile socket to the test.
+     */
+    class CustomZooKeeper extends ZooKeeper {
+
+        public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
+            super(connectString, sessionTimeout, watcher);
+        }
+
+        /**
+         * @return whether the connection state reports itself as alive
+         */
+        public boolean isAlive() {
+            // NOTE(review): unqualified cnxn presumably resolves to the field inherited
+            // from ZooKeeper rather than the test's own field - confirm.
+            return cnxn.getState().isAlive();
+        }
+
+        @Override
+        protected ClientCnxn createConnection(
+                String chrootPath,
+                HostProvider hostProvider,
+                int sessionTimeout,
+                ZooKeeper zooKeeper,
+                ClientWatchManager watcher,
+                ClientCnxnSocket clientCnxnSocket,
+                boolean canBeReadOnly) throws IOException {
+            // The test can only break the network through the fragile socket implementation.
+            Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO);
+            socket = (FragileClientCnxnSocketNIO) clientCnxnSocket;
+            CustomClientCnxn created = new CustomClientCnxn(
+                    chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+            ClientCnxnSocketFragilityTest.this.cnxn = created;
+            return created;
+        }
+    }
+}
\ No newline at end of file