You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by cn...@apache.org on 2016/06/23 21:25:35 UTC

svn commit: r1750022 - in /zookeeper/trunk: ./ src/java/main/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/server/quorum/ src/java/test/org/apache/zookeeper/test/

Author: cnauroth
Date: Thu Jun 23 21:25:34 2016
New Revision: 1750022

URL: http://svn.apache.org/viewvc?rev=1750022&view=rev
Log:
ZOOKEEPER-2380: Deadlock between leader shutdown and forwarding ACK to the leader. (Arshad Mohammad via cnauroth)

Added:
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
Modified:
    zookeeper/trunk/CHANGES.txt
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
    zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
    zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java

Modified: zookeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/trunk/CHANGES.txt?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/CHANGES.txt (original)
+++ zookeeper/trunk/CHANGES.txt Thu Jun 23 21:25:34 2016
@@ -317,6 +317,9 @@ BUGFIXES:
   ZOOKEEPER-2297: NPE is thrown while creating "key manager" and "trust manager"
   (Arshad Mohammad via fpj)
 
+  ZOOKEEPER-2380: Deadlock between leader shutdown and forwarding ACK to the
+  leader (Arshad Mohammad via cnauroth)
+
 IMPROVEMENTS:
   ZOOKEEPER-2024 Major throughput improvement with mixed workloads (Kfir Lev-Ari via shralex)
 

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/Leader.java Thu Jun 23 21:25:34 2016
@@ -549,6 +549,8 @@ public class Leader {
             // We ping twice a tick, so we only update the tick every other
             // iteration
             boolean tickSkip = true;
+            // If not null then shutdown this leader
+            String shutdownMessage = null;
 
             while (true) {
                 synchronized (this) {
@@ -586,12 +588,10 @@ public class Leader {
 
                     if (!tickSkip && !syncedAckSet.hasAllQuorums()) {
                         // Lost quorum of last committed and/or last proposed
-                        // config, shutdown
-                        shutdown("Not sufficient followers synced, only synced with sids: [ "
-                                + syncedAckSet.ackSetsToString() + " ]");
-                        // make sure the order is the same!
-                        // the leader goes to looking
-                        return;
+                        // config, set shutdown flag
+                        shutdownMessage = "Not sufficient followers synced, only synced with sids: [ "
+                                + syncedAckSet.ackSetsToString() + " ]";
+                        break;
                     }
                     tickSkip = !tickSkip;
                 }
@@ -599,6 +599,10 @@ public class Leader {
                     f.ping();
                 }
             }
+            if (shutdownMessage != null) {
+                shutdown(shutdownMessage);
+                // leader goes into looking state
+            }
         } finally {
             zk.unregisterJMX(this);
         }

Modified: zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java (original)
+++ zookeeper/trunk/src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java Thu Jun 23 21:25:34 2016
@@ -1642,7 +1642,11 @@ public class QuorumPeer extends ZooKeepe
     public void setZKDatabase(ZKDatabase database) {
         this.zkDb = database;
     }
-    
+
+    protected ZKDatabase getZkDb() {
+        return zkDb;
+    }
+
     public synchronized void initConfigInZKDatabase() {   
         if (zkDb != null) zkDb.initConfigInZKDatabase(getQuorumVerifier());
     }

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java Thu Jun 23 21:25:34 2016
@@ -34,6 +34,7 @@ import org.apache.zookeeper.WatchedEvent
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.common.PathUtils;
+import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
 import org.apache.zookeeper.server.admin.JettyAdminServer;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.QuorumBase;
@@ -228,11 +229,15 @@ public class QuorumPeerTestBase extends
         Thread currentThread;
 
         synchronized public void start() {
-            main = new TestQPMain();
+            main = getTestQPMain();
             currentThread = new Thread(this);
             currentThread.start();
         }
 
+        public TestQPMain getTestQPMain() {
+            return new TestQPMain();
+        }
+
         public void run() {
             String args[] = new String[1];
             args[0] = confFile.toString();
@@ -280,5 +285,9 @@ public class QuorumPeerTestBase extends
             props.load(new FileReader(confFile));
             return props.getProperty(key, "");
         }
+
+        public QuorumPeer getQuorumPeer() {
+            return main.quorumPeer;
+        }
     }
 }

