You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ar...@apache.org on 2021/03/09 07:04:21 UTC
[zookeeper] branch branch-3.6 updated: ZOOKEEPER-4223: Backport
ZOOKEEPER-3706 to branch-3.6
This is an automated email from the ASF dual-hosted git repository.
arshad pushed a commit to branch branch-3.6
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.6 by this push:
new d46689e ZOOKEEPER-4223: Backport ZOOKEEPER-3706 to branch-3.6
d46689e is described below
commit d46689e5861516b92605bcb911f7ffce9ad6fba4
Author: Mukti Krishnan <mu...@gmail.com>
AuthorDate: Tue Mar 9 12:36:35 2021 +0530
ZOOKEEPER-4223: Backport ZOOKEEPER-3706 to branch-3.6
Author: Mukti <mu...@gmail.com>
Reviewers: Mohammad Arshad <ar...@apache.org>
Closes #1617 from MuktiKrishnan/ZOOKEEPER-4223
---
.../main/java/org/apache/zookeeper/ClientCnxn.java | 43 ++-
.../zookeeper/ClientCnxnSocketFragilityTest.java | 362 +++++++++++++++++++++
2 files changed, 394 insertions(+), 11 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index b2c5a57..a727082 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -418,6 +418,7 @@ public class ClientCnxn {
* @param canBeReadOnly
* whether the connection is allowed to go to read-only
* mode in case of partitioning
+ * @throws IOException in cases of broken network
*/
public ClientCnxn(
String chrootPath,
@@ -428,7 +429,7 @@ public class ClientCnxn {
ClientCnxnSocket clientCnxnSocket,
long sessionId,
byte[] sessionPasswd,
- boolean canBeReadOnly) {
+ boolean canBeReadOnly) throws IOException {
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
@@ -795,6 +796,11 @@ public class ClientCnxn {
eventThread.queueCallback(cb, rc, path, ctx);
}
+ /**
+ * Hook invoked with the selected server address right before
+ * {@code startConnect(addr)} is called. The default implementation does
+ * nothing; it exists for tests only, so that a subclass can observe or
+ * delay the moment a (re)connection attempt begins.
+ *
+ * @param addr address of the server that is about to be connected to
+ */
+ protected void onConnecting(InetSocketAddress addr) {
+
+ }
+
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
@@ -888,7 +894,7 @@ public class ClientCnxn {
case AUTHPACKET_XID:
LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
- state = States.AUTH_FAILED;
+ changeZkState(States.AUTH_FAILED);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
Watcher.Event.KeeperState.AuthFailed, null));
eventThread.queueEventOfDeath();
@@ -967,9 +973,9 @@ public class ClientCnxn {
}
}
- SendThread(ClientCnxnSocket clientCnxnSocket) {
+ SendThread(ClientCnxnSocket clientCnxnSocket) throws IOException {
super(makeThreadName("-SendThread()"));
- state = States.CONNECTING;
+ changeZkState(States.CONNECTING);
this.clientCnxnSocket = clientCnxnSocket;
setDaemon(true);
}
@@ -983,10 +989,19 @@ public class ClientCnxn {
*
* @return
*/
- ZooKeeper.States getZkState() {
+ synchronized ZooKeeper.States getZkState() {
return state;
}
+ /**
+ * Moves the connection to {@code newState}.
+ *
+ * The only transition that is rejected is going to {@code CONNECTING}
+ * while the current state is no longer alive: in that case the state is
+ * left untouched and an {@link IOException} is thrown. Every other
+ * transition is applied unconditionally.
+ *
+ * @param newState the state to switch to
+ * @throws IOException if {@code newState} is {@code CONNECTING} and the
+ * current state reports {@code !isAlive()}
+ */
+ synchronized void changeZkState(ZooKeeper.States newState) throws IOException {
+ // Guard first: a connection whose state is not alive must not start connecting again.
+ if (!state.isAlive() && newState == States.CONNECTING) {
+ throw new IOException(
+ "Connection has already been closed and reconnection is not allowed");
+ }
+ // It's safer to place state modification at the end.
+ state = newState;
+ }
+
ClientCnxnSocket getClientCnxnSocket() {
return clientCnxnSocket;
}
@@ -1133,7 +1148,7 @@ public class ClientCnxn {
LOG.warn("Unexpected exception", e);
}
}
- state = States.CONNECTING;
+ changeZkState(States.CONNECTING);
String hostPort = addr.getHostString() + ":" + addr.getPort();
MDC.put("myid", hostPort);
@@ -1191,6 +1206,7 @@ public class ClientCnxn {
} else {
serverAddress = hostProvider.next(1000);
}
+ onConnecting(serverAddress);
startConnect(serverAddress);
clientCnxnSocket.updateLastSendAndHeard();
}
@@ -1204,7 +1220,7 @@ public class ClientCnxn {
zooKeeperSaslClient.initialize(ClientCnxn.this);
} catch (SaslException e) {
LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
- state = States.AUTH_FAILED;
+ changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
}
}
@@ -1212,7 +1228,7 @@ public class ClientCnxn {
if (authState != null) {
if (authState == KeeperState.AuthFailed) {
// An authentication error occurred during authentication with the Zookeeper Server.
- state = States.AUTH_FAILED;
+ changeZkState(States.AUTH_FAILED);
sendAuthEvent = true;
} else {
if (authState == KeeperState.SaslAuthenticated) {
@@ -1406,7 +1422,7 @@ public class ClientCnxn {
boolean isRO) throws IOException {
negotiatedSessionTimeout = _negotiatedSessionTimeout;
if (negotiatedSessionTimeout <= 0) {
- state = States.CLOSED;
+ changeZkState(States.CLOSED);
eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Expired, null));
eventThread.queueEventOfDeath();
@@ -1427,7 +1443,7 @@ public class ClientCnxn {
hostProvider.onConnected();
sessionId = _sessionId;
sessionPasswd = _sessionPasswd;
- state = (isRO) ? States.CONNECTEDREADONLY : States.CONNECTED;
+ changeZkState((isRO) ? States.CONNECTEDREADONLY : States.CONNECTED);
seenRwServerBefore |= !isRO;
LOG.info(
"Session establishment complete on server {}, session id = 0x{}, negotiated timeout = {}{}",
@@ -1440,7 +1456,12 @@ public class ClientCnxn {
}
void close() {
- state = States.CLOSED;
+ try {
+ changeZkState(States.CLOSED);
+ } catch (IOException e) {
+ LOG.warn("Connection close fails when migrates state from {} to CLOSED",
+ getZkState());
+ }
clientCnxnSocket.onClosing();
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
new file mode 100644
index 0000000..07b7d62
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
@@ -0,0 +1,362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Queue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.zookeeper.ClientCnxn.Packet;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.client.HostProvider;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ClientCnxnSocketFragilityTest extends QuorumPeerTestBase {
+
+ // Number of quorum servers started by the test.
+ private static final int SERVER_COUNT = 3;
+
+ // Session timeout handed to the ZooKeeper client constructor (presumably milliseconds).
+ private static final int SESSION_TIMEOUT = 40000;
+
+ // Timeout passed to ClientBase.waitForServerUp while waiting for each server to start.
+ public static final int CONNECTION_TIMEOUT = 30000;
+
+ // Rendezvous point shared by CustomClientCnxn.onConnecting and CustomClientCnxn.disconnect.
+ private final UnsafeCoordinator unsafeCoordinator = new UnsafeCoordinator();
+
+ // Client under test; assigned in the test method.
+ private volatile CustomZooKeeper zk = null;
+
+ // Socket created for the client; captured in CustomZooKeeper.createConnection.
+ private volatile FragileClientCnxnSocketNIO socket = null;
+
+ // Connection created for the client; captured in CustomZooKeeper.createConnection.
+ private volatile CustomClientCnxn cnxn = null;
+
+ /**
+  * Builds a ZooKeeper connect string for servers listening on the loopback
+  * address, e.g. {@code 127.0.0.1:2181,127.0.0.1:2182}.
+  *
+  * @param clientPorts client ports, one per server
+  * @return comma-separated {@code 127.0.0.1:port} entries in the order of
+  *         {@code clientPorts}; the empty string when no ports are given
+  */
+ private String getCxnString(int[] clientPorts) {
+     // StringBuilder: the buffer never leaves this method, so no synchronization is needed.
+     StringBuilder connectString = new StringBuilder();
+     for (int port : clientPorts) {
+         if (connectString.length() > 0) {
+             connectString.append(',');
+         }
+         connectString.append("127.0.0.1:").append(port);
+     }
+     return connectString.toString();
+ }
+
+ /**
+  * Closes the given client asynchronously on a separate thread, so the
+  * caller is not blocked while {@code zk.close()} is in progress.
+  *
+  * @param zk the client to close
+  */
+ private void closeZookeeper(ZooKeeper zk) {
+     java.util.concurrent.ExecutorService closer = Executors.newSingleThreadExecutor();
+     try {
+         closer.submit(() -> {
+             try {
+                 LOG.info("closeZookeeper is fired");
+                 zk.close();
+             } catch (InterruptedException e) {
+                 // Keep the interrupt visible instead of silently dropping it.
+                 Thread.currentThread().interrupt();
+                 LOG.warn("Interrupted while closing ZooKeeper client", e);
+             }
+         });
+     } finally {
+         // shutdown() lets the submitted task finish but releases the worker
+         // thread afterwards; without it the non-daemon thread leaks.
+         closer.shutdown();
+     }
+ }
+
+ /**
+  * Starts a quorum, connects a client through a socket that can be muted,
+  * and then closes the client while the send thread is in the middle of a
+  * reconnection attempt. After the close the client must not be alive and
+  * the session must not have been reported as expired.
+  *
+  * @throws Exception if the quorum or the client cannot be set up
+  */
+ @Test
+ public void testClientCnxnSocketFragility() throws Exception {
+     System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
+             FragileClientCnxnSocketNIO.class.getName());
+     System.setProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT, "1000");
+     final MainThread[] mt = new MainThread[SERVER_COUNT];
+     GetDataRetryForeverBackgroundTask retryForeverGetData = null;
+     try {
+         final int[] clientPorts = new int[SERVER_COUNT];
+         StringBuilder sb = new StringBuilder();
+         String server;
+
+         for (int i = 0; i < SERVER_COUNT; i++) {
+             clientPorts[i] = PortAssignment.unique();
+             server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":"
+                     + PortAssignment.unique() + ":participant;127.0.0.1:" + clientPorts[i];
+             sb.append(server).append('\n');
+         }
+         String currentQuorumCfgSection = sb.toString();
+
+         for (int i = 0; i < SERVER_COUNT; i++) {
+             mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false);
+             mt[i].start();
+         }
+
+         // Ensure server started
+         for (int i = 0; i < SERVER_COUNT; i++) {
+             Assert.assertTrue("waiting for server " + i + " being up",
+                     ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT));
+         }
+         String path = "/testClientCnxnSocketFragility";
+         String data = "balabala";
+         ClientWatcher watcher = new ClientWatcher();
+         zk = new CustomZooKeeper(getCxnString(clientPorts), SESSION_TIMEOUT, watcher);
+         watcher.watchFor(zk);
+
+         // Let's see some successful operations
+         zk.create(path, data.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                 Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+         Assert.assertEquals(data,
+                 new String(zk.getData(path, false, new Stat()), java.nio.charset.StandardCharsets.UTF_8));
+         Assert.assertFalse(watcher.isSessionExpired());
+
+         // Let's make a broken operation
+         socket.mute();
+         boolean catchKeeperException = false;
+         try {
+             zk.getData(path, false, new Stat());
+         } catch (KeeperException e) {
+             catchKeeperException = true;
+             Assert.assertFalse(e instanceof KeeperException.SessionExpiredException);
+         }
+         socket.unmute();
+         Assert.assertTrue(catchKeeperException);
+         Assert.assertFalse(watcher.isSessionExpired());
+
+         retryForeverGetData = new GetDataRetryForeverBackgroundTask(zk, path);
+         retryForeverGetData.startTask();
+         // Let's make a broken network
+         socket.mute();
+
+         // Let's attempt to close ZooKeeper
+         cnxn.attemptClose();
+
+         // Wait some time to expect continuous reconnecting.
+         // We try to make reconnecting hit the unsafe region.
+         cnxn.waitUntilHitUnsafeRegion();
+
+         // Close zk in the background, then give the close some time to complete.
+         closeZookeeper(zk);
+         TimeUnit.MILLISECONDS.sleep(3000);
+
+         // Since we already close zookeeper, we expect that the zk should not be alive.
+         Assert.assertFalse(zk.isAlive());
+         Assert.assertFalse(watcher.isSessionExpired());
+     } finally {
+         try {
+             // Clean up even when an assertion above failed, so that no
+             // server or background thread outlives this test.
+             if (retryForeverGetData != null) {
+                 retryForeverGetData.syncCloseTask();
+             }
+             for (MainThread serverThread : mt) {
+                 if (serverThread != null) {
+                     serverThread.shutdown();
+                 }
+             }
+         } finally {
+             // Do not leak the custom socket class / request timeout into other tests.
+             System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+             System.clearProperty(ZKClientConfig.ZOOKEEPER_REQUEST_TIMEOUT);
+         }
+     }
+ }
+
+ /**
+  * Background thread that keeps calling {@code getData} on one path until
+  * it is told to stop, logging (and otherwise ignoring) every failure.
+  */
+ class GetDataRetryForeverBackgroundTask extends Thread {
+     // Loop flag; cleared by syncCloseTask() from another thread.
+     private volatile boolean alive;
+     private final CustomZooKeeper zk;
+     private final String path;
+
+     GetDataRetryForeverBackgroundTask(CustomZooKeeper zk, String path) {
+         this.alive = false;
+         this.zk = zk;
+         this.path = path;
+         // Daemon thread: a task that is still running cannot keep the JVM alive.
+         setDaemon(true);
+     }
+
+     /** Marks the task as alive and starts the thread. */
+     void startTask() {
+         alive = true;
+         start();
+     }
+
+     /**
+      * Asks the loop to stop and waits for the thread to finish.
+      *
+      * @throws InterruptedException if the caller is interrupted while joining
+      */
+     void syncCloseTask() throws InterruptedException {
+         alive = false;
+         join();
+     }
+
+     @Override
+     public void run() {
+         while (alive) {
+             try {
+                 zk.getData(path, false, new Stat());
+             } catch (InterruptedException e) {
+                 // Honour the interrupt instead of swallowing it and looping on.
+                 Thread.currentThread().interrupt();
+                 return;
+             } catch (Exception e) {
+                 LOG.info("zookeeper getData failed on path {}", path);
+             }
+             try {
+                 // Pause after every attempt, failed ones included; otherwise
+                 // a request that fails immediately turns this into a busy loop.
+                 TimeUnit.MILLISECONDS.sleep(500);
+             } catch (InterruptedException e) {
+                 Thread.currentThread().interrupt();
+                 return;
+             }
+         }
+     }
+ }
+
+ /**
+  * NIO client socket that can be switched into a "mute" mode. While muted,
+  * both {@code connect} and {@code doTransport} fail with an
+  * {@link IOException} instead of touching the network.
+  */
+ public static class FragileClientCnxnSocketNIO extends ClientCnxnSocketNIO {
+
+     // True while the socket pretends that the network is broken.
+     private volatile boolean mute = false;
+
+     public FragileClientCnxnSocketNIO(ZKClientConfig clientConfig) throws IOException {
+         super(clientConfig);
+     }
+
+     /** Starts failing all transport and connect calls; no-op if already muted. */
+     synchronized void mute() {
+         if (mute) {
+             return;
+         }
+         LOG.info("Fire socket mute");
+         mute = true;
+     }
+
+     /** Restores normal behaviour; no-op if the socket is not muted. */
+     synchronized void unmute() {
+         if (!mute) {
+             return;
+         }
+         LOG.info("Fire socket unmute");
+         mute = false;
+     }
+
+     @Override
+     void doTransport(int waitTimeOut, Queue<Packet> pendingQueue, ClientCnxn cnxn)
+             throws IOException, InterruptedException {
+         failIfMuted();
+         super.doTransport(waitTimeOut, pendingQueue, cnxn);
+     }
+
+     @Override
+     void connect(InetSocketAddress addr) throws IOException {
+         failIfMuted();
+         super.connect(addr);
+     }
+
+     // Simulates a broken network for as long as the socket is muted.
+     private void failIfMuted() throws IOException {
+         if (mute) {
+             throw new IOException("Socket is mute");
+         }
+     }
+ }
+
+ /**
+  * Watcher that logs every event and remembers whether an
+  * {@code Expired} state has ever been delivered.
+  */
+ class ClientWatcher implements Watcher {
+
+     // Client this watcher was registered for; only recorded, never read here.
+     private volatile ZooKeeper zk;
+
+     // Set by process() on the thread delivering events and read by the test
+     // thread, hence volatile so the update is guaranteed to be visible.
+     private volatile boolean sessionExpired = false;
+
+     /**
+      * Records the client this watcher belongs to.
+      *
+      * @param zk the watched client
+      */
+     void watchFor(ZooKeeper zk) {
+         this.zk = zk;
+     }
+
+     @Override
+     public void process(WatchedEvent event) {
+         LOG.info("Watcher got {}", event);
+         if (event.getState() == KeeperState.Expired) {
+             sessionExpired = true;
+         }
+     }
+
+     /**
+      * @return {@code true} once an event with state {@code Expired} has been processed
+      */
+     boolean isSessionExpired() {
+         return sessionExpired;
+     }
+ }
+
+ /**
+  * Coordinates two threads to construct the risky scenario: each caller
+  * that passes {@code closing == true} counts the latch down and then
+  * blocks until the second such caller has arrived.
+  */
+ class UnsafeCoordinator {
+
+     // Two parties: the connecting side and the disconnecting side.
+     private final CountDownLatch syncLatch = new CountDownLatch(2);
+
+     /**
+      * Rendezvous with the other party when {@code closing} is set;
+      * otherwise only logs the attempt and returns immediately.
+      *
+      * @param closing whether the caller takes part in the rendezvous
+      * @throws RuntimeException if the wait is interrupted; the interrupt
+      *         flag of the calling thread is restored first
+      */
+     void sync(boolean closing) {
+         LOG.info("Attempt to sync with {}", closing);
+         if (!closing) {
+             return;
+         }
+         syncLatch.countDown();
+         try {
+             syncLatch.await();
+         } catch (InterruptedException e) {
+             // Restore the flag so code further up the stack still sees the interrupt.
+             Thread.currentThread().interrupt();
+             throw new RuntimeException(e);
+         }
+     }
+ }
+
+ /**
+  * ClientCnxn that widens the window between {@code sendThread.close()}
+  * and the next {@code startConnect} so that the test can reliably make
+  * both happen at the same time.
+  */
+ class CustomClientCnxn extends ClientCnxn {
+
+     // Set once the test has announced that it is about to close the client.
+     private volatile boolean closing = false;
+
+     // Set by the send thread when it enters onConnecting() while closing.
+     private volatile boolean hitUnsafeRegion = false;
+
+     public CustomClientCnxn(
+             String chrootPath,
+             HostProvider hostProvider,
+             int sessionTimeout,
+             ZooKeeper zooKeeper,
+             ClientWatchManager watcher,
+             ClientCnxnSocket clientCnxnSocket,
+             boolean canBeReadOnly) throws IOException {
+         super(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+     }
+
+     /** Announces that a close is imminent; arms the hooks below. */
+     void attemptClose() {
+         closing = true;
+     }
+
+     /**
+      * Polls until a connection attempt has entered the unsafe region.
+      *
+      * @throws RuntimeException if the waiting thread is interrupted; the
+      *         interrupt flag is restored before throwing
+      */
+     void waitUntilHitUnsafeRegion() {
+         while (!hitUnsafeRegion) {
+             try {
+                 TimeUnit.MILLISECONDS.sleep(100);
+             } catch (InterruptedException e) {
+                 // Do not swallow the interrupt: otherwise this loop could
+                 // never be cancelled if the region is never reached.
+                 Thread.currentThread().interrupt();
+                 throw new RuntimeException("Interrupted while waiting to hit the unsafe region", e);
+             }
+         }
+     }
+
+     @Override
+     protected void onConnecting(InetSocketAddress addr) {
+         if (closing) {
+             LOG.info("Attempt to connnecting {} {} {}", addr, closing, state);
+             ///////// Unsafe Region ////////
+             // Slow down and zoom out the unsafe point to make risk
+             // The unsafe point is that startConnect happens after sendThread.close
+             hitUnsafeRegion = true;
+             unsafeCoordinator.sync(closing);
+             ////////////////////////////////
+         }
+     }
+
+     @Override
+     public void disconnect() {
+         Assert.assertTrue(closing);
+         LOG.info("Attempt to disconnecting client for session: 0x{} {} {}", Long.toHexString(getSessionId()), closing, state);
+         sendThread.close();
+         ///////// Unsafe Region ////////
+         // Meet the connecting side here, i.e. after the send thread was closed.
+         unsafeCoordinator.sync(closing);
+         ////////////////////////////////
+         try {
+             sendThread.join();
+         } catch (InterruptedException ex) {
+             LOG.warn("Got interrupted while waiting for the sender thread to close", ex);
+         }
+         eventThread.queueEventOfDeath();
+         if (zooKeeperSaslClient != null) {
+             zooKeeperSaslClient.shutdown();
+         }
+     }
+ }
+
+ /**
+  * ZooKeeper client whose connection is a {@link CustomClientCnxn}; the
+  * socket and the connection it is built with are published to the
+  * enclosing test so the test can manipulate them.
+  */
+ class CustomZooKeeper extends ZooKeeper {
+
+     public CustomZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
+         super(connectString, sessionTimeout, watcher);
+     }
+
+     /**
+      * @return whether the state of the underlying connection is still alive
+      */
+     public boolean isAlive() {
+         return cnxn.getState().isAlive();
+     }
+
+     @Override
+     protected ClientCnxn createConnection(
+             String chrootPath,
+             HostProvider hostProvider,
+             int sessionTimeout,
+             ZooKeeper zooKeeper,
+             ClientWatchManager watcher,
+             ClientCnxnSocket clientCnxnSocket,
+             boolean canBeReadOnly) throws IOException {
+         // The test relies on the socket implementation selected via system property.
+         Assert.assertTrue(clientCnxnSocket instanceof FragileClientCnxnSocketNIO);
+         socket = (FragileClientCnxnSocketNIO) clientCnxnSocket;
+         CustomClientCnxn connection = new CustomClientCnxn(
+                 chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, canBeReadOnly);
+         ClientCnxnSocketFragilityTest.this.cnxn = connection;
+         return connection;
+     }
+ }
+}
\ No newline at end of file