You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by rg...@apache.org on 2016/01/14 06:21:42 UTC

svn commit: r1724535 - in /zookeeper/branches/branch-3.4: CHANGES.txt src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java

Author: rgs
Date: Thu Jan 14 05:21:41 2016
New Revision: 1724535

URL: http://svn.apache.org/viewvc?rev=1724535&view=rev
Log:
ZOOKEEPER-2347: Deadlock shutting down zookeeper
(Rakesh R via rgs)

Modified:
    zookeeper/branches/branch-3.4/CHANGES.txt
    zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
    zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java

Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1724535&r1=1724534&r2=1724535&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Thu Jan 14 05:21:41 2016
@@ -23,6 +23,9 @@ BUGFIXES:
   ZOOKEEPER-2229: Several four-letter words are undocumented
   (Chris Nauroth via rgs)
 
+  ZOOKEEPER-2347: Deadlock shutting down zookeeper
+  (Rakesh R via rgs)
+
 Release 3.4.7 - 2015-11-08
 
 Backward compatible changes:

Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1724535&r1=1724534&r2=1724535&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Thu Jan 14 05:21:41 2016
@@ -30,6 +30,8 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import javax.security.sasl.SaslException;
 
@@ -107,7 +109,7 @@ public class ZooKeeperServer implements
     protected SessionTracker sessionTracker;
     private FileTxnSnapLog txnLogFactory = null;
     private ZKDatabase zkDb;
-    protected long hzxid = 0;
+    private final AtomicLong hzxid = new AtomicLong(0);
     public final static Exception ok = new Exception("No prob");
     protected RequestProcessor firstProcessor;
     protected volatile State state = State.INITIAL;
@@ -122,7 +124,7 @@ public class ZooKeeperServer implements
      */
     static final private long superSecret = 0XB3415C00L;
 
-    int requestsInProcess;
+    private final AtomicInteger requestsInProcess = new AtomicInteger(0);
     final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
     // this data structure must be accessed under the outstandingChanges lock
     final HashMap<String, ChangeRecord> outstandingChangesForPath =
@@ -308,16 +310,16 @@ public class ZooKeeperServer implements
     /**
      * This should be called from a synchronized block on this!
      */