Added: zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java?rev=1750022&view=auto
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java (added)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/server/quorum/RaceConditionTest.java Thu Jun 23 21:25:34 2016
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.quorum;
+
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.server.FinalRequestProcessor;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.admin.AdminServer.AdminServerException;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This test class contains test cases for race conditions in ZooKeeper as a
+ * whole.
+ */
+public class RaceConditionTest extends QuorumPeerTestBase {
+    protected static final Logger LOG = LoggerFactory.getLogger(RaceConditionTest.class);
+    private static int SERVER_COUNT = 3;
+    private MainThread[] mt;
+
+    /**
+     * Test case for https://issues.apache.org/jira/browse/ZOOKEEPER-2380.
+     * Deadlock between leader shutdown and forwarding ACK to the leader.
+     */
+
+    @Test(timeout = 30000)
+    public void testRaceConditionBetweenLeaderAndAckRequestProcessor() throws Exception {
+        mt = startQuorum();
+        // get leader
+        QuorumPeer leader = getLeader(mt);
+        assertNotNull("Leader should not be null", leader);
+        // shutdown 2 followers so that leader does not have majority and goes
+        // into looking state or following state.
+        shutdownFollowers(mt);
+        assertTrue("Leader failed to transition to LOOKING or FOLLOWING state", ClientBase.waitForServerState(leader,
+                15000, QuorumStats.Provider.LOOKING_STATE, QuorumStats.Provider.FOLLOWING_STATE));
+    }
+
+    @After
+    public void tearDown() {
+        // stop all servers
+        if (null != mt) {
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                try {
+                    // With the defect, the leader also hangs here; with the
+                    // fix it does not
+                    mt[i].shutdown();
+                } catch (InterruptedException e) {
+                    LOG.warn("Quorum Peer interrupted while shutting it down", e);
+                }
+            }
+        }
+    }
+
+    private MainThread[] startQuorum() throws IOException {
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique() + ":" + PortAssignment.unique()
+                    + ":participant;127.0.0.1:" + clientPorts[i];
+            sb.append(server + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+        MainThread mt[] = new MainThread[SERVER_COUNT];
+
+        // start all the servers
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection, false) {
+                @Override
+                public TestQPMain getTestQPMain() {
+                    return new MockTestQPMain();
+                }
+            };
+            mt[i].start();
+        }
+
+        // ensure all servers started
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            Assert.assertTrue("waiting for server " + i + " being up",
+                    ClientBase.waitForServerUp("127.0.0.1:" + clientPorts[i], CONNECTION_TIMEOUT));
+        }
+        return mt;
+    }
+
+    private QuorumPeer getLeader(MainThread[] mt) {
+        for (int i = mt.length - 1; i >= 0; i--) {
+            QuorumPeer quorumPeer = mt[i].getQuorumPeer();
+            if (quorumPeer != null && ServerState.LEADING == quorumPeer.getPeerState()) {
+                return quorumPeer;
+            }
+        }
+        return null;
+    }
+
+    private void shutdownFollowers(MainThread[] mt) {
+        for (int i = 0; i < mt.length; i++) {
+            CustomQuorumPeer quorumPeer = (CustomQuorumPeer) mt[i].getQuorumPeer();
+            if (quorumPeer != null && ServerState.FOLLOWING == quorumPeer.getPeerState()) {
+                quorumPeer.setStopPing(true);
+            }
+        }
+    }
+
+    private static class CustomQuorumPeer extends QuorumPeer {
+        private boolean stopPing;
+
+        public void setStopPing(boolean stopPing) {
+            this.stopPing = stopPing;
+        }
+
+        public CustomQuorumPeer(Map<Long, QuorumServer> quorumPeers, File snapDir, File logDir, int clientPort,
+                int electionAlg, long myid, int tickTime, int initLimit, int syncLimit) throws IOException {
+            super(quorumPeers, snapDir, logDir, electionAlg, myid, tickTime, initLimit, syncLimit, false,
+                    ServerCnxnFactory.createFactory(new InetSocketAddress(clientPort), -1), new QuorumMaj(quorumPeers));
+        }
+
+        @Override
+        protected Follower makeFollower(FileTxnSnapLog logFactory) throws IOException {
+
+            return new Follower(this, new FollowerZooKeeperServer(logFactory, this, this.getZkDb())) {
+                @Override
+                protected void processPacket(QuorumPacket qp) throws Exception {
+                    if (stopPing && qp.getType() == Leader.PING) {
+                        LOG.info("Follower skipped ping");
+                        throw new SocketException("Socket time out while sending the ping response");
+                    } else {
+                        super.processPacket(qp);
+                    }
+                }
+            };
+        }
+
+        @Override
+        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
+            LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, this, this.getZkDb()) {
+                @Override
+                protected void setupRequestProcessors() {
+                    /**
+                     * This method is overridden to make a place to inject
+                     * MockSyncRequestProcessor
+                     */
+                    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
+                    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(finalProcessor,
+                            getLeader());
+                    commitProcessor = new CommitProcessor(toBeAppliedProcessor, Long.toString(getServerId()), false,
+                            getZooKeeperServerListener());
+                    commitProcessor.start();
+                    ProposalRequestProcessor proposalProcessor = new MockProposalRequestProcessor(this,
+                            commitProcessor);
+                    proposalProcessor.initialize();
+                    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
+                    prepRequestProcessor.start();
+                    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);
+                }
+
+            };
+            return new Leader(this, zk);
+        }
+    }
+
+    private static class MockSyncRequestProcessor extends SyncRequestProcessor {
+
+        public MockSyncRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
+            super(zks, nextProcessor);
+        }
+
+        @Override
+        public void shutdown() {
+            /**
+             * Add a request so that something is there for SyncRequestProcessor
+             * to process, while we are in shutdown flow
+             */
+            Request request = new Request(null, 0, 0, ZooDefs.OpCode.delete,
+                    ByteBuffer.wrap("/deadLockIssue".getBytes()), null);
+            processRequest(request);
+            super.shutdown();
+        }
+    }
+
+    private static class MockProposalRequestProcessor extends ProposalRequestProcessor {
+        public MockProposalRequestProcessor(LeaderZooKeeperServer zks, RequestProcessor nextProcessor) {
+            super(zks, nextProcessor);
+
+            /**
+             * The only purpose here is to inject the mocked
+             * SyncRequestProcessor
+             */
+            AckRequestProcessor ackProcessor = new AckRequestProcessor(zks.getLeader());
+            syncProcessor = new MockSyncRequestProcessor(zks, ackProcessor);
+        }
+    }
+
+    private static class MockTestQPMain extends TestQPMain {
+        @Override
+        public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
+            quorumPeer = new CustomQuorumPeer(config.getQuorumVerifier().getAllMembers(), config.getDataDir(),
+                    config.getDataLogDir(), config.getClientPortAddress().getPort(), config.getElectionAlg(),
+                    config.getServerId(), config.getTickTime(), config.getInitLimit(), config.getSyncLimit());
+            quorumPeer.start();
+            try {
+                quorumPeer.join();
+            } catch (InterruptedException e) {
+                LOG.warn("Quorum Peer interrupted", e);
+            }
+        }
+    }
+}

Modified: zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java
URL: http://svn.apache.org/viewvc/zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java?rev=1750022&r1=1750021&r2=1750022&view=diff
==============================================================================
--- zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java (original)
+++ zookeeper/trunk/src/java/test/org/apache/zookeeper/test/ClientBase.java Thu Jun 23 21:25:34 2016
@@ -290,8 +290,11 @@ public abstract class ClientBase extends
         return false;
     }
 
+    /**
+     * Returns true if the peer reaches any of the given states before the timeout
+     */
     public static boolean waitForServerState(QuorumPeer qp, int timeout,
-            String serverState) {
+            String... serverStates) {
         long start = Time.currentElapsedTime();
         while (true) {
             try {
@@ -299,8 +302,11 @@ public abstract class ClientBase extends
             } catch (InterruptedException e) {
                 // ignore
             }
-            if (qp.getServerState().equals(serverState))
-                return true;
+            for (String state : serverStates) {
+                if (qp.getServerState().equals(state)) {
+                    return true;
+                }
+            }
             if (Time.currentElapsedTime() > start + timeout) {
                 return false;
             }