You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/07/29 02:16:09 UTC
[1/3] git commit: CURATOR-126: Fix race condition in CuratorFrameworkImpl.close()
Repository: curator
Updated Branches:
refs/heads/master 15a0aacec -> ccac7baac
CURATOR-126: Fix race condition in CuratorFrameworkImpl.close()
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/785e9f6c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/785e9f6c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/785e9f6c
Branch: refs/heads/master
Commit: 785e9f6c8a528d0c07652450471dcb71a5717776
Parents: 15a0aac
Author: Scott Blum <sc...@squareup.com>
Authored: Mon Jul 28 14:10:37 2014 -0400
Committer: Scott Blum <sc...@squareup.com>
Committed: Mon Jul 28 17:13:17 2014 -0400
----------------------------------------------------------------------
.../framework/CuratorFrameworkFactory.java | 17 +++++++++++++++
.../framework/imps/CuratorFrameworkImpl.java | 22 +++++++++++++++-----
2 files changed, 34 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/785e9f6c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 2d21fb7..8ef2580 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -52,6 +52,7 @@ public class CuratorFrameworkFactory
private static final DefaultZookeeperFactory DEFAULT_ZOOKEEPER_FACTORY = new DefaultZookeeperFactory();
private static final DefaultACLProvider DEFAULT_ACL_PROVIDER = new DefaultACLProvider();
private static final long DEFAULT_INACTIVE_THRESHOLD_MS = (int)TimeUnit.MINUTES.toMillis(3);
+ private static final int DEFAULT_CLOSE_WAIT_MS = (int)TimeUnit.SECONDS.toMillis(1);
/**
* Return a new builder that builds a CuratorFramework
@@ -101,6 +102,7 @@ public class CuratorFrameworkFactory
private EnsembleProvider ensembleProvider;
private int sessionTimeoutMs = DEFAULT_SESSION_TIMEOUT_MS;
private int connectionTimeoutMs = DEFAULT_CONNECTION_TIMEOUT_MS;
+ private int maxCloseWaitMs = DEFAULT_CLOSE_WAIT_MS;
private RetryPolicy retryPolicy;
private ThreadFactory threadFactory = null;
private String namespace;
@@ -239,6 +241,16 @@ public class CuratorFrameworkFactory
}
/**
+ * @param maxCloseWaitMs time to wait during close to join background threads
+ * @return this
+ */
+ public Builder maxCloseWaitMs(int maxCloseWaitMs)
+ {
+ this.maxCloseWaitMs = maxCloseWaitMs;
+ return this;
+ }
+
+ /**
* @param retryPolicy retry policy to use
* @return this
*/
@@ -336,6 +348,11 @@ public class CuratorFrameworkFactory
return connectionTimeoutMs;
}
+ public int getMaxCloseWaitMs()
+ {
+ return maxCloseWaitMs;
+ }
+
public RetryPolicy getRetryPolicy()
{
return retryPolicy;
http://git-wip-us.apache.org/repos/asf/curator/blob/785e9f6c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 23a3248..7f7cc98 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -63,6 +63,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final ListenerContainer<CuratorListener> listeners;
private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
private final ThreadFactory threadFactory;
+ private final int maxCloseWaitMs;
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
@@ -127,6 +128,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
backgroundOperations = new DelayQueue<OperationAndData<?>>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
+ maxCloseWaitMs = builder.getMaxCloseWaitMs();
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
@@ -179,6 +181,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
listeners = parent.listeners;
unhandledErrorListeners = parent.unhandledErrorListeners;
threadFactory = parent.threadFactory;
+ maxCloseWaitMs = parent.maxCloseWaitMs;
backgroundOperations = parent.backgroundOperations;
connectionStateManager = parent.connectionStateManager;
defaultData = parent.defaultData;
@@ -297,15 +300,24 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
});
+ if ( executorService != null )
+ {
+ executorService.shutdownNow();
+ try
+ {
+ executorService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
+ }
+ catch ( InterruptedException e )
+ {
+ // Interrupted while interrupting; I give up.
+ Thread.currentThread().interrupt();
+ }
+ }
listeners.clear();
unhandledErrorListeners.clear();
connectionStateManager.close();
client.close();
namespaceWatcherMap.close();
- if ( executorService != null )
- {
- executorService.shutdownNow();
- }
}
}
@@ -759,7 +771,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private void backgroundOperationsLoop()
{
- while ( !Thread.interrupted() )
+ while ( !Thread.currentThread().isInterrupted() )
{
OperationAndData<?> operationAndData;
try
[2/3] git commit: CURATOR-126 - Added a unit test to reproduce this case.
Posted by ra...@apache.org.
CURATOR-126 - Added a unit test to reproduce this case.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/a8a3e147
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/a8a3e147
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/a8a3e147
Branch: refs/heads/master
Commit: a8a3e14755ea69aaa186a40e67173ad7b686d9e3
Parents: 785e9f6
Author: Cameron McKenzie <ca...@unico.com.au>
Authored: Tue Jul 29 09:28:14 2014 +1000
Committer: Cameron McKenzie <ca...@unico.com.au>
Committed: Tue Jul 29 09:28:14 2014 +1000
----------------------------------------------------------------------
.../framework/imps/TestFrameworkBackground.java | 86 ++++++++++++++++++++
1 file changed, 86 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/a8a3e147/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index 44792d9..f9fea4f 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -20,12 +20,15 @@
package org.apache.curator.framework.imps;
import com.google.common.collect.Lists;
+
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.OperationAndData.ErrorCallback;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
@@ -35,10 +38,12 @@ import org.apache.curator.test.Timing;
import org.apache.zookeeper.KeeperException.Code;
import org.testng.Assert;
import org.testng.annotations.Test;
+
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -215,4 +220,85 @@ public class TestFrameworkBackground extends BaseClassForTests
}
}
+
+ /**
+ * CURATOR-126
+ * Shutdown the Curator client while there are still background operations running.
+ */
+ @Test
+ public void testShutdown() throws Exception
+ {
+ final int MAX_CLOSE_WAIT_MS = 5000;
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(timing.session()).
+ connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1)).maxCloseWaitMs(MAX_CLOSE_WAIT_MS).build();
+ try
+ {
+ client.start();
+
+ BackgroundCallback callback = new BackgroundCallback()
+ {
+ @Override
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
+ {
+ }
+ };
+
+ final CountDownLatch operationReadyLatch = new CountDownLatch(1);
+
+ //This gets called just before the operation is run.
+ ((CuratorFrameworkImpl)client).debugListener = new CuratorFrameworkImpl.DebugBackgroundListener()
+ {
+ @Override
+ public void listen(OperationAndData<?> data)
+ {
+ operationReadyLatch.countDown();
+
+ try {
+ Thread.sleep(MAX_CLOSE_WAIT_MS / 2);
+ } catch(InterruptedException e) {
+ }
+ }
+ };
+
+ Assert.assertTrue(client.getZookeeperClient().blockUntilConnectedOrTimedOut(), "Failed to connect");
+
+ server.stop();
+
+ BackgroundOperation<String> background = new BackgroundOperation<String>()
+ {
+
+ @Override
+ public void performBackgroundOperation(OperationAndData<String> data)
+ throws Exception
+ {
+ }
+ };
+
+ ErrorCallback<String> errorCallback = new ErrorCallback<String>()
+ {
+
+ @Override
+ public void retriesExhausted(
+ OperationAndData<String> operationAndData)
+ {
+ }
+ };
+
+ OperationAndData<String> operation = new OperationAndData<String>(background,
+ "thedata", callback, errorCallback, null);
+
+ ((CuratorFrameworkImpl)client).queueOperation(operation);
+
+ operationReadyLatch.await();
+
+ client.close();
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+
}
[3/3] git commit: Altered test to make it expose problem. Needed to add debugUnhandledErrorListener for this
Posted by ra...@apache.org.
Altered test to make it expose problem. Needed to add debugUnhandledErrorListener for this
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/ccac7baa
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/ccac7baa
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/ccac7baa
Branch: refs/heads/master
Commit: ccac7baac38b9fe3dc5b322639ea409f7fa0f2b6
Parents: a8a3e14
Author: randgalt <ra...@apache.org>
Authored: Mon Jul 28 19:00:12 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jul 28 19:00:12 2014 -0500
----------------------------------------------------------------------
.../framework/imps/CuratorFrameworkImpl.java | 8 +-
.../framework/imps/TestFrameworkBackground.java | 111 ++++++++-----------
2 files changed, 53 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/ccac7baa/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 7f7cc98..01cacee 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -57,7 +57,6 @@ import java.util.concurrent.atomic.AtomicReference;
public class CuratorFrameworkImpl implements CuratorFramework
{
-
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorZookeeperClient client;
private final ListenerContainer<CuratorListener> listeners;
@@ -86,6 +85,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
volatile DebugBackgroundListener debugListener = null;
+ volatile UnhandledErrorListener debugUnhandledErrorListener = null;
private final AtomicReference<CuratorFrameworkState> state;
@@ -313,6 +313,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
Thread.currentThread().interrupt();
}
}
+
listeners.clear();
unhandledErrorListeners.clear();
connectionStateManager.close();
@@ -566,6 +567,11 @@ public class CuratorFrameworkImpl implements CuratorFramework
return null;
}
});
+
+ if ( debugUnhandledErrorListener != null )
+ {
+ debugUnhandledErrorListener.unhandledError(reason, e);
+ }
}
String unfixForNamespace(String path)
http://git-wip-us.apache.org/repos/asf/curator/blob/ccac7baa/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
index f9fea4f..3f1c41f 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestFrameworkBackground.java
@@ -20,30 +20,26 @@
package org.apache.curator.framework.imps;
import com.google.common.collect.Lists;
-
-import org.apache.curator.test.BaseClassForTests;
-import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.UnhandledErrorListener;
-import org.apache.curator.framework.imps.OperationAndData.ErrorCallback;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.TestingServer;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.KeeperException.Code;
import org.testng.Assert;
import org.testng.annotations.Test;
-
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -68,7 +64,8 @@ public class TestFrameworkBackground extends BaseClassForTests
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
- if ( firstListenerAction.compareAndSet(true, false) ) {
+ if ( firstListenerAction.compareAndSet(true, false) )
+ {
firstListenerState.set(newState);
System.out.println("First listener state is " + newState);
}
@@ -185,11 +182,7 @@ public class TestFrameworkBackground extends BaseClassForTests
public void testCuratorCallbackOnError() throws Exception
{
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.builder()
- .connectString(server.getConnectString())
- .sessionTimeoutMs(timing.session())
- .connectionTimeoutMs(timing.connection())
- .retryPolicy(new RetryOneTime(1000)).build();
+ CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(timing.session()).connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1000)).build();
final CountDownLatch latch = new CountDownLatch(1);
try
{
@@ -198,8 +191,7 @@ public class TestFrameworkBackground extends BaseClassForTests
{
@Override
- public void processResult(CuratorFramework client, CuratorEvent event)
- throws Exception
+ public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
{
if ( event.getResultCode() == Code.CONNECTIONLOSS.intValue() )
{
@@ -220,7 +212,7 @@ public class TestFrameworkBackground extends BaseClassForTests
}
}
-
+
/**
* CURATOR-126
* Shutdown the Curator client while there are still background operations running.
@@ -228,77 +220,66 @@ public class TestFrameworkBackground extends BaseClassForTests
@Test
public void testShutdown() throws Exception
{
- final int MAX_CLOSE_WAIT_MS = 5000;
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(timing.session()).
- connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1)).maxCloseWaitMs(MAX_CLOSE_WAIT_MS).build();
+ CuratorFramework client = CuratorFrameworkFactory
+ .builder()
+ .connectString(server.getConnectString())
+ .sessionTimeoutMs(timing.session())
+ .connectionTimeoutMs(timing.connection()).retryPolicy(new RetryOneTime(1))
+ .maxCloseWaitMs(timing.forWaiting().milliseconds())
+ .build();
try
{
- client.start();
-
- BackgroundCallback callback = new BackgroundCallback()
+ final AtomicBoolean hadIllegalStateException = new AtomicBoolean(false);
+ ((CuratorFrameworkImpl)client).debugUnhandledErrorListener = new UnhandledErrorListener()
{
@Override
- public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
- {
+ public void unhandledError(String message, Throwable e)
+ {
+ if ( e instanceof IllegalStateException )
+ {
+ hadIllegalStateException.set(true);
+ }
}
};
-
+ client.start();
+
final CountDownLatch operationReadyLatch = new CountDownLatch(1);
-
- //This gets called just before the operation is run.
((CuratorFrameworkImpl)client).debugListener = new CuratorFrameworkImpl.DebugBackgroundListener()
{
@Override
public void listen(OperationAndData<?> data)
{
- operationReadyLatch.countDown();
-
- try {
- Thread.sleep(MAX_CLOSE_WAIT_MS / 2);
- } catch(InterruptedException e) {
+ try
+ {
+ operationReadyLatch.await();
+ }
+ catch ( InterruptedException e )
+ {
+ Thread.currentThread().interrupt();
}
- }
- };
-
- Assert.assertTrue(client.getZookeeperClient().blockUntilConnectedOrTimedOut(), "Failed to connect");
-
- server.stop();
-
- BackgroundOperation<String> background = new BackgroundOperation<String>()
- {
-
- @Override
- public void performBackgroundOperation(OperationAndData<String> data)
- throws Exception
- {
}
};
-
- ErrorCallback<String> errorCallback = new ErrorCallback<String>()
- {
- @Override
- public void retriesExhausted(
- OperationAndData<String> operationAndData)
- {
- }
- };
-
- OperationAndData<String> operation = new OperationAndData<String>(background,
- "thedata", callback, errorCallback, null);
-
- ((CuratorFrameworkImpl)client).queueOperation(operation);
-
- operationReadyLatch.await();
-
+ // queue a background operation that will block due to the debugListener
+ client.create().inBackground().forPath("/hey");
+ timing.sleepABit();
+
+ // close the client while the background is still blocked
client.close();
+
+ // unblock the background
+ operationReadyLatch.countDown();
+ timing.sleepABit();
+
+ // should not generate an exception
+ Assert.assertFalse(hadIllegalStateException.get());
}
finally
{
CloseableUtils.closeQuietly(client);
- }
+ }
}
-
-
+
+
}