You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/06/18 01:03:12 UTC

[08/18] git commit: CURATOR-110 - Modified the CuratorFramework to expose a 'blockUntilConnected' method. This allows a race condition between the start() method and the ConnectionStateListener in the LeaderLatch recipe to be avoided. Updated the leader latch start() method accordingly.

CURATOR-110 - Modified the CuratorFramework to expose a 'blockUntilConnected' method. This allows a race condition between the start() method and the ConnectionStateListener in the LeaderLatch recipe to be avoided. Updated the leader latch start() method to block until a connection is available (in a background thread) before it attempts to set up its state. This means that it will work correctly if started before a connection to ZK is available.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/63d0401c
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/63d0401c
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/63d0401c

Branch: refs/heads/master
Commit: 63d0401c05f6cc122468f55c79c9e1bd4ddb3eb5
Parents: 2c376b9
Author: Cameron McKenzie <mc...@gmail.com>
Authored: Thu Jun 12 14:09:45 2014 +1000
Committer: Cameron McKenzie <mc...@gmail.com>
Committed: Thu Jun 12 14:09:45 2014 +1000

----------------------------------------------------------------------
 .../curator/framework/CuratorFramework.java     |  27 +++
 .../framework/imps/CuratorFrameworkImpl.java    |  69 ++++++
 .../framework/imps/TestBlockUntilConnected.java | 217 +++++++++++++++++++
 .../framework/recipes/leader/LeaderLatch.java   |  38 +++-
 .../recipes/leader/TestLeaderLatch.java         |  43 ++--
 5 files changed, 366 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
index 2d6e182..1df3fa5 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java
@@ -24,10 +24,13 @@ import org.apache.curator.framework.api.*;
 import org.apache.curator.framework.api.transaction.CuratorTransaction;
 import org.apache.curator.framework.imps.CuratorFrameworkState;
 import org.apache.curator.framework.listen.Listenable;
+import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
 import org.apache.curator.utils.EnsurePath;
 import org.apache.zookeeper.Watcher;
+
 import java.io.Closeable;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Zookeeper framework-style client
@@ -210,4 +213,28 @@ public interface CuratorFramework extends Closeable
      * @param watcher the watcher
      */
     public void clearWatcherReferences(Watcher watcher);
+    
+    /**
+     * Get the current connection state. The state is {@code null} until the
+     * first connection related event has been received.
+     * @return The current connection state, or null if it is unknown
+     */
+    public ConnectionState getCurrentConnectionState();
+
+    /**
+     * Block until a connection to ZooKeeper is available or the maxWaitTime has been exceeded
+     * @param maxWaitTime The maximum wait time. Specify a value {@code <= 0} to wait indefinitely
+     * @param units The time units for the maximum wait time.
+     * @return True if connection has been established, false otherwise.
+     * @throws InterruptedException If interrupted while waiting
+     */
+    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException;
+
+    /**
+     * Block until a connection to ZooKeeper is available. This method will not return until a
+     * connection is available or it is interrupted, in which case an InterruptedException will
+     * be thrown.
+     * @throws InterruptedException If interrupted while waiting
+     */
+    public void blockUntilConnected() throws InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
index c4b1349..33e260d 100644
--- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
+++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java
@@ -20,6 +20,7 @@ package org.apache.curator.framework.imps;
 
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+
 import org.apache.curator.CuratorConnectionLossException;
 import org.apache.curator.CuratorZookeeperClient;
 import org.apache.curator.RetryLoop;
@@ -43,6 +44,7 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooKeeper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.util.Arrays;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
@@ -65,6 +67,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
     private final BlockingQueue<OperationAndData<?>>                    backgroundOperations;
     private final NamespaceImpl                                         namespace;
     private final ConnectionStateManager                                connectionStateManager;
+    private final AtomicReference<ConnectionState>                      connectionState;
     private final AtomicReference<AuthInfo>                             authInfo = new AtomicReference<AuthInfo>();
     private final byte[]                                                defaultData;
     private final FailedDeleteManager                                   failedDeleteManager;
@@ -148,6 +151,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         namespace = new NamespaceImpl(this, builder.getNamespace());
         threadFactory = getThreadFactory(builder);
         connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
