You are viewing a plain-text version of this content. (The canonical link to it was a hyperlink in the original page and was not preserved in this copy.)
Posted to commits@zookeeper.apache.org by rg...@apache.org on 2016/01/14 06:21:42 UTC
svn commit: r1724535 - in /zookeeper/branches/branch-3.4: CHANGES.txt
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java
Author: rgs
Date: Thu Jan 14 05:21:41 2016
New Revision: 1724535
URL: http://svn.apache.org/viewvc?rev=1724535&view=rev
Log:
ZOOKEEPER-2347: Deadlock shutting down zookeeper
(Rakesh R via rgs)
Modified:
zookeeper/branches/branch-3.4/CHANGES.txt
zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java
Modified: zookeeper/branches/branch-3.4/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/CHANGES.txt?rev=1724535&r1=1724534&r2=1724535&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/CHANGES.txt (original)
+++ zookeeper/branches/branch-3.4/CHANGES.txt Thu Jan 14 05:21:41 2016
@@ -23,6 +23,9 @@ BUGFIXES:
ZOOKEEPER-2229: Several four-letter words are undocumented
(Chris Nauroth via rgs)
+ ZOOKEEPER-2347: Deadlock shutting down zookeeper
+ (Rakesh R via rgs)
+
Release 3.4.7 - 2015-11-08
Backward compatible changes:
Modified: zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java?rev=1724535&r1=1724534&r2=1724535&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java (original)
+++ zookeeper/branches/branch-3.4/src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java Thu Jan 14 05:21:41 2016
@@ -30,6 +30,8 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslException;
@@ -107,7 +109,7 @@ public class ZooKeeperServer implements
protected SessionTracker sessionTracker;
private FileTxnSnapLog txnLogFactory = null;
private ZKDatabase zkDb;
- protected long hzxid = 0;
+ private final AtomicLong hzxid = new AtomicLong(0); // atomic so the zxid accessors no longer synchronize on this server (ZOOKEEPER-2347)
public final static Exception ok = new Exception("No prob");
protected RequestProcessor firstProcessor;
protected volatile State state = State.INITIAL;
@@ -122,7 +124,7 @@ public class ZooKeeperServer implements
*/
static final private long superSecret = 0XB3415C00L;
- int requestsInProcess;
+ private final AtomicInteger requestsInProcess = new AtomicInteger(0); // atomic so inc/decInProcess() need not take this server's lock (ZOOKEEPER-2347)
final List<ChangeRecord> outstandingChanges = new ArrayList<ChangeRecord>();
// this data structure must be accessed under the outstandingChanges lock
final HashMap<String, ChangeRecord> outstandingChangesForPath =
@@ -308,16 +310,16 @@ public class ZooKeeperServer implements
/**
- * This should be called from a synchronized block on this!
+ * Returns the zxid last produced by getNextZxid() or stored by setZxid().
+ * hzxid is an AtomicLong and none of the zxid accessors lock this server,
+ * so holding the server's monitor is neither required nor sufficient to
+ * coordinate with them.
*/
- synchronized public long getZxid() {
- return hzxid;
+ public long getZxid() {
+ return hzxid.get();
}
- synchronized long getNextZxid() {
- return ++hzxid;
+ /**
+ * Atomically increments the zxid counter and returns the new value.
+ */
+ long getNextZxid() {
+ return hzxid.incrementAndGet();
}
- synchronized public void setZxid(long zxid) {
- hzxid = zxid;
+ /**
+ * Atomically replaces the current zxid without taking the server lock.
+ */
+ public void setZxid(long zxid) {
+ hzxid.set(zxid);
}
long getTime() {
@@ -504,16 +506,16 @@ public class ZooKeeperServer implements
jmxDataTreeBean = null;
}
- synchronized public void incInProcess() {
- requestsInProcess++;
+ /** Bumps the in-process request counter without taking the server lock. */
+ public void incInProcess() {
+ requestsInProcess.getAndIncrement();
}
- synchronized public void decInProcess() {
- requestsInProcess--;
+ /** Lowers the in-process request counter without taking the server lock. */
+ public void decInProcess() {
+ requestsInProcess.getAndDecrement();
}
+ /** Returns the current value of the in-process request counter. */
public int getInProcess() {
- return requestsInProcess;
+ return requestsInProcess.intValue();
}
/**
Modified: zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java
URL: http://svn.apache.org/viewvc/zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java?rev=1724535&r1=1724534&r2=1724535&view=diff
==============================================================================
--- zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java (original)
+++ zookeeper/branches/branch-3.4/src/java/test/org/apache/zookeeper/server/ZooKeeperServerMainTest.java Thu Jan 14 05:21:41 2016
@@ -23,17 +23,29 @@ import static org.apache.zookeeper.test.
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.jute.Record;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.PortAssignment;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZKTestCase;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.util.ZxidUtils;
import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.txn.SetDataTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.jboss.netty.channel.Channel;
import org.junit.Assert;
import org.junit.Test;
@@ -238,6 +250,188 @@ public class ZooKeeperServerMainTest ext
}
}
+ /**
+ * Test case to verify that ZooKeeper server is able to shut down properly
+ * when there are pending request(s) in the RequestProcessor chain.
+ *
+ * @see <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-2347">ZOOKEEPER-2347</a>
+ */
+ @Test(timeout = 30000)
+ public void testRaceBetweenSyncFlushAndZKShutdown() throws Exception {
+ File tmpDir = ClientBase.createTmpDir();
+ File testDir = File.createTempFile("test", ".dir", tmpDir);
+ testDir.delete();
+
+ // Sequence of steps that deadlocked before ZOOKEEPER-2347: shutdown
+ // holds the zks lock while SyncRequestProcessor#shutdown waits on
+ // FinalRequestProcessor to complete a pending request, and
+ // FinalRequestProcessor in turn needed that same zks lock (through the
+ // formerly synchronized zks#decInProcess()).
+
+ // 1. start zk server
+ FileTxnSnapLog ftsl = new FileTxnSnapLog(testDir, testDir);
+ final SimpleZooKeeperServer zkServer = new SimpleZooKeeperServer(ftsl);
+ zkServer.startup();
+ // 2. Wait for the request processor chain to be set up; the end of the
+ // setup adds a mock request into the chain.
+ // 3. Wait for FinalRequestProcessor to start processing that request.
+ zkServer.waitForFinalProcessRequest();
+ // 4. A request is now parked in the processor chain. Invoke shutdown,
+ // which acquires the zks lock.
+ Thread shutdownThread = new Thread() {
+ @Override
+ public void run() {
+ zkServer.shutdown();
+ }
+ };
+ shutdownThread.start();
+ // 5. Wait for SyncRequestProcessor#shutdown to be triggered, which
+ // ensures the zks lock has been acquired by the shutdown thread.
+ zkServer.waitForSyncReqProcessorShutdown();
+ // 6. Resume FinalRequestProcessor, which goes on to call
+ // zks#decInProcess(). While that method was synchronized it blocked on
+ // the held zks lock and the shutdown deadlocked.
+ zkServer.resumeFinalProcessRequest();
+ // 7. Shutdown must now complete. If the deadlock regresses this join
+ // never returns and the @Test timeout fails the test.
+ shutdownThread.join();
+ }
+
+ /**
+ * ZooKeeperServer whose processor chain is Prep -> SimpleSync -> SimpleFinal,
+ * seeded with one setData request so the test can stage the shutdown race.
+ */
+ private class SimpleZooKeeperServer extends ZooKeeperServer {
+ private SimpleSyncRequestProcessor syncProcessor;
+ private SimpleFinalRequestProcessor finalProcessor;
+
+ SimpleZooKeeperServer(FileTxnSnapLog ftsl) throws IOException {
+ super(ftsl, 2000, 2000, 4000, null, new ZKDatabase(ftsl));
+ }
+
+ /** Wires and starts the instrumented chain, then injects the seed request. */
+ @Override
+ protected void setupRequestProcessors() {
+ finalProcessor = new SimpleFinalRequestProcessor(this);
+ syncProcessor = new SimpleSyncRequestProcessor(this, finalProcessor);
+ syncProcessor.start();
+ PrepRequestProcessor prepProcessor = new PrepRequestProcessor(this, syncProcessor);
+ firstProcessor = prepProcessor;
+ prepProcessor.start();
+
+ // seed the chain with one request
+ addRequestToSyncProcessor();
+ }
+
+ /** Marshals a setData txn and hands it directly to the sync stage. */
+ private void addRequestToSyncProcessor() {
+ long seedZxid = ZxidUtils.makeZxid(3, 7);
+ TxnHeader header = new TxnHeader(1, 1, seedZxid, 1, ZooDefs.OpCode.setData);
+ Record setDataTxn = new SetDataTxn("/foo" + seedZxid, new byte[0], 1);
+ byte[] serialized;
+ try {
+ serialized = Util.marshallTxnEntry(header, setDataTxn);
+ } catch (IOException e) {
+ LOG.error("IOException while adding request to SyncRequestProcessor", e);
+ Assert.fail("IOException while adding request to SyncRequestProcessor!");
+ return; // Assert.fail throws; needed for definite assignment
+ }
+ MockNettyServerCnxn mockCnxn =
+ new MockNettyServerCnxn(null, this, new NettyServerCnxnFactory());
+ Request seed = new Request(mockCnxn, 1, 1, ZooDefs.OpCode.setData,
+ ByteBuffer.wrap(serialized), null);
+ seed.hdr = header;
+ seed.txn = setDataTxn;
+ syncProcessor.processRequest(seed);
+ }
+
+ void waitForFinalProcessRequest() throws InterruptedException {
+ boolean started = finalProcessor.waitForProcessRequestToBeCalled();
+ Assert.assertTrue("Waiting for FinalRequestProcessor to start processing request",
+ started);
+ }
+
+ void waitForSyncReqProcessorShutdown() throws InterruptedException {
+ boolean shutdownSeen = syncProcessor.waitForShutdownToBeCalled();
+ Assert.assertTrue("Waiting for SyncRequestProcessor to shut down",
+ shutdownSeen);
+ }
+
+ void resumeFinalProcessRequest() throws InterruptedException {
+ finalProcessor.resumeProcessRequest();
+ }
+ }
+
+ /**
+ * NettyServerCnxn stub whose stats and response paths do nothing.
+ * NOTE(review): the channel and zks arguments are ignored and super
+ * receives nulls for both; confirm this is intentional.
+ */
+ private class MockNettyServerCnxn extends NettyServerCnxn {
+ public MockNettyServerCnxn(Channel channel, ZooKeeperServer zks,
+ NettyServerCnxnFactory factory) {
+ super(null, null, factory);
+ }
+
+ /** No-op: response statistics are not recorded. */
+ @Override
+ protected synchronized void updateStatsForResponse(long cxid, long zxid,
+ String op, long start, long end) {
+ // intentionally empty
+ }
+
+ /** No-op: responses are discarded. */
+ @Override
+ public synchronized void sendResponse(ReplyHeader h, Record r,
+ String tag) {
+ // intentionally empty
+ }
+ }
+
+ /**
+ * FinalRequestProcessor that parks in processRequest() until the test calls
+ * resumeProcessRequest() (or CONNECTION_TIMEOUT elapses), letting the test
+ * line up the shutdown race.
+ */
+ private class SimpleFinalRequestProcessor extends FinalRequestProcessor {
+ private final CountDownLatch finalReqProcessCalled = new CountDownLatch(1);
+ private final CountDownLatch resumeFinalReqProcess = new CountDownLatch(1);
+ // Set when the processing thread is interrupted while parked; asserted
+ // on by resumeProcessRequest().
+ private volatile boolean interrupted = false;
+
+ public SimpleFinalRequestProcessor(ZooKeeperServer zks) {
+ super(zks);
+ }
+
+ @Override
+ public void processRequest(Request request) {
+ finalReqProcessCalled.countDown();
+ try {
+ boolean resumed = resumeFinalReqProcess.await(
+ ClientBase.CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ if (!resumed) {
+ // Proceed anyway so the pipeline is not wedged, but leave a
+ // trace that the test never resumed this processor.
+ LOG.warn("Timed out waiting to resume processing of request {}", request);
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting to process request", e);
+ // Recorded in the flag for the test rather than re-interrupting
+ // the processor thread.
+ interrupted = true;
+ resumeFinalReqProcess.countDown();
+ return;
+ }
+ super.processRequest(request);
+ }
+
+ /** Waits up to CONNECTION_TIMEOUT for processRequest() to be entered. */
+ boolean waitForProcessRequestToBeCalled() throws InterruptedException {
+ return finalReqProcessCalled.await(ClientBase.CONNECTION_TIMEOUT,
+ TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Releases the parked processRequest() call and fails if the processing
+ * thread had been interrupted while parked. The throws clause is kept for
+ * source compatibility with existing callers.
+ */
+ void resumeProcessRequest() throws InterruptedException {
+ resumeFinalReqProcess.countDown();
+ Assert.assertFalse("Interrupted while waiting to process request",
+ interrupted);
+ }
+ }
+
+ /**
+ * SyncRequestProcessor that trips a latch as soon as shutdown() is entered,
+ * before delegating to the real shutdown.
+ */
+ private class SimpleSyncRequestProcessor extends SyncRequestProcessor {
+ private final CountDownLatch shutdownEntered = new CountDownLatch(1);
+
+ public SimpleSyncRequestProcessor(ZooKeeperServer zks,
+ RequestProcessor nextProcessor) {
+ super(zks, nextProcessor);
+ }
+
+ @Override
+ public void shutdown() {
+ // signal first: super.shutdown() waits on the downstream processor
+ shutdownEntered.countDown();
+ super.shutdown();
+ }
+
+ /** Waits up to a third of CONNECTION_TIMEOUT for shutdown() to be entered. */
+ boolean waitForShutdownToBeCalled() throws InterruptedException {
+ long waitMillis = ClientBase.CONNECTION_TIMEOUT / 3;
+ return shutdownEntered.await(waitMillis, TimeUnit.MILLISECONDS);
+ }
+ }
+
private void deleteFile(File f) throws IOException {
if (f.isDirectory()) {
for (File c : f.listFiles())