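You are viewing a plain-text version of this content. The canonical version is the original post in the commits@zookeeper.apache.org mailing-list archive (its link was lost in this plain-text rendering).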
Posted to commits@zookeeper.apache.org by cn...@apache.org on 2016/06/23 21:25:35 UTC
svn commit: r1750022 - in /zookeeper/trunk: ./
src/java/main/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/server/quorum/
src/java/test/org/apache/zookeeper/test/
Author: cnauroth
Date: Thu Jun 23 21:25:34 2016
New Revision: 1750022
URL: http://svn.apache.org/viewvc?rev=1750022&view=rev
Log:
ZOOKEEPER-2380: Deadlock between leader shutdown and forwarding ACK to the leader. (Arshad Mohammad via cnauroth)
Added:
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
Modified:
zookeeper/trunk/CHANGES.txt
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Jun 23 21:25:34 2016
@@ -317,6 +317,9 @@ BUGFIXES:
ZOOKEEPER-2297: NPE is thrown while creating "key manager" and "trust manager"
(Arshad Mohammad via fpj)
+ ZOOKEEPER-2380: Deadlock between leader shutdown and forwarding ACK to the
+ leader (Arshad Mohammad via cnauroth)
+
IMPROVEMENTS:
ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jun 23 21:25:34 2016
@@ -549,6 +549,8 @@ public class Leader {
// We ping twice a tick, so we only update the tick every other
// iteration
boolean tickSkip = true;
+ // If not null then shutdown this leader
+ String shutdownMessage = null;
while (true) {
synchronized (this) {
@@ -586,12 +588,10 @@ public class Leader {
if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
// Lost quorum of last committed and/or last proposed
- // config, shutdown
- shutdown("Not sufficient followers synced, only synced with sids: [ "
- + syncedAckSet.ackSetsToString() + " ]");
- // make sure the order is the same!
- // the leader goes to looking
- return;
+ // config, set shutdown flag
+ shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
+ + syncedAckSet.ackSetsToString() + " ]";
+ break;
}
tickSkip = !tickSkip;
}
@@ -599,6 +599,10 @@ public class Leader {
f.ping();
}
}
+ if (shutdownMessage != null) {
+ shutdown(shutdownMessage);
+ // leader goes in looking state
+ }
} finally {
zk.unregisterJMX(this);
}
Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Jun 23 21:25:34 2016
@@ -1642,7 +1642,11 @@ public class QuorumPeer extends ZooKeepe
public void setZKDatabase(ZKDatabase database) {
this.zkDb = database;
}
-
+
+ protected ZKDatabase getZkDb() {
+ return this.zkDb; // can be null: initConfigInZKDatabase() guards against that
+ }
+
public synchronized void initConfigInZKDatabase() {
if (zkDb != null) zkDb.initConfigInZKDatabase(getQuorumVerifier());
}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Thu Jun 23 21:25:34 2016
@@ -34,6 +34,7 @@ import org.apache.zookeeper.WatchedEvent
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
import org.apache.zookeeper.server.admin.JettyAdminServer;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.QuorumBase;
@@ -228,11 +229,15 @@ public class QuorumPeerTestBase extends
Thread currentThread;
synchronized public void start() {
- main = new TestQPMain();
+ main = getTestQPMain();
currentThread = new Thread(this);
currentThread.start();
}
+ /** Factory called by start(); subclasses override it to run a different TestQPMain. */
+ public TestQPMain getTestQPMain() {
+ return new TestQPMain();
+ }
+
public void run() {
String args[] = new String[1];
args[0] = confFile.toString();
@@ -280,5 +285,9 @@ public class QuorumPeerTestBase extends
props.load(new FileReader(confFile));
return props.getProperty(key, "");
}
+
+ public QuorumPeer getQuorumPeer() {
+ return main == null ? null : main.quorumPeer; // null before start() or before the main has created its peer
+ }
}
}
Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java?rev=1750022&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java Thu Jun 23 21:25:34 2016
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for race conditions that only show up when a complete ZooKeeper
+ * quorum (several {@link QuorumPeer}s) is running.
+ */
+public class RaceConditionTest extends QuorumPeerTestBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(RaceConditionTest.class);
+    private static final int SERVER_COUNT = 3;
+    private MainThread[] mt;
+
+    /**
+     * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2380:
+     * deadlock between leader shutdown and forwarding an ACK to the leader.
+     * <p>
+     * The followers are made to fail on the next PING so that the leader loses
+     * its majority; the leader must then leave LEADING and reach LOOKING or
+     * FOLLOWING state instead of hanging in its shutdown.
+     */
+    @Test(timeout = 30000)
+    public void testRaceConditionBetweenLeaderAndAckRequestProcessor() throws Exception {
+        mt = startQuorum();
+        QuorumPeer leader = getLeader(mt);
+        assertNotNull("Leader should not be null", leader);
+        // Make the followers fail on the next PING so that the leader no longer
+        // has a majority and must go into LOOKING or FOLLOWING state.
+        shutdownFollowers(mt);
+        assertTrue("Leader failed to transition to LOOKING or FOLLOWING state", ClientBase.waitForServerState(leader,
+                15000, QuorumStats.Provider.LOOKING_STATE, QuorumStats.Provider.FOLLOWING_STATE));
+    }
+
+    /**
+     * Stops every server that was created, even if startQuorum() failed part way.
+     */
+    @After
+    public void tearDown() {
+        if (null == mt) {
+            return;
+        }
+        for (MainThread server : mt) {
+            if (server == null) {
+                continue; // startQuorum() failed before creating this one
+            }
+            try {
+                // With the defect, the leader hangs here too; with the fix it does not.
+                server.shutdown();
+            } catch (InterruptedException e) {
+                // Best effort: log and keep stopping the remaining servers.
+                LOG.warn("Quorum Peer interrupted while shutting it down", e);
+            }
+        }
+    }
+
+    /**
+     * Starts {@link #SERVER_COUNT} participants on 127.0.0.1, each backed by
+     * {@link MockTestQPMain}, and waits until every one of them is up.
+     *
+     * @return the started servers, indexed by server id
+     * @throws IOException propagated from creating a MainThread
+     */
+    private MainThread[] startQuorum() throws IOException {
+        final int[] clientPorts = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            String server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                    + ":participant;127.0.0.1:" + clientPorts[i];
+            sb.append(server).append('\n');
+        }
+        String currentQuorumCfgSection = sb.toString();
+
+        MainThread[] servers = new MainThread[SERVER_COUNT];
+        // Publish the array before starting anything so that tearDown() can
+        // stop the servers already started if a later step fails.
+        mt = servers;
+
+        // start all the servers
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            servers[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
+                @Override
+                public TestQPMain getTestQPMain() {
+                    return new MockTestQPMain();
+                }
+            };
+            servers[i].start();
+        }
+
+        // ensure all servers started
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT));
+        }
+        return servers;
+    }
+
+    /**
+     * Returns the peer currently in LEADING state, or null if there is none.
+     * Servers are scanned from the highest index down.
+     */
+    private QuorumPeer getLeader(MainThread[] servers) {
+        for (int i = servers.length - 1; i >= 0; i--) {
+            QuorumPeer quorumPeer = servers[i].getQuorumPeer();
+            if (quorumPeer != null && ServerState.LEADING == quorumPeer.getPeerState()) {
+                return quorumPeer;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Makes every peer currently in FOLLOWING state throw a SocketException on
+     * the next PING it receives (see CustomQuorumPeer.makeFollower). The peers
+     * themselves are not shut down.
+     */
+    private void shutdownFollowers(MainThread[] servers) {
+        for (MainThread server : servers) {
+            CustomQuorumPeer quorumPeer = (CustomQuorumPeer) server.getQuorumPeer();
+            if (quorumPeer != null && ServerState.FOLLOWING == quorumPeer.getPeerState()) {
+                quorumPeer.setStopPing(true);
+            }
+        }
+    }
+
+    /**
+     * QuorumPeer whose followers can be told to fail on PING and whose leader
+     * builds its request-processor chain with the mock processors below.
+     */
+    private static class CustomQuorumPeer extends QuorumPeer {
+        // Set from the test thread but read while this peer (started as its own
+        // thread, see MockTestQPMain) processes packets, so it must be volatile
+        // for the change to be guaranteed visible.
+        private volatile boolean stopPing;
+
+        public CustomQuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort,
+                int electionAlg, long myid, int tickTime, int initLimit, int syncLimit) throws IOException {
+            super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, false,
+                    ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1), new QuorumMaj(quorumPeers));
+        }
+
+        /** When true, followers created by this peer fail on every PING. */
+        public void setStopPing(boolean stopPing) {
+            this.stopPing = stopPing;
+        }
+
+        /**
+         * Creates a follower that, once stopPing is set, throws a SocketException
+         * instead of processing a PING from the leader.
+         */
+        @Override
+        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
+                @Override
+                protected void processPacket(QuorumPacket qp) throws Exception {
+                    if (stopPing && qp.getType() == Leader.PING) {
+                        LOG.info("Follower skipped ping");
+                        throw new SocketException("Socket time out while sending the ping response");
+                    } else {
+                        super.processPacket(qp);
+                    }
+                }
+            };
+        }
+
+        /**
+         * Creates a leader whose LeaderZooKeeperServer builds its request
+         * processor chain with MockProposalRequestProcessor.
+         */
+        @Override
+        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
+            LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, this, this.getZkDb()) {
+                @Override
+                protected void setupRequestProcessors() {
+                    // Overridden only to inject MockSyncRequestProcessor (through
+                    // MockProposalRequestProcessor). Resulting chain: LeaderRequestProcessor
+                    // -> PrepRequestProcessor -> MockProposalRequestProcessor -> CommitProcessor
+                    // -> ToBeAppliedRequestProcessor -> FinalRequestProcessor.
+                    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+                    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor,
+                            getLeader());
+                    commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false,
+                            getZooKeeperServerListener());
+                    commitProcessor.start();
+                    ProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(this,
+                            commitProcessor);
+                    proposalProcessor.initialize();
+                    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
+                    prepRequestProcessor.start();
+                    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
+                }
+            };
+            return new Leader(this, zk);
+        }
+    }
+
+    /**
+     * SyncRequestProcessor that queues one more request right before shutting
+     * down, so that it still has something to process during the shutdown flow.
+     */
+    private static class MockSyncRequestProcessor extends SyncRequestProcessor {
+        public MockSyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
+            super(zks, nextProcessor);
+        }
+
+        @Override
+        public void shutdown() {
+            Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete,
+                    ByteBuffer.wrap("/deadLockIssue".getBytes(StandardCharsets.UTF_8)), null);
+            processRequest(request);
+            super.shutdown();
+        }
+    }
+
+    /**
+     * ProposalRequestProcessor whose sync stage is a MockSyncRequestProcessor
+     * feeding an AckRequestProcessor for the leader.
+     */
+    private static class MockProposalRequestProcessor extends ProposalRequestProcessor {
+        public MockProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
+            super(zks, nextProcessor);
+            // The only purpose here is to (re)assign syncProcessor to the mocked
+            // SyncRequestProcessor after the superclass constructor has run.
+            AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
+            syncProcessor = new MockSyncRequestProcessor(zks, ackProcessor);
+        }
+    }
+
+    /**
+     * TestQPMain that runs a CustomQuorumPeer built from the parsed config and
+     * blocks until that peer's thread exits.
+     */
+    private static class MockTestQPMain extends TestQPMain {
+        @Override
+        public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
+            quorumPeer = new CustomQuorumPeer(config.getQuorumVerifier().getAllMembers(), config.getDataDir(),
+                    config.getDataLogDir(), config.getClientPortAddress().getPort(), config.getElectionAlg(),
+                    config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit());
+            quorumPeer.start();
+            try {
+                quorumPeer.join();
+            } catch (InterruptedException e) {
+                LOG.warn("Quorum Peer interrupted", e);
+                // Restore the interrupt status for the code that called us.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+}
Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Thu Jun 23 21:25:34 2016
@@ -290,8 +290,11 @@ public abstract class ClientBase extends
return false;
}
+ /**
+ * Return true if any of the states is achieved
+ */
public static boolean waitForServerState(QuorumPeer qp, int timeout,
- String serverState) {
+ String... serverStates) {
long start = Time.currentElapsedTime();
while (true) {
try {
@@ -299,8 +302,11 @@ public abstract class ClientBase extends
} catch (InterruptedException e) {
// ignore
}
- if (qp.getServerState().equals(serverState))
- return true;
+ for (String state : serverStates) {
+ if (qp.getServerState().equals(state)) {
+ return true;
+ }
+ }
if (Time.currentElapsedTime() > start + timeout) {
return false;
}