+        connectionState = new AtomicReference<ConnectionState>();
         compressionProvider = builder.getCompressionProvider();
         aclProvider = builder.getAclProvider();
         state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
@@ -162,6 +166,21 @@ public class CuratorFrameworkImpl implements CuratorFramework
 
         failedDeleteManager = new FailedDeleteManager(this);
         namespaceFacadeCache = new NamespaceFacadeCache(this);
+        
+        // Record each connection state change and wake any threads blocked in
+        // blockUntilConnected(), which wait on this same AtomicReference's monitor.
+        getConnectionStateListenable().addListener(new ConnectionStateListener()
+        {
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                synchronized(connectionState)
+                {
+                    connectionState.set(newState);
+                    connectionState.notifyAll();
+                }
+            }
+        });
     }
 
     private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
@@ -201,6 +220,7 @@ public class CuratorFrameworkImpl implements CuratorFramework
         threadFactory = parent.threadFactory;
         backgroundOperations = parent.backgroundOperations;
         connectionStateManager = parent.connectionStateManager;
+        connectionState = parent.connectionState;    // share the parent's reference, and with it the monitor blockUntilConnected() waits on
         defaultData = parent.defaultData;
         failedDeleteManager = parent.failedDeleteManager;
         compressionProvider = parent.compressionProvider;
@@ -869,4 +889,59 @@ public class CuratorFrameworkImpl implements CuratorFramework
             }
         );
     }
