You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2015/08/25 00:39:50 UTC

curator git commit: Updated LeaderLatch for error policy

Repository: curator
Updated Branches:
  refs/heads/CURATOR-248 94dff8a5a -> 5429a217b


Updated LeaderLatch for error policy


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/5429a217
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/5429a217
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/5429a217

Branch: refs/heads/CURATOR-248
Commit: 5429a217bb23901aaf2b187bb8c1d760d0a76bcc
Parents: 94dff8a
Author: randgalt <ra...@apache.org>
Authored: Mon Aug 24 17:39:41 2015 -0500
Committer: randgalt <ra...@apache.org>
Committed: Mon Aug 24 17:39:41 2015 -0500

----------------------------------------------------------------------
 .../framework/recipes/leader/LeaderLatch.java   |  39 +++--
 .../recipes/leader/TestLeaderLatch.java         | 162 +++++++++++++++++++
 2 files changed, 187 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/5429a217/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
index da9b8b2..aa4dd9f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java
@@ -160,20 +160,20 @@ public class LeaderLatch implements Closeable
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
         startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
+        {
+            @Override
+            public void run()
+            {
+                try
                 {
-                    @Override
-                    public void run()
-                    {
-                        try
-                        {
-                            internalStart();
-                        }
-                        finally
-                        {
-                            startTask.set(null);
-                        }
-                    }
-                }));
+                    internalStart();
+                }
+                finally
+                {
+                    startTask.set(null);
+                }
+            }
+        }));
     }
 
     /**
@@ -604,7 +604,10 @@ public class LeaderLatch implements Closeable
             {
                 try
                 {
-                    reset();
+                    if ( client.getErrorPolicy().isErrorState(ConnectionState.SUSPENDED) || !hasLeadership.get() )
+                    {
+                        reset();
+                    }
                 }
                 catch ( Exception e )
                 {
@@ -615,6 +618,14 @@ public class LeaderLatch implements Closeable
             }
 
             case SUSPENDED:
+            {
+                if ( client.getErrorPolicy().isErrorState(ConnectionState.SUSPENDED) )
+                {
+                    setLeadership(false);
+                }
+                break;
+            }
+
             case LOST:
             {
                 setLeadership(false);

http://git-wip-us.apache.org/repos/asf/curator/blob/5429a217/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
index 3742fb7..bd73e9d 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/leader/TestLeaderLatch.java
@@ -21,12 +21,15 @@ package org.apache.curator.framework.recipes.leader;
 
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.imps.TestCleanState;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.framework.state.SessionErrorPolicy;
+import org.apache.curator.framework.state.StandardErrorPolicy;
 import org.apache.curator.retry.RetryNTimes;
 import org.apache.curator.retry.RetryOneTime;
 import org.apache.curator.test.BaseClassForTests;
@@ -37,11 +40,13 @@ import org.testng.Assert;
 import org.testng.annotations.Test;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +58,163 @@ public class TestLeaderLatch extends BaseClassForTests
     private static final int MAX_LOOPS = 5;
 
     @Test
+    public void testSessionErrorPolicy() throws Exception
+    {
+        Timing timing = new Timing();
+        LeaderLatch latch = null;
+        CuratorFramework client = null;
+        for ( int i = 0; i < 2; ++i )
+        {
+            boolean isSessionIteration = (i == 0);
+            try
+            {
+                client = CuratorFrameworkFactory.builder()
+                    .connectString(server.getConnectString())
+                    .connectionTimeoutMs(10000)
+                    .sessionTimeoutMs(60000)
+                    .retryPolicy(new RetryOneTime(1))
+                    .errorPolicy(isSessionIteration ? new SessionErrorPolicy() : new StandardErrorPolicy())
+                    .build();
+                final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+                ConnectionStateListener stateListener = new ConnectionStateListener()
+                {
+                    @Override
+                    public void stateChanged(CuratorFramework client, ConnectionState newState)
+                    {
+                        states.add(newState.name());
+                    }
+                };
+                client.getConnectionStateListenable().addListener(stateListener);
+                client.start();
+
+                latch = new LeaderLatch(client, "/test");
+                LeaderLatchListener listener = new LeaderLatchListener()
+                {
+                    @Override
+                    public void isLeader()
+                    {
+                        states.add("true");
+                    }
+
+                    @Override
+                    public void notLeader()
+                    {
+                        states.add("false");
+                    }
+                };
+                latch.addListener(listener);
+                latch.start();
+                Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+                Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+                server.stop();
+                if ( isSessionIteration )
+                {
+                    Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+                    server.restart();
+                    Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
+                    Assert.assertNull(states.poll(timing.milliseconds(), TimeUnit.MILLISECONDS));
+                }
+                else
+                {
+                    String s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+                    Assert.assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s));
+                    s = states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS);
+                    Assert.assertTrue("false".equals(s) || ConnectionState.SUSPENDED.name().equals(s));
+                    server.restart();
+                    Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.RECONNECTED.name());
+                    Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+                }
+            }
+            finally
+            {
+                CloseableUtils.closeQuietly(latch);
+                CloseableUtils.closeQuietly(client);
+            }
+        }
+    }
+
+    @Test
+    public void testErrorPolicies() throws Exception
+    {
+        Timing timing = new Timing();
+        LeaderLatch latch = null;
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+            .connectString(server.getConnectString())
+            .connectionTimeoutMs(1000)
+            .sessionTimeoutMs(timing.session())
+            .retryPolicy(new RetryOneTime(1))
+            .errorPolicy(new StandardErrorPolicy())
+            .build();
+        try
+        {
+            final BlockingQueue<String> states = Queues.newLinkedBlockingQueue();
+            ConnectionStateListener stateListener = new ConnectionStateListener()
+            {
+                @Override
+                public void stateChanged(CuratorFramework client, ConnectionState newState)
+                {
+                    states.add(newState.name());
+                }
+            };
+            client.getConnectionStateListenable().addListener(stateListener);
+            client.start();
+            latch = new LeaderLatch(client, "/test");
+            LeaderLatchListener listener = new LeaderLatchListener()
+            {
+                @Override
+                public void isLeader()
+                {
+                    states.add("true");
+                }
+
+                @Override
+                public void notLeader()
+                {
+                    states.add("false");
+                }
+            };
+            latch.addListener(listener);
+            latch.start();
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+            server.close();
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false");
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
+            latch.close();
+            client.close();
+
+            timing.sleepABit();
+            states.clear();
+
+            server = new TestingServer();
+            client = CuratorFrameworkFactory.builder()
+                .connectString(server.getConnectString())
+                .connectionTimeoutMs(1000)
+                .sessionTimeoutMs(timing.session())
+                .retryPolicy(new RetryOneTime(1))
+                .errorPolicy(new SessionErrorPolicy())
+                .build();
+            client.getConnectionStateListenable().addListener(stateListener);
+            client.start();
+            latch = new LeaderLatch(client, "/test");
+            latch.addListener(listener);
+            latch.start();
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.CONNECTED.name());
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "true");
+            server.close();
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.SUSPENDED.name());
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), ConnectionState.LOST.name());
+            Assert.assertEquals(states.poll(timing.forWaiting().milliseconds(), TimeUnit.MILLISECONDS), "false");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(latch);
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testProperCloseWithoutConnectionEstablished() throws Exception
     {
         server.stop();