You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2018/12/17 03:26:07 UTC
[1/3] curator git commit: CURATOR-495
Repository: curator
Updated Branches:
refs/heads/master 22c028f56 -> 2e802efed
CURATOR-495
Fixes race in many Curator recipes whereby a pattern was used that called "notifyAll" in a synchronized block in response to a ZooKeeper watcher callback. This created a race and possible deadlock if the recipe instance was already in a synchronized block. This would result in the ZK event thread getting blocked which would prevent ZK connections from getting repaired. This change adds a new executor (available from CuratorFramework) that can be used to do the sync/notify so that ZK's event thread is not blocked.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f91adb22
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f91adb22
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f91adb22
Branch: refs/heads/master
Commit: f91adb22e83d8e47b99ad98f8c13c86251bc4cb3
Parents: 22c028f
Author: randgalt <ra...@apache.org>
Authored: Thu Dec 13 20:34:40 2018 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Dec 13 20:34:40 2018 -0500
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 35 ++++++++++++++++----
.../framework/CuratorFrameworkFactory.java | 27 ++++++++++++++-
.../framework/imps/CuratorFrameworkImpl.java | 25 ++++++++++++++
.../recipes/barriers/DistributedBarrier.java | 7 +---
.../barriers/DistributedDoubleBarrier.java | 13 ++++----
.../recipes/locks/InterProcessSemaphoreV2.java | 14 +++-----
.../framework/recipes/locks/LockInternals.java | 11 ++----
7 files changed, 94 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index bf6167c..02c458a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -25,16 +25,14 @@ import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.TransactionOp;
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.schema.SchemaSet;
-import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
@@ -269,7 +267,7 @@ public interface CuratorFramework extends Closeable
* Call this method on watchers you are no longer interested in.
*
* @param watcher the watcher
- *
+ *
* @deprecated As of ZooKeeper 3.5 Curators recipes will handle removing watcher references
* when they are no longer used. If you write your own recipe, follow the example of Curator
* recipes and use {@link #newWatcherRemoveCuratorFramework} calling {@link WatcherRemoveCuratorFramework#removeWatchers()}
@@ -277,7 +275,7 @@ public interface CuratorFramework extends Closeable
*/
@Deprecated
public void clearWatcherReferences(Watcher watcher);
-
+
/**
* Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
* @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely
@@ -286,7 +284,7 @@ public interface CuratorFramework extends Closeable
* @throws InterruptedException If interrupted while waiting
*/
public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
-
+
/**
* Block until a connection to ZooKeeper is available. This method will not return until a
* connection is available or it is interrupted, in which case an InterruptedException will
@@ -331,4 +329,29 @@ public interface CuratorFramework extends Closeable
* @return true/false
*/
boolean isZk34CompatibilityMode();
+
+ /**
+ * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
+ * done from the {@link #runSafe(Runnable)} thread.
+ *
+ * @param monitorHolder object to sync on and notify
+ * @since 4.1.0
+ */
+ default void postSafeNotify(Object monitorHolder)
+ {
+ runSafe(() -> {
+ synchronized(monitorHolder) {
+ monitorHolder.notifyAll();
+ }
+ });
+ }
+
+ /**
+ * Curator (and user) recipes can use this to run notifyAll
+ * and other blocking calls that might normally block ZooKeeper's event thread.
+
+ * @param runnable proc to call from a safe internal thread
+ * @since 4.1.0
+ */
+ void runSafe(Runnable runnable);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index f3daeab..86fbfce 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -47,6 +47,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
@@ -149,6 +150,8 @@ public class CuratorFrameworkFactory
private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
private boolean zk34CompatibilityMode = isZK34();
private int waitForShutdownTimeoutMs = 0;
+ private Executor runSafeService = null;
+
/**
* Apply the current values and build a new CuratorFramework
*
@@ -189,7 +192,7 @@ public class CuratorFrameworkFactory
/**
* Add connection authorization
- *
+ *
* Subsequent calls to this method overwrite the prior calls.
*
* @param scheme the scheme
@@ -474,6 +477,28 @@ public class CuratorFrameworkFactory
return this;
}
+ /**
+ * Curator (and user) recipes will use this executor to call notifyAll
+ * and other blocking calls that might normally block ZooKeeper's event thread.
+ * By default, an executor is allocated internally using the provided (or default)
+ * {@link #threadFactory(java.util.concurrent.ThreadFactory)}. Use this method
+ * to set a custom executor.
+ *
+ * @param runSafeService executor to use for calls to notifyAll from Watcher callbacks etc
+ * @return this
+ * @since 4.1.0
+ */
+ public Builder runSafeService(Executor runSafeService)
+ {
+ this.runSafeService = runSafeService;
+ return null;
+ }
+
+ public Executor getRunSafeService()
+ {
+ return runSafeService;
+ }
+
public ACLProvider getAclProvider()
{
return aclProvider;
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1ae6a5e..34002a0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -61,6 +61,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -96,6 +97,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final EnsembleTracker ensembleTracker;
private final SchemaSet schemaSet;
private final boolean zk34CompatibilityMode;
+ private final Executor runSafeService;
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -163,6 +165,22 @@ public class CuratorFrameworkImpl implements CuratorFramework
namespaceFacadeCache = new NamespaceFacadeCache(this);
ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());
+
+ runSafeService = makeRunSafeService(builder);
+ }
+
+ private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder)
+ {
+ if ( builder.getRunSafeService() != null )
+ {
+ return builder.getRunSafeService();
+ }
+ ThreadFactory threadFactory = builder.getThreadFactory();
+ if ( threadFactory == null )
+ {
+ threadFactory = ThreadUtils.newThreadFactory("SafeNotifyService");
+ }
+ return Executors.newSingleThreadExecutor(threadFactory);
}
private List<AuthInfo> buildAuths(CuratorFrameworkFactory.Builder builder)
@@ -176,6 +194,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
@Override
+ public void runSafe(Runnable runnable)
+ {
+ runSafeService.execute(runnable);
+ }
+
+ @Override
public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
{
return new WatcherRemovalFacade(this);
@@ -240,6 +264,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
schemaSet = parent.schemaSet;
zk34CompatibilityMode = parent.zk34CompatibilityMode;
ensembleTracker = null;
+ runSafeService = parent.runSafeService;
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
index 8a376f1..fb00ed9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
@@ -44,7 +44,7 @@ public class DistributedBarrier
@Override
public void process(WatchedEvent event)
{
- notifyFromWatcher();
+ client.postSafeNotify(DistributedBarrier.this);
}
};
@@ -142,9 +142,4 @@ public class DistributedBarrier
}
return result;
}
-
- private synchronized void notifyFromWatcher()
- {
- notifyAll();
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
index b3bdf2c..2315178 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
@@ -65,7 +65,12 @@ public class DistributedDoubleBarrier
public void process(WatchedEvent event)
{
connectionLost.set(event.getState() != Event.KeeperState.SyncConnected);
- notifyFromWatcher();
+ client.runSafe(() -> {
+ synchronized(DistributedDoubleBarrier.this) {
+ hasBeenNotified.set(true);
+ DistributedDoubleBarrier.this.notifyAll();
+ }
+ });
}
};
@@ -337,10 +342,4 @@ public class DistributedDoubleBarrier
return result;
}
-
- private synchronized void notifyFromWatcher()
- {
- hasBeenNotified.set(true);
- notifyAll();
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 03e1088..6404888 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -40,7 +40,6 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -88,7 +87,7 @@ public class InterProcessSemaphoreV2
@Override
public void process(WatchedEvent event)
{
- notifyFromWatcher();
+ client.postSafeNotify(InterProcessSemaphoreV2.this);
}
};
@@ -141,7 +140,7 @@ public class InterProcessSemaphoreV2
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
{
InterProcessSemaphoreV2.this.maxLeases = newCount;
- notifyFromWatcher();
+ client.postSafeNotify(InterProcessSemaphoreV2.this);
}
@Override
@@ -373,7 +372,7 @@ public class InterProcessSemaphoreV2
synchronized(this)
{
for(;;)
- {
+ {
List<String> children;
try
{
@@ -392,7 +391,7 @@ public class InterProcessSemaphoreV2
log.error("Sequential path not found: " + path);
return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
}
-
+
if ( children.size() <= maxLeases )
{
break;
@@ -479,9 +478,4 @@ public class InterProcessSemaphoreV2
}
};
}
-
- private synchronized void notifyFromWatcher()
- {
- notifyAll();
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
index 46820af..a22bfb1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
@@ -66,7 +66,7 @@ public class LockInternals
@Override
public void process(WatchedEvent event)
{
- notifyFromWatcher();
+ client.postSafeNotify(LockInternals.this);
}
};
@@ -295,7 +295,7 @@ public class LockInternals
synchronized(this)
{
- try
+ try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
@@ -316,7 +316,7 @@ public class LockInternals
wait();
}
}
- catch ( KeeperException.NoNodeException e )
+ catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
@@ -351,9 +351,4 @@ public class LockInternals
// ignore - already deleted (possibly expired session, etc.)
}
}
-
- private synchronized void notifyFromWatcher()
- {
- notifyAll();
- }
}
[3/3] curator git commit: CURATOR-495 runSafeService should return this
Posted by ra...@apache.org.
CURATOR-495 runSafeService should return this
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2e802efe
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2e802efe
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2e802efe
Branch: refs/heads/master
Commit: 2e802efeddd28b3cd6c0d96fb0708c0f0a94a35c
Parents: 4fa5c1d
Author: randgalt <ra...@apache.org>
Authored: Sun Dec 16 19:59:06 2018 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sun Dec 16 19:59:06 2018 -0500
----------------------------------------------------------------------
.../java/org/apache/curator/framework/CuratorFrameworkFactory.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2e802efe/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index 86fbfce..395df71 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -491,7 +491,7 @@ public class CuratorFrameworkFactory
public Builder runSafeService(Executor runSafeService)
{
this.runSafeService = runSafeService;
- return null;
+ return this;
}
public Executor getRunSafeService()
[2/3] curator git commit: CURATOR-495 - have methods return a CompletableFuture so that callers can check completion, etc.
Posted by ra...@apache.org.
CURATOR-495 - have methods return a CompletableFuture so that callers can check completion, etc.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/4fa5c1d4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/4fa5c1d4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/4fa5c1d4
Branch: refs/heads/master
Commit: 4fa5c1d4d2e0d344db21c52c61e305836ea1bde5
Parents: f91adb2
Author: randgalt <ra...@apache.org>
Authored: Fri Dec 14 14:20:12 2018 -0500
Committer: randgalt <ra...@apache.org>
Committed: Fri Dec 14 14:20:12 2018 -0500
----------------------------------------------------------------------
.../apache/curator/framework/CuratorFramework.java | 9 ++++++---
.../curator/framework/imps/CuratorFrameworkImpl.java | 14 +++-----------
2 files changed, 9 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/4fa5c1d4/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 02c458a..3737faa 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -34,6 +34,7 @@ import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
@@ -335,11 +336,12 @@ public interface CuratorFramework extends Closeable
* done from the {@link #runSafe(Runnable)} thread.
*
* @param monitorHolder object to sync on and notify
+ * @return a CompletableFuture that can be used to monitor when the call is complete
* @since 4.1.0
*/
- default void postSafeNotify(Object monitorHolder)
+ default CompletableFuture<Void> postSafeNotify(Object monitorHolder)
{
- runSafe(() -> {
+ return runSafe(() -> {
synchronized(monitorHolder) {
monitorHolder.notifyAll();
}
@@ -351,7 +353,8 @@ public interface CuratorFramework extends Closeable
* and other blocking calls that might normally block ZooKeeper's event thread.
* @param runnable proc to call from a safe internal thread
+ * @return a CompletableFuture that can be used to monitor when the call is complete
* @since 4.1.0
*/
- void runSafe(Runnable runnable);
+ CompletableFuture<Void> runSafe(Runnable runnable);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/4fa5c1d4/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 34002a0..736b737 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -58,15 +58,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -194,9 +186,9 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
@Override
- public void runSafe(Runnable runnable)
+ public CompletableFuture<Void> runSafe(Runnable runnable)
{
- runSafeService.execute(runnable);
+ return CompletableFuture.runAsync(runnable, runSafeService);
}
@Override