You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by an...@apache.org on 2019/10/09 14:49:09 UTC
[zookeeper] branch master updated: ZOOKEEPER-3471: Fix potential
lock unavailable due to dangling ephemeral nodes left during local session
upgrading
This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 4951a09 ZOOKEEPER-3471: Fix potential lock unavailable due to dangling ephemeral nodes left during local session upgrading
4951a09 is described below
commit 4951a090d7c946f57ac5ab09b5d48a5d7831001d
Author: Fangmin Lyu <fa...@apache.org>
AuthorDate: Wed Oct 9 16:49:02 2019 +0200
ZOOKEEPER-3471: Fix potential lock unavailable due to dangling ephemeral nodes left during local session upgrading
Author: Fangmin Lyu <fa...@apache.org>
Reviewers: andor@apache.org
Closes #1025 from lvfangmin/ZOOKEEPER-3471
---
.../apache/zookeeper/server/ZooKeeperServer.java | 6 +-
.../server/quorum/SessionUpgradeQuorumTest.java | 95 +++++++++++++++++++++-
2 files changed, 95 insertions(+), 6 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 3d6c375..48e427b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -554,7 +554,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
private void close(long sessionId) {
Request si = new Request(null, sessionId, 0, OpCode.closeSession, null, null);
- setLocalSessionFlag(si);
submitRequest(si);
}
@@ -915,7 +914,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
to.putInt(timeout);
cnxn.setSessionId(sessionId);
Request si = new Request(cnxn, sessionId, 0, OpCode.createSession, to, null);
- setLocalSessionFlag(si);
submitRequest(si);
return sessionId;
}
@@ -1072,6 +1070,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
touch(si.cnxn);
boolean validpacket = Request.isValid(si.type);
if (validpacket) {
+ setLocalSessionFlag(si);
firstProcessor.processRequest(si);
if (si.cnxn != null) {
incInProcess();
@@ -1574,9 +1573,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
si.setLargeRequestSize(length);
}
si.setOwner(ServerCnxn.me);
- // Always treat packet from the client as a possible
- // local request.
- setLocalSessionFlag(si);
submitRequest(si);
}
}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
index 7fc2f42..0be68ec 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java
@@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import javax.security.sasl.SaslException;
import org.apache.jute.BinaryOutputArchive;
+import org.apache.zookeeper.AsyncCallback.StringCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.PortAssignment;
@@ -195,22 +196,114 @@ public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
zk.close();
}
+ @Test
+ public void testCloseSessionWhileUpgradeOnLeader()
+ throws IOException, KeeperException, InterruptedException {
+ int leaderId = -1; // sentinel: no server currently reports a Leader instance
+ for (int i = SERVER_COUNT - 1; i >= 0; i--) {
+ if (mt[i].main.quorumPeer.leader != null) {
+ leaderId = i;
+ }
+ }
+ if (leaderId >= 0) { // index 0 is a valid server; only -1 means "not found"
+ makeSureEphemeralIsGone(leaderId);
+ }
+ }
+
+ @Test
+ public void testCloseSessionWhileUpgradeOnLearner()
+ throws IOException, KeeperException, InterruptedException {
+ int learnerId = -1; // sentinel: no server currently reports a Follower instance
+ for (int i = SERVER_COUNT - 1; i >= 0; i--) {
+ if (mt[i].main.quorumPeer.follower != null) {
+ learnerId = i; // descending scan, so the lowest follower index wins
+ }
+ }
+ if (learnerId >= 0) { // index 0 is a valid server; only -1 means "not found"
+ makeSureEphemeralIsGone(learnerId);
+ }
+ }
+
+ private void makeSureEphemeralIsGone(int sid)
+ throws IOException, KeeperException, InterruptedException {
+ // Delay submit request to simulate the request queued in
+ // RequestThrottler
+ qpMain[sid].setSubmitDelayMs(200);
+
+ // Connect a client to server `sid` and issue a callback-style create of an ephemeral node; the callback is a no-op
+ ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
+ ClientBase.CONNECTION_TIMEOUT, this);
+ waitForOne(zk, States.CONNECTED);
+
+ final String node = "/node-1";
+ zk.create(node, new byte[2], ZooDefs.Ids.OPEN_ACL_UNSAFE,
+ CreateMode.EPHEMERAL, new StringCallback() {
+ @Override
+ public void processResult(int rc, String path, Object ctx,
+ String name) {}
+ }, null);
+
+ // close the client right after issuing the create, without inspecting its result
+ zk.close();
+
+ // reconnect with a fresh client to the same server and make sure the ephemeral is gone
+ zk = new ZooKeeper("127.0.0.1:" + clientPorts[sid],
+ ClientBase.CONNECTION_TIMEOUT, this);
+ waitForOne(zk, States.CONNECTED);
+ assertNull(zk.exists(node, false));
+ zk.close();
+ }
+
private static class TestQPMainDropSessionUpgrading extends TestQPMain {
private volatile boolean shouldDrop = false;
+ private volatile int submitDelayMs = 0;
public void setDropCreateSession(boolean dropCreateSession) {
shouldDrop = dropCreateSession;
}
+ public void setSubmitDelayMs(int delay) {
+ this.submitDelayMs = delay;
+ }
+
@Override
protected QuorumPeer getQuorumPeer() throws SaslException {
return new QuorumPeer() {
@Override
+ protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
+ return new Leader(this, new LeaderZooKeeperServer(
+ logFactory, this, this.getZkDb()) {
+
+ @Override
+ public void submitRequestNow(Request si) {
+ if (submitDelayMs > 0) {
+ try {
+ Thread.sleep(submitDelayMs);
+ } catch (Exception e) {}
+ }
+ super.submitRequestNow(si);
+ }
+ });
+ }
+
+ @Override
protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
- return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
+ return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb()) {
+
+ @Override
+ public void submitRequestNow(Request si) {
+ if (submitDelayMs > 0) {
+ try {
+ Thread.sleep(submitDelayMs);
+ } catch (Exception e) {}
+ }
+ super.submitRequestNow(si);
+ }
+
+ }) {
@Override
protected void request(Request request) throws IOException {