You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/18 01:03:12 UTC
[08/18] git commit: CURATOR-110 - Modified the CuratorFramework to
expose a 'blockUntilConnected' method. This allows a race condition between
the start() method and the ConnectionStateListener in the LeaderLatch recipe
to be avoided. Updated the leader
CURATOR-110 - Modified the CuratorFramework to expose a 'blockUntilConnected' method. This allows a race condition between the start() method and the ConnectionStateListener in the LeaderLatch recipe to be avoided. Updated the leader latch start() method to block until a connection is available (in a background thread), before it attempts to setup its state. This means that it will work correctly if started before a connection to ZK is available.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/63d0401c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/63d0401c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/63d0401c
Branch: refs/heads/master
Commit: 63d0401c05f6cc122468f55c79c9e1bd4ddb3eb5
Parents: 2c376b9
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Thu Jun 12 14:09:45 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Thu Jun 12 14:09:45 2014 +1000
----------------------------------------------------------------------
.../curator/framework/CuratorFramework.java | 27 +++
.../framework/imps/CuratorFrameworkImpl.java | 69 ++++++
.../framework/imps/TestBlockUntilConnected.java | 217 +++++++++++++++++++
.../framework/recipes/leader/LeaderLatch.java | 38 +++-
.../recipes/leader/TestLeaderLatch.java | 43 ++--
5 files changed, 366 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 2d6e182..1df3fa5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -24,10 +24,13 @@ import org.apache.curator.framework.api.*;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
+
import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
/**
* Zookeeper framework-style client
@@ -210,4 +213,28 @@ public interface CuratorFramework extends Closeable
* @param watcher the watcher
*/
public void clearWatcherReferences(Watcher watcher);
+
+ /**
+ * Get the current connection state. The connection state will have a value of 0 until
+ * the first connection related event is received.
+ * @return The current connection state, or null if it is unknown
+ */
+ public ConnectionState getCurrentConnectionState();
+
+ /**
+ * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
+ * @param maxWaitTime The maximum wait time. Specify a value <= 0 to wait indefinitely
+ * @param units The time units for the maximum wait time.
+ * @return True if connection has been established, false otherwise.
+ * @throws InterruptedException If interrupted while waiting
+ */
+ public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
+
+ /**
+ * Block until a connection to ZooKeeper is available. This method will not return until a
+ * connection is available or it is interrupted, in which case an InterruptedException will
+ * be thrown
+ * @throws InterruptedException If interrupted while waiting
+ */
+ public void blockUntilConnected() throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c4b1349..33e260d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.imps;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+
import org.apache.curator.CuratorConnectionLossException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
@@ -43,6 +44,7 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.util.Arrays;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
@@ -65,6 +67,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
+ private final AtomicReference<ConnectionState> connectionState;
private final AtomicReference<AuthInfo> authInfo = new AtomicReference<AuthInfo>();
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
@@ -148,6 +151,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
+ connectionState = new AtomicReference<ConnectionState>(null);
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -162,6 +166,21 @@ public class CuratorFrameworkImpl implements CuratorFramework
failedDeleteManager = new FailedDeleteManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
+
+ //Add callback handler to determine connection state transitions
+ getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ connectionState.set(newState);
+ synchronized(connectionState)
+ {
+ connectionState.notifyAll();
+ }
+ }
+ });
}
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
@@ -201,6 +220,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
threadFactory = parent.threadFactory;
backgroundOperations = parent.backgroundOperations;
connectionStateManager = parent.connectionStateManager;
+ connectionState = parent.connectionState;
defaultData = parent.defaultData;
failedDeleteManager = parent.failedDeleteManager;
compressionProvider = parent.compressionProvider;
@@ -869,4 +889,53 @@ public class CuratorFrameworkImpl implements CuratorFramework
}
);
}
+
+ public ConnectionState getCurrentConnectionState()
+ {
+ return connectionState.get();
+ }
+
+ @Override
+ public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+ {
+ //Check if we're already connected
+ ConnectionState currentConnectionState = connectionState.get();
+ if(currentConnectionState != null && currentConnectionState.isConnected())
+ {
+ return true;
+ }
+
+ long startTime = System.currentTimeMillis();
+ long maxWaitTimeMS = TimeUnit.MILLISECONDS.convert(maxWaitTime, units);
+
+ for(;;)
+ {
+ synchronized(connectionState)
+ {
+ currentConnectionState = connectionState.get();
+ if(currentConnectionState != null && currentConnectionState.isConnected())
+ {
+ return true;
+ }
+
+ long waitTime = 0;
+ if(maxWaitTime > 0)
+ {
+ waitTime = maxWaitTimeMS - (System.currentTimeMillis() - startTime);
+
+ //Timeout
+ if(waitTime <= 0)
+ {
+ return false;
+ }
+ }
+
+ connectionState.wait(waitTime);
+ }
+ }
+ }
+
+ public void blockUntilConnected() throws InterruptedException {
+ blockUntilConnected(0, TimeUnit.SECONDS);
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
new file mode 100644
index 0000000..996e5fc
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -0,0 +1,217 @@
+package org.apache.curator.framework.imps;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestBlockUntilConnected extends BaseClassForTests
+{
+ /**
+ * Test the case where we're already connected
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyConnected()
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ try
+ {
+ final CountDownLatch connectedLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if(newState.isConnected())
+ {
+ connectedLatch.countDown();
+ }
+ }
+ });
+
+ client.start();
+
+ Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
+ Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
+ }
+ catch(InterruptedException e)
+ {
+ Assert.fail("Unexpected interruption");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we are not currently connected and never have been
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyNeverConnected()
+ {
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ try
+ {
+ client.start();
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+ }
+ catch(InterruptedException e)
+ {
+ Assert.fail("Unexpected interruption");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we are not currently connected, but have been previously
+ */
+ @Test
+ public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
+ {
+ Timing timing = new Timing();
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ final CountDownLatch lostLatch = new CountDownLatch(1);
+ client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+ {
+
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState)
+ {
+ if(newState == ConnectionState.LOST)
+ {
+ lostLatch.countDown();
+ }
+ }
+ });
+
+ try
+ {
+ client.start();
+
+ //Block until we're connected
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
+
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
+ //Wait until we hit the lost state
+ Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+
+ server = new TestingServer(server.getPort(), server.getTempDirectory());
+
+ Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+ }
+ catch(Exception e)
+ {
+ Assert.fail("Unexpected exception " + e);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we are not currently connected and time out before a
+ * connection becomes available.
+ */
+ @Test
+ public void testBlockUntilConnectedConnectTimeout()
+ {
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
+ CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ try
+ {
+ client.start();
+ Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS),
+ "Connected");
+ }
+ catch(InterruptedException e)
+ {
+ Assert.fail("Unexpected interruption");
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ /**
+ * Test the case where we are not currently connected and the thread gets interrupted
+ * prior to a connection becoming available
+ */
+ @Test
+ public void testBlockUntilConnectedInterrupt()
+ {
+ //Kill the server
+ CloseableUtils.closeQuietly(server);
+
+ final CuratorFramework client = CuratorFrameworkFactory.builder().
+ connectString(server.getConnectString()).
+ retryPolicy(new RetryOneTime(1)).
+ build();
+
+ try
+ {
+ client.start();
+
+ final Thread threadToInterrupt = Thread.currentThread();
+
+ Timer timer = new Timer();
+ timer.schedule(new TimerTask() {
+
+ @Override
+ public void run() {
+ threadToInterrupt.interrupt();
+ }
+ }, 3000);
+
+ client.blockUntilConnected(5, TimeUnit.SECONDS);
+ Assert.fail("Expected interruption did not occur");
+ }
+ catch(InterruptedException e)
+ {
+ //This is expected
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index c1157c9..21e8cca 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -22,6 +22,7 @@ package org.apache.curator.framework.recipes.leader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
@@ -31,6 +32,7 @@ import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -38,6 +40,7 @@ import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
@@ -45,6 +48,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -144,6 +148,14 @@ public class LeaderLatch implements Closeable
this.id = Preconditions.checkNotNull(id, "id cannot be null");
this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
}
+
+ private CountDownLatch startLatch;
+
+ public LeaderLatch(CuratorFramework client, String latchPath,
+ CountDownLatch startLatch) {
+ this(client, latchPath);
+ this.startLatch = startLatch;
+ }
/**
* Add this instance to the leadership election and attempt to acquire leadership.
@@ -154,8 +166,30 @@ public class LeaderLatch implements Closeable
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
- client.getConnectionStateListenable().addListener(listener);
- reset();
+ //Block until connected
+ final ExecutorService executor = ThreadUtils.newSingleThreadExecutor("");
+ executor.submit(new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ try
+ {
+ client.blockUntilConnected();
+
+ client.getConnectionStateListenable().addListener(listener);
+ reset();
+ }
+ catch(Exception ex)
+ {
+ log.error("An error occurred checking resetting leadership.", ex);
+ } finally {
+ //Shutdown the executor
+ executor.shutdown();
+ }
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index d3b00bc..35d8809 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -22,10 +22,12 @@ package org.apache.curator.framework.recipes.leader;
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.test.TestingServer;
@@ -33,6 +35,7 @@ import org.apache.curator.test.Timing;
import org.apache.curator.utils.CloseableUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
@@ -42,6 +45,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class TestLeaderLatch extends BaseClassForTests
@@ -531,57 +535,45 @@ public class TestLeaderLatch extends BaseClassForTests
Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(
server.getConnectString(), timing.session(),
- timing.connection(), new RetryOneTime(1));
+ timing.connection(), new RetryNTimes(5, 1000));
client.start();
- final CountDownLatch sessionLostCount = new CountDownLatch(1);
-
- // Need to ensure that we've actually lost the connection completely
- // before starting. Otherwise it's possible that the server will start
- // during retries of the inital commands to create the latch zNodes
- client.getConnectionStateListenable().addListener(
- new ConnectionStateListener()
- {
- @Override
- public void stateChanged(CuratorFramework client,
- ConnectionState newState)
- {
- if (newState == ConnectionState.LOST)
- {
- sessionLostCount.countDown();
- }
- }
- });
-
final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
final CountDownLatch leaderCounter = new CountDownLatch(1);
+ final AtomicInteger leaderCount = new AtomicInteger(0);
+ final AtomicInteger notLeaderCount = new AtomicInteger(0);
leader.addListener(new LeaderLatchListener()
{
@Override
public void isLeader()
{
leaderCounter.countDown();
+ leaderCount.incrementAndGet();
}
@Override
public void notLeader()
{
+ notLeaderCount.incrementAndGet();
}
});
try
{
- leader.start();
-
- timing.awaitLatch(sessionLostCount);
+ leader.start();
+
+ //Wait for a while before starting the test server
+ Thread.sleep(5000);
// Start the new server
server = new TestingServer(server.getPort());
- timing.awaitLatch(leaderCounter);
-
+ Assert.assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader");
+
+ Assert.assertEquals(leaderCount.get(), 1, "Elected too many times");
+ Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");
}
catch (Exception e)
{
@@ -593,7 +585,6 @@ public class TestLeaderLatch extends BaseClassForTests
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
-
}
private enum Mode