+
+    @Override
+    public ConnectionState getCurrentConnectionState()
+    {
+        return connectionState.get();
+    }
+
+    @Override
+    public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
+    {
+        // Fast path: already connected, so there is no need to take the monitor
+        ConnectionState currentConnectionState = connectionState.get();
+        if ( currentConnectionState != null && currentConnectionState.isConnected() )
+        {
+            return true;
+        }
+
+        // Use nanoTime() for the elapsed-time measurement: unlike currentTimeMillis()
+        // it is monotonic, so wall-clock adjustments cannot shorten or stretch the wait
+        long startNanos = System.nanoTime();
+        long maxWaitTimeMs = TimeUnit.MILLISECONDS.convert(maxWaitTime, units);
+
+        synchronized(connectionState)
+        {
+            for(;;)
+            {
+                currentConnectionState = connectionState.get();
+                if ( currentConnectionState != null && currentConnectionState.isConnected() )
+                {
+                    return true;
+                }
+
+                // wait(0) means "wait indefinitely", which is what maxWaitTime <= 0 asks for
+                long waitTimeMs = 0;
+                if ( maxWaitTime > 0 )
+                {
+                    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
+                    waitTimeMs = maxWaitTimeMs - elapsedMs;
+                    if ( waitTimeMs <= 0 )
+                    {
+                        return false;   // timed out
+                    }
+                }
+
+                // Woken by the connection state listener (or spuriously); the loop re-checks
+                connectionState.wait(waitTimeMs);
+            }
+        }
+    }
+
+    @Override
+    public void blockUntilConnected() throws InterruptedException
+    {
+        blockUntilConnected(0, TimeUnit.SECONDS);
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
----------------------------------------------------------------------
diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
new file mode 100644
index 0000000..996e5fc
--- /dev/null
+++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestBlockUntilConnected.java
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.framework.imps;
+
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.test.BaseClassForTests;
+import org.apache.curator.test.TestingServer;
+import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class TestBlockUntilConnected extends BaseClassForTests
+{
+    /**
+     * Test the case where we're already connected
+     */
+    @Test
+    public void testBlockUntilConnectedCurrentlyConnected()
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+
+        try
+        {
+            final CountDownLatch connectedLatch = new CountDownLatch(1);
+            client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    if ( newState.isConnected() )
+                    {
+                        connectedLatch.countDown();
+                    }
+                }
+            });
+
+            client.start();
+
+            Assert.assertTrue(timing.awaitLatch(connectedLatch), "Timed out awaiting latch");
+            Assert.assertTrue(client.blockUntilConnected(1, TimeUnit.SECONDS), "Not connected");
+        }
+        catch ( InterruptedException e )
+        {
+            Assert.fail("Unexpected interruption");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Test the case where we are not currently connected and never have been
+     */
+    @Test
+    public void testBlockUntilConnectedCurrentlyNeverConnected()
+    {
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+
+        try
+        {
+            client.start();
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+        }
+        catch ( InterruptedException e )
+        {
+            Assert.fail("Unexpected interruption");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Test the case where we are not currently connected, but have been previously
+     */
+    @Test
+    public void testBlockUntilConnectedCurrentlyAwaitingReconnect()
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+
+        final CountDownLatch lostLatch = new CountDownLatch(1);
+        client.getConnectionStateListenable().addListener(new ConnectionStateListener()
+        {
+            @Override
+            public void stateChanged(CuratorFramework client, ConnectionState newState)
+            {
+                if ( newState == ConnectionState.LOST )
+                {
+                    lostLatch.countDown();
+                }
+            }
+        });
+
+        try
+        {
+            client.start();
+
+            //Block until we're connected
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Failed to connect");
+
+            //Kill the server
+            CloseableUtils.closeQuietly(server);
+
+            //Wait until we hit the lost state
+            Assert.assertTrue(timing.awaitLatch(lostLatch), "Failed to reach LOST state");
+
+            server = new TestingServer(server.getPort(), server.getTempDirectory());
+
+            Assert.assertTrue(client.blockUntilConnected(5, TimeUnit.SECONDS), "Not connected");
+        }
+        catch ( Exception e )
+        {
+            Assert.fail("Unexpected exception " + e);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Test the case where we are not currently connected and time out before a
+     * connection becomes available.
+     */
+    @Test
+    public void testBlockUntilConnectedConnectTimeout()
+    {
+        //Kill the server
+        CloseableUtils.closeQuietly(server);
+
+        CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+
+        try
+        {
+            client.start();
+            Assert.assertFalse(client.blockUntilConnected(5, TimeUnit.SECONDS), "Connected");
+        }
+        catch ( InterruptedException e )
+        {
+            Assert.fail("Unexpected interruption");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    /**
+     * Test the case where we are not currently connected and the thread gets interrupted
+     * prior to a connection becoming available
+     */
+    @Test
+    public void testBlockUntilConnectedInterrupt()
+    {
+        //Kill the server
+        CloseableUtils.closeQuietly(server);
+
+        final CuratorFramework client = CuratorFrameworkFactory.builder().
+                connectString(server.getConnectString()).
+                retryPolicy(new RetryOneTime(1)).
+                build();
+
+        // Daemon timer so it can never keep the JVM alive; it is also cancelled in finally
+        Timer timer = new Timer(true);
+        try
+        {
+            client.start();
+
+            final Thread threadToInterrupt = Thread.currentThread();
+            timer.schedule(new TimerTask()
+            {
+                @Override
+                public void run()
+                {
+                    threadToInterrupt.interrupt();
+                }
+            }, 3000);
+
+            client.blockUntilConnected(5, TimeUnit.SECONDS);
+            Assert.fail("Expected interruption did not occur");
+        }
+        catch ( InterruptedException e )
+        {
+            //This is expected
+        }
+        finally
+        {
+            timer.cancel();
+            // Clear any interrupt delivered outside the wait so it cannot leak
+            // into client shutdown or into later tests on this thread
+            Thread.interrupted();
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index c1157c9..21e8cca 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -22,6 +22,7 @@ package org.apache.curator.framework.recipes.leader;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
@@ -31,6 +32,7 @@ import org.apache.curator.framework.recipes.locks.LockInternalsSorter;
 import org.apache.curator.framework.recipes.locks.StandardLockInternalsDriver;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -38,6 +40,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
@@ -45,6 +48,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -144,6 +148,17 @@ public class LeaderLatch implements Closeable
         this.id = Preconditions.checkNotNull(id, "id cannot be null");
         this.closeMode = Preconditions.checkNotNull(closeMode, "closeMode cannot be null");
     }
+
+    // NOTE(review): startLatch is assigned by the constructor below but never read in
+    // this change. Presumably it should be counted down once start() has finished its
+    // background setup (confirm); otherwise remove it together with that constructor.
+    private CountDownLatch startLatch;
+
+    public LeaderLatch(CuratorFramework client, String latchPath, CountDownLatch startLatch)
+    {
+        this(client, latchPath);
+        this.startLatch = startLatch;
+    }

     /**
      * Add this instance to the leadership election and attempt to acquire leadership.
@@ -154,8 +169,46 @@ public class LeaderLatch implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

-        client.getConnectionStateListenable().addListener(listener);
-        reset();
+        // The client may not be connected yet, so wait for a connection on a background
+        // thread (start() itself returns immediately), then register the connection state
+        // listener and join the election.
+        final ExecutorService executor = ThreadUtils.newSingleThreadExecutor("LeaderLatch");
+        executor.submit(new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
+                {
+                    // Wait in bounded steps so this thread exits if the latch stops being
+                    // STARTED (e.g. it is closed) before a connection ever becomes available.
+                    while ( !client.blockUntilConnected(1, TimeUnit.SECONDS) )
+                    {
+                        if ( state.get() != State.STARTED )
+                        {
+                            return;
+                        }
+                    }
+
+                    // Re-check: the latch may have been closed while we were waiting.
+                    if ( state.get() == State.STARTED )
+                    {
+                        client.getConnectionStateListenable().addListener(listener);
+                        reset();
+                    }
+                }
+                catch ( InterruptedException e )
+                {
+                    Thread.currentThread().interrupt();
+                }
+                catch ( Exception e )
+                {
+                    log.error("An error occurred resetting leadership.", e);
+                }
+            }
+        });
+        // shutdown() still runs the task submitted above; it only releases the thread afterwards.
+        executor.shutdown();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/curator/blob/63d0401c/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index d3b00bc..35d8809 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -22,10 +22,12 @@ package org.apache.curator.framework.recipes.leader;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.test.TestingServer;
@@ -33,6 +35,7 @@ import org.apache.curator.test.Timing;
 import org.apache.curator.utils.CloseableUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
+
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Callable;
@@ -42,6 +45,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class TestLeaderLatch extends BaseClassForTests
@@ -531,57 +535,45 @@ public class TestLeaderLatch extends BaseClassForTests
         Timing timing = new Timing();
         CuratorFramework client = CuratorFrameworkFactory.newClient(
                 server.getConnectString(), timing.session(),
-                timing.connection(), new RetryOneTime(1));
+                timing.connection(), new RetryNTimes(5, 1000));
 
         client.start();
 
-        final CountDownLatch sessionLostCount = new CountDownLatch(1);
-
-        // Need to ensure that we've actually lost the connection completely
-        // before starting. Otherwise it's possible that the server will start
-        // during retries of the inital commands to create the latch zNodes
-        client.getConnectionStateListenable().addListener(
-                new ConnectionStateListener()
-                {
-                    @Override
-                    public void stateChanged(CuratorFramework client,
-                            ConnectionState newState)
-                    {
-                        if (newState == ConnectionState.LOST)
-                        {
-                            sessionLostCount.countDown();
-                        }
-                    }
-                });
-
         final LeaderLatch leader = new LeaderLatch(client, PATH_NAME);
         final CountDownLatch leaderCounter = new CountDownLatch(1);
+        final AtomicInteger leaderCount = new AtomicInteger(0);
+        final AtomicInteger notLeaderCount = new AtomicInteger(0);
         leader.addListener(new LeaderLatchListener()
         {
             @Override
             public void isLeader()
             {
+                leaderCount.incrementAndGet();    // before countDown() so the count is visible once the latch opens
                 leaderCounter.countDown();
             }

             @Override
             public void notLeader()
             {
+                notLeaderCount.incrementAndGet();
             }

         });

         try
         {
             leader.start();
-
-            timing.awaitLatch(sessionLostCount);
+
+            // Let leader.start() begin waiting for a connection before the server exists
+            Thread.sleep(5000);

             // Start the new server
             server = new TestingServer(server.getPort());

-            timing.awaitLatch(leaderCounter);
-
+            Assert.assertTrue(timing.awaitLatch(leaderCounter), "Not elected leader");
+
+            Assert.assertEquals(leaderCount.get(), 1, "Elected too many times");
+            Assert.assertEquals(notLeaderCount.get(), 0, "Unelected too many times");
         }
         catch (Exception e)
         {
@@ -593,7 +585,6 @@ public class TestLeaderLatch extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
             CloseableUtils.closeQuietly(server);
         }
-
     }    
 
     private enum Mode