You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2014/08/11 00:13:35 UTC
git commit: CURATOR-79 - Modified the 'withProtection' handling,
so that any failure that is not a ConnectionLossException or other
KeeperException is treated the same as a ConnectionLossException. This means
that if the thread gets interrupted, or encou
Repository: curator
Updated Branches:
refs/heads/CURATOR-79 [created] 5398b72f0
CURATOR-79 - Modified the 'withProtection' handling, so that any failure
that is not a ConnectionLossException or other KeeperException is
treated the same as a ConnectionLossException. This means that if the
thread gets interrupted, or encounters some sort of other error, during
creation of a protected zNode, it will remove the affected zNode prior
to rethrowing the exception.
Exposed the debug UnhandledErrorListener on the CuratorFrameworkImpl
to reproduce this issue via a unit test.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5398b72f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5398b72f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5398b72f
Branch: refs/heads/CURATOR-79
Commit: 5398b72f00ccc0f2ea995865fcaf92d73c6c5818
Parents: 6bd0cc3
Author: Cam McKenzie <ca...@apache.org>
Authored: Mon Aug 11 08:13:12 2014 +1000
Committer: Cam McKenzie <ca...@apache.org>
Committed: Mon Aug 11 08:13:12 2014 +1000
----------------------------------------------------------------------
.../framework/imps/CreateBuilderImpl.java | 20 ++++++
.../framework/imps/CuratorFrameworkImpl.java | 6 +-
.../recipes/locks/TestInterProcessMutex.java | 72 ++++++++++++++++++++
3 files changed, 97 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/5398b72f/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
index 3cf23b8..3028cd5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java
@@ -469,6 +469,26 @@ class CreateBuilderImpl implements CreateBuilder, BackgroundOperation<PathAndByt
}
throw e;
}
+ catch ( KeeperException e )
+ {
+ throw e;
+ }
+ catch ( Exception e )
+ {
+ if ( protectedId != null )
+ {
+ /*
+ * CURATOR-79 - Handle any other exception here and treat it the
+ * same as a connection loss exception. This is necessary as, from
+ * the client's point of view, an exception has been thrown and the
+ * zNode should not exist on ZK. This was causing deadlock in the
+ * locking recipes.
+ */
+ findAndDeleteProtectedNodeInBackground(adjustedPath, protectedId, null);
+ protectedId = UUID.randomUUID().toString();
+ }
+ throw e;
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/curator/blob/5398b72f/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index e86b1e5..fbe1ec7 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -19,8 +19,10 @@
package org.apache.curator.framework.imps;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+
import org.apache.curator.CuratorConnectionLossException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
@@ -44,6 +46,7 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -85,7 +88,8 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
volatile DebugBackgroundListener debugListener = null;
- volatile UnhandledErrorListener debugUnhandledErrorListener = null;
+ @VisibleForTesting
+ public volatile UnhandledErrorListener debugUnhandledErrorListener = null;
private final AtomicReference<CuratorFrameworkState> state;
http://git-wip-us.apache.org/repos/asf/curator/blob/5398b72f/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
index bbc3466..85e5c39 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessMutex.java
@@ -20,9 +20,14 @@ package org.apache.curator.framework.recipes.locks;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.imps.CuratorFrameworkImpl;
import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.Timing;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.testng.Assert;
import org.testng.annotations.Test;
+
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -30,6 +35,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
public class TestInterProcessMutex extends TestInterProcessMutexBase
{
@@ -107,4 +113,70 @@ public class TestInterProcessMutex extends TestInterProcessMutexBase
client.close();
}
}
+
+ /**
+ * See CURATOR-79. If the mutex is interrupted while attempting to acquire a lock, it is
+ * possible for the zNode to be created in ZooKeeper, but for Curator to think that it
+ * hasn't been. This causes the next call to acquire() to fail because an orphaned
+ * zNode has been left behind from the previous call.
+ */
+ @Test
+ public void testInterruptedDuringAcquire() throws Exception
+ {
+ Timing timing = new Timing();
+ final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
+ client.start();
+ final InterProcessMutex lock = new InterProcessMutex(client, LOCK_PATH);
+
+ final AtomicBoolean interruptOnError = new AtomicBoolean(true);
+
+ ((CuratorFrameworkImpl)client).debugUnhandledErrorListener = new UnhandledErrorListener()
+ {
+
+ @Override
+ public void unhandledError(String message, Throwable e)
+ {
+ if(interruptOnError.compareAndSet(true, false))
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ };
+
+ //The lock path needs to exist for the deadlock to occur.
+ try {
+ client.create().creatingParentsIfNeeded().forPath(LOCK_PATH);
+ } catch(NodeExistsException e) { // Ignored: the path only needs to exist, so an existing node is fine.
+ }
+
+ try
+ {
+ //Interrupt the current thread. This will cause ensurePath() to fail.
+ //We need to reinterrupt in the debugUnhandledErrorListener above.
+ Thread.currentThread().interrupt();
+ lock.acquire();
+ Assert.fail(); // Reaching here means acquire() did not throw the expected InterruptedException.
+ }
+ catch(InterruptedException e)
+ {
+ //Expected lock to have failed.
+ Assert.assertTrue(!lock.isOwnedByCurrentThread());
+ }
+
+ try
+ {
+ Assert.assertTrue(lock.acquire(timing.seconds(), TimeUnit.SECONDS));
+ }
+ catch(Exception e)
+ {
+ Assert.fail();
+ }
+ finally
+ {
+ if(lock.isOwnedByCurrentThread())
+ {
+ lock.release();
+ }
+ }
+ }
}