You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2018/12/14 01:37:37 UTC
curator git commit: CURATOR-495
Repository: curator
Updated Branches:
refs/heads/CURATOR-495 [created] f91adb22e
CURATOR-495
Fixes race in many Curator recipes whereby a pattern was used that called "notifyAll" in a synchronized block in response to a ZooKeeper watcher callback. This created a race and possible deadlock if the recipe instance was already in a synchronized block. This would result in the ZK event thread getting blocked which would prevent ZK connections from getting repaired. This change adds a new executor (available from CuratorFramework) that can be used to do the sync/notify so that ZK's event thread is not blocked.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/f91adb22
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/f91adb22
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/f91adb22
Branch: refs/heads/CURATOR-495
Commit: f91adb22e83d8e47b99ad98f8c13c86251bc4cb3
Parents: 22c028f
Author: randgalt <ra...@apache.org>
Authored: Thu Dec 13 20:34:40 2018 -0500
Committer: randgalt <ra...@apache.org>
Committed: Thu Dec 13 20:34:40 2018 -0500
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 35 ++++++++++++++++----
.../framework/CuratorFrameworkFactory.java | 27 ++++++++++++++-
.../framework/imps/CuratorFrameworkImpl.java | 25 ++++++++++++++
.../recipes/barriers/DistributedBarrier.java | 7 +---
.../barriers/DistributedDoubleBarrier.java | 13 ++++----
.../recipes/locks/InterProcessSemaphoreV2.java | 14 +++-----
.../framework/recipes/locks/LockInternals.java | 11 ++----
7 files changed, 94 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index bf6167c..02c458a 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -25,16 +25,14 @@ import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.TransactionOp;
-import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.schema.SchemaSet;
-import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
+import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
-
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
@@ -269,7 +267,7 @@ public interface CuratorFramework extends Closeable
* Call this method on watchers you are no longer interested in.
*
* @param watcher the watcher
- *
+ *
* @deprecated As of ZooKeeper 3.5 Curators recipes will handle removing watcher references
* when they are no longer used. If you write your own recipe, follow the example of Curator
* recipes and use {@link #newWatcherRemoveCuratorFramework} calling {@link WatcherRemoveCuratorFramework#removeWatchers()}
@@ -277,7 +275,7 @@ public interface CuratorFramework extends Closeable
*/
@Deprecated
public void clearWatcherReferences(Watcher watcher);
-
+
/**
* Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
* @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely
@@ -286,7 +284,7 @@ public interface CuratorFramework extends Closeable
* @throws InterruptedException If interrupted while waiting
*/
public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
-
+
/**
* Block until a connection to ZooKeeper is available. This method will not return until a
* connection is available or it is interrupted, in which case an InterruptedException will
@@ -331,4 +329,29 @@ public interface CuratorFramework extends Closeable
* @return true/false
*/
boolean isZk34CompatibilityMode();
+
+ /**
+ * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
+ * done from the {@link #runSafe(Runnable)} thread.
+ *
+ * @param monitorHolder object to sync on and notify
+ * @since 4.1.0
+ */
+ default void postSafeNotify(Object monitorHolder)
+ {
+ runSafe(() -> {
+ synchronized(monitorHolder) {
+ monitorHolder.notifyAll();
+ }
+ });
+ }
+
+ /**
+ * Curator (and user) recipes can use this to run notifyAll
+ * and other blocking calls that might normally block ZooKeeper's event thread.
+
+ * @param runnable proc to call from a safe internal thread
+ * @since 4.1.0
+ */
+ void runSafe(Runnable runnable);
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
index f3daeab..86fbfce 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java
@@ -47,6 +47,7 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
@@ -149,6 +150,8 @@ public class CuratorFrameworkFactory
private SchemaSet schemaSet = SchemaSet.getDefaultSchemaSet();
private boolean zk34CompatibilityMode = isZK34();
private int waitForShutdownTimeoutMs = 0;
+ private Executor runSafeService = null;
+
/**
* Apply the current values and build a new CuratorFramework
*
@@ -189,7 +192,7 @@ public class CuratorFrameworkFactory
/**
* Add connection authorization
- *
+ *
* Subsequent calls to this method overwrite the prior calls.
*
* @param scheme the scheme
@@ -474,6 +477,28 @@ public class CuratorFrameworkFactory
return this;
}
+ /**
+ * Curator (and user) recipes will use this executor to call notifyAll
+ * and other blocking calls that might normally block ZooKeeper's event thread.
+ * By default, an executor is allocated internally using the provided (or default)
+ * {@link #threadFactory(java.util.concurrent.ThreadFactory)}. Use this method
+ * to set a custom executor.
+ *
+ * @param runSafeService executor to use for calls to notifyAll from Watcher callbacks etc
+ * @return this
+ * @since 4.1.0
+ */
+ public Builder runSafeService(Executor runSafeService)
+ {
+ this.runSafeService = runSafeService;
+ return null;
+ }
+
+ public Executor getRunSafeService()
+ {
+ return runSafeService;
+ }
+
public ACLProvider getAclProvider()
{
return aclProvider;
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index 1ae6a5e..34002a0 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -61,6 +61,7 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
@@ -96,6 +97,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final EnsembleTracker ensembleTracker;
private final SchemaSet schemaSet;
private final boolean zk34CompatibilityMode;
+ private final Executor runSafeService;
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
@@ -163,6 +165,22 @@ public class CuratorFrameworkImpl implements CuratorFramework
namespaceFacadeCache = new NamespaceFacadeCache(this);
ensembleTracker = zk34CompatibilityMode ? null : new EnsembleTracker(this, builder.getEnsembleProvider());
+
+ runSafeService = makeRunSafeService(builder);
+ }
+
+ private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder)
+ {
+ if ( builder.getRunSafeService() != null )
+ {
+ return builder.getRunSafeService();
+ }
+ ThreadFactory threadFactory = builder.getThreadFactory();
+ if ( threadFactory == null )
+ {
+ threadFactory = ThreadUtils.newThreadFactory("SafeNotifyService");
+ }
+ return Executors.newSingleThreadExecutor(threadFactory);
}
private List<AuthInfo> buildAuths(CuratorFrameworkFactory.Builder builder)
@@ -176,6 +194,12 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
@Override
+ public void runSafe(Runnable runnable)
+ {
+ runSafeService.execute(runnable);
+ }
+
+ @Override
public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
{
return new WatcherRemovalFacade(this);
@@ -240,6 +264,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
schemaSet = parent.schemaSet;
zk34CompatibilityMode = parent.zk34CompatibilityMode;
ensembleTracker = null;
+ runSafeService = parent.runSafeService;
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
index 8a376f1..fb00ed9 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java
@@ -44,7 +44,7 @@ public class DistributedBarrier
@Override
public void process(WatchedEvent event)
{
- notifyFromWatcher();
+ client.postSafeNotify(DistributedBarrier.this);
}
};
@@ -142,9 +142,4 @@ public class DistributedBarrier
}
return result;
}
-
- private synchronized void notifyFromWatcher()
- {
- notifyAll();
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
index b3bdf2c..2315178 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/barriers/DistributedDoubleBarrier.java
@@ -65,7 +65,12 @@ public class DistributedDoubleBarrier
public void process(WatchedEvent event)
{
connectionLost.set(event.getState() != Event.KeeperState.SyncConnected);
- notifyFromWatcher();
+ client.runSafe(() -> {
+ synchronized(DistributedDoubleBarrier.this) {
+ hasBeenNotified.set(true);
+ DistributedDoubleBarrier.this.notifyAll();
+ }
+ });
}
};
@@ -337,10 +342,4 @@ public class DistributedDoubleBarrier
return result;
}
-
- private synchronized void notifyFromWatcher()
- {
- hasBeenNotified.set(true);
- notifyAll();
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 03e1088..6404888 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -40,7 +40,6 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
@@ -88,7 +87,7 @@ public class InterProcessSemaphoreV2
@Override
public void process(WatchedEvent event)
{
- notifyFromWatcher();
+ client.postSafeNotify(InterProcessSemaphoreV2.this);
}
};
@@ -141,7 +140,7 @@ public class InterProcessSemaphoreV2
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception
{
InterProcessSemaphoreV2.this.maxLeases = newCount;
- notifyFromWatcher();
+ client.postSafeNotify(InterProcessSemaphoreV2.this);
}
@Override
@@ -373,7 +372,7 @@ public class InterProcessSemaphoreV2
synchronized(this)
{
for(;;)
- {
+ {
List<String> children;
try
{
@@ -392,7 +391,7 @@ public class InterProcessSemaphoreV2
log.error("Sequential path not found: " + path);
return InternalAcquireResult.RETRY_DUE_TO_MISSING_NODE;
}
-
+
if ( children.size() <= maxLeases )
{
break;
@@ -479,9 +478,4 @@ public class InterProcessSemaphoreV2
}
};
}
-
- private synchronized void notifyFromWatcher()
- {
- notifyAll();
- }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/f91adb22/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
index 46820af..a22bfb1 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockInternals.java
@@ -66,7 +66,7 @@ public class LockInternals
@Override
public void process(WatchedEvent event)
{
- notifyFromWatcher();
+ client.postSafeNotify(LockInternals.this);
}
};
@@ -295,7 +295,7 @@ public class LockInternals
synchronized(this)
{
- try
+ try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
@@ -316,7 +316,7 @@ public class LockInternals
wait();
}
}
- catch ( KeeperException.NoNodeException e )
+ catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
@@ -351,9 +351,4 @@ public class LockInternals
// ignore - already deleted (possibly expired session, etc.)
}
}
-
- private synchronized void notifyFromWatcher()
- {
- notifyAll();
- }
}