You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/16 21:36:34 UTC
[1/2] git commit: Utility to get a name for the thread
Repository: curator
Updated Branches:
refs/heads/CURATOR-110 59076777a -> 04cefb47f
Utility to get a name for the thread
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1c94c7ef
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1c94c7ef
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1c94c7ef
Branch: refs/heads/CURATOR-110
Commit: 1c94c7efa3b5256e14862495055805b4612ca4f4
Parents: 5907677
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:07:24 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:07:24 2014 -0500
----------------------------------------------------------------------
.../org/apache/curator/utils/ThreadUtils.java | 9 +++
.../recipes/AfterConnectionEstablished.java | 73 ++++++++++++++++++++
.../ExecuteAfterConnectionEstablished.java | 73 --------------------
3 files changed, 82 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/1c94c7ef/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
----------------------------------------------------------------------
diff --git a/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java b/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
index f238124..9665dfe 100644
--- a/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
+++ b/curator-client/src/main/java/org/apache/curator/utils/ThreadUtils.java
@@ -53,4 +53,13 @@ public class ThreadUtils
.setDaemon(true)
.build();
}
+
+ public static String getProcessName(Class<?> clazz)
+ {
+ if ( clazz.isAnonymousClass() )
+ {
+ return getProcessName(clazz.getEnclosingClass());
+ }
+ return clazz.getSimpleName();
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/1c94c7ef/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
new file mode 100644
index 0000000..f37f7c0
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.recipes;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ThreadUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
+ */
+public class AfterConnectionEstablished
+{
+ private final static Logger log = LoggerFactory.getLogger(AfterConnectionEstablished.class);
+
+ /**
+ * Spawns a new background thread that blocks until a connection is available and then
+ * executes the 'runAfterConnection' logic; the calling thread waits for and returns its result
+ *
+ * @param client The curator client
+ * @param runAfterConnection The logic to run
+ */
+ public static <T> T execute(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
+ {
+ //Block until connected
+ final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
+ Callable<T> internalCall = new Callable<T>()
+ {
+ @Override
+ public T call() throws Exception
+ {
+ try
+ {
+ client.blockUntilConnected();
+ return runAfterConnection.call();
+ }
+ catch ( Exception e )
+ {
+ log.error("An error occurred blocking until a connection is available", e);
+ throw e;
+ }
+ finally
+ {
+ executor.shutdown();
+ }
+ }
+ };
+ return executor.submit(internalCall).get();
+ }
+
+ private AfterConnectionEstablished()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/1c94c7ef/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
deleted file mode 100644
index 408ed03..0000000
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/ExecuteAfterConnectionEstablished.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.framework.recipes;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ThreadUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-
-/**
- * Utility class to allow execution of logic once a ZooKeeper connection becomes available.
- */
-public class ExecuteAfterConnectionEstablished
-{
- private final static Logger log = LoggerFactory.getLogger(ExecuteAfterConnectionEstablished.class);
-
- /**
- * Spawns a new new background thread that will block until a connection is available and
- * then execute the 'runAfterConnection' logic
- *
- * @param client The curator client
- * @param runAfterConnection The logic to run
- */
- public static <T> T executeAfterConnectionEstablishedInBackground(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
- {
- //Block until connected
- final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
- Callable<T> internalCall = new Callable<T>()
- {
- @Override
- public T call() throws Exception
- {
- try
- {
- client.blockUntilConnected();
- return runAfterConnection.call();
- }
- catch ( Exception e )
- {
- log.error("An error occurred blocking until a connection is available", e);
- throw e;
- }
- finally
- {
- executor.shutdown();
- }
- }
- };
- return executor.submit(internalCall).get();
- }
-
- private ExecuteAfterConnectionEstablished()
- {
- }
-}
[2/2] git commit: Moved the connection blocking code into
ConnectionStateManager. It's cleaner and doesn't require a connection state
listener
Posted by ra...@apache.org.
Moved the connection blocking code into ConnectionStateManager. It's cleaner and doesn't require a connection state listener
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/04cefb47
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/04cefb47
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/04cefb47
Branch: refs/heads/CURATOR-110
Commit: 04cefb47f18c9d4bd3a0eb897563dd5abb7c89c8
Parents: 1c94c7e
Author: randgalt <ra...@apache.org>
Authored: Mon Jun 16 14:35:54 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Jun 16 14:35:54 2014 -0500
----------------------------------------------------------------------
.../framework/imps/CuratorFrameworkImpl.java | 289 +++++++-----------
.../framework/state/ConnectionStateManager.java | 38 ++-
.../framework/imps/TestBlockUntilConnected.java | 304 +++++++++----------
.../recipes/AfterConnectionEstablished.java | 14 +-
.../framework/recipes/leader/LeaderLatch.java | 21 +-
.../recipes/leader/TestLeaderLatch.java | 46 +--
6 files changed, 336 insertions(+), 376 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 14473d8..23a3248 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -16,11 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.imps;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
-
import org.apache.curator.CuratorConnectionLossException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
@@ -44,7 +44,6 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -59,40 +58,40 @@ import java.util.concurrent.atomic.AtomicReference;
public class CuratorFrameworkImpl implements CuratorFramework
{
- private final Logger log = LoggerFactory.getLogger(getClass());
- private final CuratorZookeeperClient client;
- private final ListenerContainer<CuratorListener> listeners;
- private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
- private final ThreadFactory threadFactory;
- private final BlockingQueue<OperationAndData<?>> backgroundOperations;
- private final NamespaceImpl namespace;
- private final ConnectionStateManager connectionStateManager;
- private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
- private final byte[] defaultData;
- private final FailedDeleteManager failedDeleteManager;
- private final CompressionProvider compressionProvider;
- private final ACLProvider aclProvider;
- private final NamespaceFacadeCache namespaceFacadeCache;
- private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
- private final Object connectionLock = new Object();
-
- private volatile ExecutorService executorService;
- private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
-
- private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final CuratorZookeeperClient client;
+ private final ListenerContainer<CuratorListener> listeners;
+ private final ListenerContainer<UnhandledErrorListener> unhandledErrorListeners;
+ private final ThreadFactory threadFactory;
+ private final BlockingQueue<OperationAndData<?>> backgroundOperations;
+ private final NamespaceImpl namespace;
+ private final ConnectionStateManager connectionStateManager;
+ private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
+ private final byte[] defaultData;
+ private final FailedDeleteManager failedDeleteManager;
+ private final CompressionProvider compressionProvider;
+ private final ACLProvider aclProvider;
+ private final NamespaceFacadeCache namespaceFacadeCache;
+ private final NamespaceWatcherMap namespaceWatcherMap = new NamespaceWatcherMap(this);
+
+ private volatile ExecutorService executorService;
+ private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
+
+ private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
interface DebugBackgroundListener
{
- void listen(OperationAndData<?> data);
+ void listen(OperationAndData<?> data);
}
- volatile DebugBackgroundListener debugListener = null;
- private final AtomicReference<CuratorFrameworkState> state;
+ volatile DebugBackgroundListener debugListener = null;
+
+ private final AtomicReference<CuratorFrameworkState> state;
private static class AuthInfo
{
- final String scheme;
- final byte[] auth;
+ final String scheme;
+ final byte[] auth;
private AuthInfo(String scheme, byte[] auth)
{
@@ -113,37 +112,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
- this.client = new CuratorZookeeperClient
- (
- localZookeeperFactory,
- builder.getEnsembleProvider(),
- builder.getSessionTimeoutMs(),
- builder.getConnectionTimeoutMs(),
- new Watcher()
+ this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
+ {
+ @Override
+ public void process(WatchedEvent watchedEvent)
{
- @Override
- public void process(WatchedEvent watchedEvent)
- {
- CuratorEvent event = new CuratorEventImpl
- (
- CuratorFrameworkImpl.this,
- CuratorEventType.WATCHED,
- watchedEvent.getState().getIntValue(),
- unfixForNamespace(watchedEvent.getPath()),
- null,
- null,
- null,
- null,
- null,
- watchedEvent,
- null
- );
- processEvent(event);
- }
- },
- builder.getRetryPolicy(),
- builder.canBeReadOnly()
- );
+ CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null);
+ processEvent(event);
+ }
+ }, builder.getRetryPolicy(), builder.canBeReadOnly());
listeners = new ListenerContainer<CuratorListener>();
unhandledErrorListeners = new ListenerContainer<UnhandledErrorListener>();
@@ -155,7 +132,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
- byte[] builderDefaultData = builder.getDefaultData();
+ byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
if ( builder.getAuthScheme() != null )
@@ -165,23 +142,6 @@ public class CuratorFrameworkImpl implements CuratorFramework
failedDeleteManager = new FailedDeleteManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
-
- //Add callback handler to determine connection state transitions
- getConnectionStateListenable().addListener(new ConnectionStateListener()
- {
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if(newState.isConnected())
- {
- synchronized(connectionLock)
- {
- connectionLock.notifyAll();
- }
- }
- }
- });
}
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
@@ -253,7 +213,19 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
@Override
- public void start()
+ public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+ {
+ return connectionStateManager.blockUntilConnected(maxWaitTime, units);
+ }
+
+ @Override
+ public void blockUntilConnected() throws InterruptedException
+ {
+ blockUntilConnected(0, null);
+ }
+
+ @Override
+ public void start()
{
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
@@ -266,7 +238,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
try
{
connectionStateManager.start(); // ordering dependency - must be called before client.start()
-
+
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
@@ -278,16 +250,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
};
-
+
this.getConnectionStateListenable().addListener(listener);
client.start();
executorService = Executors.newFixedThreadPool(2, threadFactory); // 1 for listeners, 1 for background ops
- executorService.submit
- (
- new Callable<Object>()
+ executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
@@ -295,8 +265,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
backgroundOperationsLoop();
return null;
}
- }
- );
+ });
}
catch ( Exception e )
{
@@ -305,31 +274,28 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
@Override
- public void close()
+ public void close()
{
log.debug("Closing");
if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) )
{
- listeners.forEach
- (
- new Function<CuratorListener, Void>()
+ listeners.forEach(new Function<CuratorListener, Void>()
+ {
+ @Override
+ public Void apply(CuratorListener listener)
{
- @Override
- public Void apply(CuratorListener listener)
+ CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
+ try
+ {
+ listener.eventReceived(CuratorFrameworkImpl.this, event);
+ }
+ catch ( Exception e )
{
- CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null);
- try
- {
- listener.eventReceived(CuratorFrameworkImpl.this, event);
- }
- catch ( Exception e )
- {
- log.error("Exception while sending Closing event", e);
- }
- return null;
+ log.error("Exception while sending Closing event", e);
}
+ return null;
}
- );
+ });
listeners.clear();
unhandledErrorListeners.clear();
@@ -515,14 +481,14 @@ public class CuratorFrameworkImpl implements CuratorFramework
<DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
- boolean isInitialExecution = (event == null);
+ boolean isInitialExecution = (event == null);
if ( isInitialExecution )
{
performBackgroundOperation(operationAndData);
return;
}
- boolean doQueueOperation = false;
+ boolean doQueueOperation = false;
do
{
if ( RetryLoop.shouldRetry(event.getResultCode()) )
@@ -538,7 +504,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
processEvent(event);
- } while ( false );
+ }
+ while ( false );
if ( doQueueOperation )
{
@@ -560,7 +527,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) || !(e instanceof KeeperException) )
{
- if ( e instanceof KeeperException.ConnectionLossException || e instanceof CuratorConnectionLossException ) {
+ if ( e instanceof KeeperException.ConnectionLossException )
+ {
if ( LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL || logAsErrorConnectionErrors.compareAndSet(true, false) )
{
log.error(reason, e);
@@ -576,27 +544,24 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
- final String localReason = reason;
- unhandledErrorListeners.forEach
- (
- new Function<UnhandledErrorListener, Void>()
+ final String localReason = reason;
+ unhandledErrorListeners.forEach(new Function<UnhandledErrorListener, Void>()
+ {
+ @Override
+ public Void apply(UnhandledErrorListener listener)
{
- @Override
- public Void apply(UnhandledErrorListener listener)
- {
- listener.unhandledError(localReason, e);
- return null;
- }
+ listener.unhandledError(localReason, e);
+ return null;
}
- );
+ });
}
- String unfixForNamespace(String path)
+ String unfixForNamespace(String path)
{
return namespace.unfixForNamespace(path);
}
- String fixForNamespace(String path)
+ String fixForNamespace(String path)
{
return namespace.fixForNamespace(path);
}
@@ -723,8 +688,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
sendToBackgroundCallback(operationAndData, event);
}
- KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
- Exception e = null;
+ KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
+ Exception e = null;
try
{
e = (code != null) ? KeeperException.create(code) : null;
@@ -755,7 +720,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
}
- private<DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndData, Throwable e)
+ private <DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndData, Throwable e)
{
do
{
@@ -788,14 +753,15 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
logError("Background exception was not retry-able or retry gave up", e);
- } while ( false );
+ }
+ while ( false );
}
private void backgroundOperationsLoop()
{
while ( !Thread.interrupted() )
{
- OperationAndData<?> operationAndData;
+ OperationAndData<?> operationAndData;
try
{
operationAndData = backgroundOperations.take();
@@ -850,7 +816,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
else
{
- logError("Background retry gave up", e);
+ logError("Background retry gave up", e);
}
}
else
@@ -867,70 +833,23 @@ public class CuratorFrameworkImpl implements CuratorFramework
validateConnection(curatorEvent.getWatchedEvent().getState());
}
- listeners.forEach
- (
- new Function<CuratorListener, Void>()
+ listeners.forEach(new Function<CuratorListener, Void>()
+ {
+ @Override
+ public Void apply(CuratorListener listener)
{
- @Override
- public Void apply(CuratorListener listener)
+ try
{
- try
- {
- TimeTrace trace = client.startTracer("EventListener");
- listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
- trace.commit();
- }
- catch ( Exception e )
- {
- logError("Event listener threw exception", e);
- }
- return null;
+ TimeTrace trace = client.startTracer("EventListener");
+ listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
+ trace.commit();
}
+ catch ( Exception e )
+ {
+ logError("Event listener threw exception", e);
+ }
+ return null;
}
- );
- }
-
- @Override
- public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
- {
- //Check if we're already connected
- ConnectionState currentConnectionState = connectionStateManager.getCurrentConnectionState();
- if(currentConnectionState != null && currentConnectionState.isConnected())
- {
- return true;
- }
-
- long startTime = System.currentTimeMillis();
- long maxWaitTimeMS = TimeUnit.MILLISECONDS.convert(maxWaitTime, units);
-
- for(;;)
- {
- synchronized(connectionLock)
- {
- currentConnectionState = connectionStateManager.getCurrentConnectionState();
- if(currentConnectionState != null && currentConnectionState.isConnected())
- {
- return true;
- }
-
- long waitTime = 0;
- if(maxWaitTime > 0)
- {
- waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
-
- //Timeout
- if(waitTime <= 0)
- {
- return false;
- }
- }
-
- connectionLock.wait(waitTime);
- }
- }
- }
-
- public void blockUntilConnected() throws InterruptedException {
- blockUntilConnected(0, TimeUnit.SECONDS);
+ });
}
}
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
index ba29994..fb312dc 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/state/ConnectionStateManager.java
@@ -33,6 +33,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -153,6 +154,7 @@ public class ConnectionStateManager implements Closeable
currentConnectionState = ConnectionState.SUSPENDED;
postState(ConnectionState.SUSPENDED);
+
return true;
}
@@ -188,15 +190,45 @@ public class ConnectionStateManager implements Closeable
return true;
}
-
- public synchronized ConnectionState getCurrentConnectionState()
+
+ public synchronized boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+ {
+ boolean hasMaxWait = (units != null);
+ long startTime = System.currentTimeMillis();
+
+ while ( !isConnected() )
+ {
+ long maxWaitTimeMS = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWaitTime, units) : 0;
+
+ if ( hasMaxWait )
+ {
+ long waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
+ if ( waitTime <= 0 )
+ {
+ return isConnected();
+ }
+
+ wait(waitTime);
+ }
+ else
+ {
+ wait();
+ }
+ }
+ return isConnected();
+ }
+
+ public synchronized boolean isConnected()
{
- return currentConnectionState;
+ return (currentConnectionState != null) && currentConnectionState.isConnected();
}
private void postState(ConnectionState state)
{
log.info("State change: " + state);
+
+ notifyAll();
+
while ( !eventQueue.offer(state) )
{
eventQueue.poll();
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
index 8dfb7d8..f649afb 100644
--- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -16,12 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.curator.framework.imps;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
+package org.apache.curator.framework.imps;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -34,202 +30,206 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
public class TestBlockUntilConnected extends BaseClassForTests
{
- /**
- * Test the case where we're already connected
- */
- @Test
- public void testBlockUntilConnectedCurrentlyConnected()
- {
- Timing timing = new Timing();
+ /**
+ * Test the case where we're already connected
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyConnected() throws Exception
+ {
+ Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
try
{
- final CountDownLatch connectedLatch = new CountDownLatch(1);
- client.getConnectionStateListenable().addListener(new ConnectionStateListener()
- {
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if(newState.isConnected())
- {
- connectedLatch.countDown();
- }
- }
- });
-
- client.start();
-
- Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
- Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
+ final CountDownLatch connectedLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState.isConnected() )
+ {
+ connectedLatch.countDown();
+ }
+ }
+ });
+
+ client.start();
+
+ Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
+ Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
}
- catch(InterruptedException e)
+ catch ( InterruptedException e )
{
- Assert.fail("Unexpected interruption");
+ Assert.fail("Unexpected interruption");
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
-
- /**
- * Test the case where we are not currently connected and never have been
- */
- @Test
- public void testBlockUntilConnectedCurrentlyNeverConnected()
- {
+ }
+
+ /**
+ * Test the case where we are not currently connected and never have been
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyNeverConnected()
+ {
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
try
{
- client.start();
- Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+ client.start();
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
}
- catch(InterruptedException e)
+ catch ( InterruptedException e )
{
- Assert.fail("Unexpected interruption");
+ Assert.fail("Unexpected interruption");
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
-
- /**
- * Test the case where we are not currently connected, but have been previously
- */
- @Test
- public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
- {
- Timing timing = new Timing();
+ }
+
+ /**
+ * Test the case where we are not currently connected, but have been previously
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
+ {
+ Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
final CountDownLatch lostLatch = new CountDownLatch(1);
client.getConnectionStateListenable().addListener(new ConnectionStateListener()
{
-
- @Override
- public void stateChanged(CuratorFramework client, ConnectionState newState)
- {
- if(newState == ConnectionState.LOST)
- {
- lostLatch.countDown();
- }
- }
- });
-
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if ( newState == ConnectionState.LOST )
+ {
+ lostLatch.countDown();
+ }
+ }
+ });
+
try
{
- client.start();
-
- //Block until we're connected
- Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
-
- //Kill the server
- CloseableUtils.closeQuietly(server);
-
- //Wait until we hit the lost state
- Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
-
- server = new TestingServer(server.getPort(), server.getTempDirectory());
-
- Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+ client.start();
+
+ //Block until we're connected
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
+
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
+ //Wait until we hit the lost state
+ Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
+
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
}
- catch(Exception e)
+ catch ( Exception e )
{
- Assert.fail("Unexpected exception " + e);
+ Assert.fail("Unexpected exception " + e);
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
-
- /**
- * Test the case where we are not currently connected and time out before a
- * connection becomes available.
- */
- @Test
- public void testBlockUntilConnectedConnectTimeout()
- {
- //Kill the server
- CloseableUtils.closeQuietly(server);
-
+ }
+
+ /**
+ * Test the case where we are not currently connected and time out before a
+ * connection becomes available.
+ */
+ @Test
+ public void testBlockUntilConnectedConnectTimeout()
+ {
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
try
{
- client.start();
- Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS),
- "Connected");
+ client.start();
+ Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS), "Connected");
}
- catch(InterruptedException e)
+ catch ( InterruptedException e )
{
- Assert.fail("Unexpected interruption");
+ Assert.fail("Unexpected interruption");
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
-
- /**
- * Test the case where we are not currently connected and the thread gets interrupted
- * prior to a connection becoming available
- */
- @Test
- public void testBlockUntilConnectedInterrupt()
- {
- //Kill the server
- CloseableUtils.closeQuietly(server);
-
+ }
+
+ /**
+ * Test the case where we are not currently connected and the thread gets interrupted
+ * prior to a connection becoming available
+ */
+ @Test
+ public void testBlockUntilConnectedInterrupt()
+ {
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
final CuratorFramework client = CuratorFrameworkFactory.builder().
- connectString(server.getConnectString()).
- retryPolicy(new RetryOneTime(1)).
- build();
-
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
try
{
- client.start();
-
- final Thread threadToInterrupt = Thread.currentThread();
-
- Timer timer = new Timer();
- timer.schedule(new TimerTask() {
-
- @Override
- public void run() {
- threadToInterrupt.interrupt();
- }
- }, 3000);
-
- client.blockUntilConnected(5, TimeUnit.SECONDS);
- Assert.fail("Expected interruption did not occur");
+ client.start();
+
+ final Thread threadToInterrupt = Thread.currentThread();
+
+ Timer timer = new Timer();
+ timer.schedule(new TimerTask()
+ {
+
+ @Override
+ public void run()
+ {
+ threadToInterrupt.interrupt();
+ }
+ }, 3000);
+
+ client.blockUntilConnected(5, TimeUnit.SECONDS);
+ Assert.fail("Expected interruption did not occur");
}
- catch(InterruptedException e)
+ catch ( InterruptedException e )
{
- //This is expected
+ //This is expected
}
finally
{
- CloseableUtils.closeQuietly(client);
+ CloseableUtils.closeQuietly(client);
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
index f37f7c0..41ba702 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/AfterConnectionEstablished.java
@@ -22,7 +22,6 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.ThreadUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
/**
@@ -39,24 +38,23 @@ public class AfterConnectionEstablished
* @param client The curator client
* @param runAfterConnection The logic to run
*/
- public static <T> T execute(final CuratorFramework client, final Callable<T> runAfterConnection) throws Exception
+ public static void execute(final CuratorFramework client, final Runnable runAfterConnection) throws Exception
{
//Block until connected
- final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(runAfterConnection.getClass().getSimpleName());
- Callable<T> internalCall = new Callable<T>()
+ final ExecutorService executor = ThreadUtils.newSingleThreadExecutor(ThreadUtils.getProcessName(runAfterConnection.getClass()));
+ Runnable internalCall = new Runnable()
{
@Override
- public T call() throws Exception
+ public void run()
{
try
{
client.blockUntilConnected();
- return runAfterConnection.call();
+ runAfterConnection.run();
}
catch ( Exception e )
{
log.error("An error occurred blocking until a connection is available", e);
- throw e;
}
finally
{
@@ -64,7 +62,7 @@ public class AfterConnectionEstablished
}
}
};
- return executor.submit(internalCall).get();
+ executor.submit(internalCall);
}
private AfterConnectionEstablished()
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index f4c1cef..dce3f5e 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -26,7 +26,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.listen.ListenerContainer;
-import org.apache.curator.framework.recipes.ExecuteAfterConnectionEstablished;
+import org.apache.curator.framework.recipes.AfterConnectionEstablished;
import org.apache.curator.framework.recipes.locks.LockInternals;
import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
@@ -156,17 +156,22 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- ExecuteAfterConnectionEstablished.executeAfterConnectionEstablishedInBackground
+ AfterConnectionEstablished.execute
(
- client,
- new Callable<Void>()
+ client, new Runnable()
{
@Override
- public Void call() throws Exception
+ public void run()
{
client.getConnectionStateListenable().addListener(listener);
- reset();
- return null;
+ try
+ {
+ reset();
+ }
+ catch ( Exception e )
+ {
+ log.error("An error occurred checking resetting leadership.", e);
+ }
}
}
);
@@ -556,7 +561,7 @@ public class LeaderLatch implements Closeable
private void handleStateChange(ConnectionState newState)
{
- if (newState.isConnected())
+ if ( newState == ConnectionState.RECONNECTED )
{
try
{
http://git-wip-us.apache.org/repos/asf/curator/blob/04cefb47/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 35d8809..b97e708 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -22,7 +22,6 @@ package org.apache.curator.framework.recipes.leader;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
@@ -35,7 +34,6 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
-
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
@@ -215,6 +213,17 @@ public class TestLeaderLatch extends BaseClassForTests
@Test
public void testWaiting() throws Exception
{
+ final int LOOPS = 10;
+ for ( int i = 0; i < LOOPS; ++i )
+ {
+ System.out.println("TRY #" + i);
+ internalTestWaitingOnce();
+ Thread.sleep(10);
+ }
+ }
+
+ private void internalTestWaitingOnce() throws Exception
+ {
final int PARTICIPANT_QTY = 10;
ExecutorService executorService = Executors.newFixedThreadPool(PARTICIPANT_QTY);
@@ -241,10 +250,10 @@ public class TestLeaderLatch extends BaseClassForTests
Assert.assertTrue(latch.await(timing.forWaiting().seconds(), TimeUnit.SECONDS));
Assert.assertTrue(thereIsALeader.compareAndSet(false, true));
Thread.sleep((int)(10 * Math.random()));
+ thereIsALeader.set(false);
}
finally
{
- thereIsALeader.set(false);
latch.close();
}
return null;
@@ -259,7 +268,7 @@ public class TestLeaderLatch extends BaseClassForTests
}
finally
{
- executorService.shutdown();
+ executorService.shutdownNow();
CloseableUtils.closeQuietly(client);
}
}
@@ -526,23 +535,21 @@ public class TestLeaderLatch extends BaseClassForTests
CloseableUtils.closeQuietly(client);
}
}
-
+
@Test
public void testNoServerAtStart()
- {
+ {
CloseableUtils.closeQuietly(server);
Timing timing = new Timing();
- CuratorFramework client = CuratorFrameworkFactory.newClient(
- server.getConnectString(), timing.session(),
- timing.connection(), new RetryNTimes(5, 1000));
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryNTimes(5, 1000));
client.start();
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
final CountDownLatch leaderCounter = new CountDownLatch(1);
final AtomicInteger leaderCount = new AtomicInteger(0);
- final AtomicInteger notLeaderCount = new AtomicInteger(0);
+ final AtomicInteger notLeaderCount = new AtomicInteger(0);
leader.addListener(new LeaderLatchListener()
{
@Override
@@ -555,27 +562,26 @@ public class TestLeaderLatch extends BaseClassForTests
@Override
public void notLeader()
{
- notLeaderCount.incrementAndGet();
+ notLeaderCount.incrementAndGet();
}
});
try
{
- leader.start();
-
- //Wait for a while before starting the test server
- Thread.sleep(5000);
+ leader.start();
+
+ timing.sleepABit();
// Start the new server
- server = new TestingServer(server.getPort());
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
Assert.assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader");
-
+
Assert.assertEquals(leaderCount.get(), 1, "Elected too many times");
- Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");
+ Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");
}
- catch (Exception e)
+ catch ( Exception e )
{
Assert.fail("Unexpected exception", e);
}
@@ -585,7 +591,7 @@ public class TestLeaderLatch extends BaseClassForTests
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
- }
+ }
private enum Mode
{