-    synchronized public long getZxid() {
-        return hzxid;
+    public long getZxid() {
+        return hzxid.get();
     }
 
-    synchronized long getNextZxid() {
-        return ++hzxid;
+    long getNextZxid() {
+        return hzxid.incrementAndGet();
     }
 
-    synchronized public void setZxid(long zxid) {
-        hzxid = zxid;
+    public void setZxid(long zxid) {
+        hzxid.set(zxid);
     }
 
     long getTime() {
@@ -504,16 +506,16 @@ public class ZooKeeperServer implements
         jmxDataTreeBean = null;
     }
 
-    synchronized public void incInProcess() {
-        requestsInProcess++;
+    public void incInProcess() {
+        requestsInProcess.incrementAndGet();
     }
 
-    synchronized public void decInProcess() {
-        requestsInProcess--;
+    public void decInProcess() {
+        requestsInProcess.decrementAndGet();
     }
 
     public int getInProcess() {
-        return requestsInProcess;
+        return requestsInProcess.get();
     }
 
     /**

Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java?rev=1724535&r1=1724534&r2=1724535&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java Thu Jan 14 05:21:41 2016
@@ -23,17 +23,29 @@ import static org.apache.zookeeper.test.
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.jute.Record;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.jboss.netty.channel.Channel;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -238,6 +250,188 @@ public class ZooKeeperServerMainTest ext
         }
     }
 
+    /**
+     * Test case to verify that ZooKeeper server is able to shutdown properly
+     * when there are pending request(s) in the RequestProcessor chain.
+     *
+     * @see <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-2347">ZOOKEEPER-2347</a>
+     */
+    @Test(timeout = 30000)
+    public void testRaceBetweenSyncFlushAndZKShutdown() throws Exception {
+        File tmpDir = ClientBase.createTmpDir();
+        File testDir = File.createTempFile("test", ".dir", tmpDir);
+        testDir.delete();
+
+        // Following are the sequence of steps to simulate the deadlock
+        // situation - SyncRequestProcessor#shutdown holds a lock and waits on
+        // FinalRequestProcessor to complete a pending operation, which in turn
+        // also needs the ZooKeeperServer lock
+
+        // 1. start zk server
+        FileTxnSnapLog ftsl = new FileTxnSnapLog(testDir, testDir);
+        final SimpleZooKeeperServer zkServer = new SimpleZooKeeperServer(ftsl);
+        zkServer.startup();
+        // 2. Wait for setting up request processor chain. At the end of setup,
+        // it will add a mock request into the chain
+        // 3. Also, waiting for FinalRequestProcessor to start processing request
+        zkServer.waitForFinalProcessRequest();
+        // 4. Above step ensures that there is a request in the processor chain.
+        // Now invoke shutdown, which will acquire zks lock
+        Thread shutdownThread = new Thread() {
+            public void run() {
+                zkServer.shutdown();
+            };
+        };
+        shutdownThread.start();
+        // 5. Wait for SyncRequestProcessor to trigger shutdown function.
+        // This is to ensure that zks lock is acquired
+        zkServer.waitForSyncReqProcessorShutdown();
+        // 6. Now resume FinalRequestProcessor, which in turn calls
+        // zks#decInProcess(). While that method was synchronized it had to
+        // acquire the zks lock at this point, which resulted in the deadlock
+        zkServer.resumeFinalProcessRequest();
+        // 7. Wait for the server shutdown to finish. This verifies that
+        // shutdown completes even though SyncRequestProcessor#shutdown holds
+        // a lock while waiting on FinalRequestProcessor to complete a
+        // pending operation
+        shutdownThread.join();
+    }
+
+    private class SimpleZooKeeperServer extends ZooKeeperServer {
+        private SimpleSyncRequestProcessor syncProcessor;
+        private SimpleFinalRequestProcessor finalProcessor;
+
+        SimpleZooKeeperServer(FileTxnSnapLog ftsl) throws IOException {
+            super(ftsl, 2000, 2000, 4000, null, new ZKDatabase(ftsl));
+        }
+
+        @Override
+        protected void setupRequestProcessors() {
+            finalProcessor = new SimpleFinalRequestProcessor(this);
+            syncProcessor = new SimpleSyncRequestProcessor(this,
+                    finalProcessor);
+            syncProcessor.start();
+            firstProcessor = new PrepRequestProcessor(this, syncProcessor);
+            ((PrepRequestProcessor) firstProcessor).start();
+
+            // add request to the chain
+            addRequestToSyncProcessor();
+        }
+
+        private void addRequestToSyncProcessor() {
+            long zxid = ZxidUtils.makeZxid(3, 7);
+            TxnHeader hdr = new TxnHeader(1, 1, zxid, 1,
+                    ZooDefs.OpCode.setData);
+            Record txn = new SetDataTxn("/foo" + zxid, new byte[0], 1);
+            byte[] buf;
+            try {
+                buf = Util.marshallTxnEntry(hdr, txn);
+            } catch (IOException e) {
+                LOG.error("IOException while adding request to SyncRequestProcessor", e);
+                Assert.fail("IOException while adding request to SyncRequestProcessor!");
+                return;
+            }
+            NettyServerCnxnFactory factory = new NettyServerCnxnFactory();
+            final MockNettyServerCnxn nettyCnxn = new MockNettyServerCnxn(null,
+                    this, factory);
+            Request req = new Request(nettyCnxn, 1, 1, ZooDefs.OpCode.setData,
+                    ByteBuffer.wrap(buf), null);
+            req.hdr = hdr;
+            req.txn = txn;
+            syncProcessor.processRequest(req);
+        }
+
+        void waitForFinalProcessRequest() throws InterruptedException {
+            Assert.assertTrue("Waiting for FinalRequestProcessor to start processing request",
+                    finalProcessor.waitForProcessRequestToBeCalled());
+        }
+
+        void waitForSyncReqProcessorShutdown() throws InterruptedException {
+            Assert.assertTrue("Waiting for SyncRequestProcessor to shut down",
+                    syncProcessor.waitForShutdownToBeCalled());
+        }
+
+        void resumeFinalProcessRequest() throws InterruptedException {
+            finalProcessor.resumeProcessRequest();
+        }
+    }
+
+    private class MockNettyServerCnxn extends NettyServerCnxn {
+        public MockNettyServerCnxn(Channel channel, ZooKeeperServer zks,
+                NettyServerCnxnFactory factory) {
+            super(null, null, factory);
+        }
+
+        @Override
+        protected synchronized void updateStatsForResponse(long cxid, long zxid,
+                String op, long start, long end) {
+            return;
+        }
+
+        @Override
+        public synchronized void sendResponse(ReplyHeader h, Record r,
+                String tag) {
+            return;
+        }
+    }
+
+    private class SimpleFinalRequestProcessor extends FinalRequestProcessor {
+        private CountDownLatch finalReqProcessCalled = new CountDownLatch(1);
+        private CountDownLatch resumeFinalReqProcess = new CountDownLatch(1);
+        private volatile boolean interrupted = false;
+        public SimpleFinalRequestProcessor(ZooKeeperServer zks) {
+            super(zks);
+        }
+
+        @Override
+        public void processRequest(Request request) {
+            finalReqProcessCalled.countDown();
+            try {
+                resumeFinalReqProcess.await(ClientBase.CONNECTION_TIMEOUT,
+                        TimeUnit.MILLISECONDS);
+            } catch (InterruptedException e) {
+                LOG.error("Interrupted while waiting to process request", e);
+                interrupted = true; // Marked as interrupted
+                resumeFinalReqProcess.countDown();
+                return;
+            }
+            super.processRequest(request);
+        }
+
+        boolean waitForProcessRequestToBeCalled() throws InterruptedException {
+            return finalReqProcessCalled.await(ClientBase.CONNECTION_TIMEOUT,
+                    TimeUnit.MILLISECONDS);
+        }
+
+        void resumeProcessRequest() throws InterruptedException {
+            resumeFinalReqProcess.countDown();
+            resumeFinalReqProcess.await(ClientBase.CONNECTION_TIMEOUT,
+                    TimeUnit.MILLISECONDS);
+            Assert.assertFalse("Interrupted while waiting to process request",
+                    interrupted);
+        }
+    }
+
+    private class SimpleSyncRequestProcessor extends SyncRequestProcessor {
+        private final CountDownLatch shutdownCalled = new CountDownLatch(1);
+
+        public SimpleSyncRequestProcessor(ZooKeeperServer zks,
+                RequestProcessor nextProcessor) {
+            super(zks, nextProcessor);
+        }
+
+        @Override
+        public void shutdown() {
+            shutdownCalled.countDown();
+            super.shutdown();
+        }
+
+        boolean waitForShutdownToBeCalled() throws InterruptedException {
+            return shutdownCalled.await(ClientBase.CONNECTION_TIMEOUT / 3,
+                    TimeUnit.MILLISECONDS);
+        }
+    }
+
     private void deleteFile(File f) throws IOException {
         if (f.isDirectory()) {
             for (File c : f.listFiles())