You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2015/02/10 01:43:27 UTC

[1/3] curator git commit: Adding a new constructor to Reaper so that it can optionally take a fully constructed leader latch that is owned by another class rather than create its own leader latch

Repository: curator
Updated Branches:
  refs/heads/CURATOR-187 [created] 49eb02a04


Adding a new constructor to Reaper so that it can optionally take a fully constructed leader latch that is owned by another class rather than create its own leader latch


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/520ae54a
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/520ae54a
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/520ae54a

Branch: refs/heads/CURATOR-187
Commit: 520ae54ac4a49292201417fa6b1104cf579704d3
Parents: febfcec
Author: David Kesler <dk...@yodle.com>
Authored: Mon Feb 9 13:19:20 2015 -0500
Committer: David Kesler <dk...@yodle.com>
Committed: Mon Feb 9 13:19:20 2015 -0500

----------------------------------------------------------------------
 .../curator/framework/recipes/locks/Reaper.java | 62 +++++++++++---
 .../framework/recipes/locks/TestReaper.java     | 90 +++++++++++++++++++-
 2 files changed, 137 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/520ae54a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index 8802372..660e3d3 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -52,6 +52,7 @@ public class Reaper implements Closeable
     private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
     private final LeaderLatch leaderLatch;
     private final AtomicBoolean reapingIsActive = new AtomicBoolean(true);
+    private final boolean ownsLeaderLatch;
 
     private enum State
     {
@@ -111,7 +112,7 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client)
     {
-        this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, null);
+        this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, (String) null);
     }
 
     /**
@@ -122,7 +123,7 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client, int reapingThresholdMs)
     {
-        this(client, newExecutorService(), reapingThresholdMs, null);
+        this(client, newExecutorService(), reapingThresholdMs, (String) null);
     }
 
     /**
@@ -132,7 +133,7 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs)
     {
-        this(client, executor, reapingThresholdMs, null);
+        this(client, executor, reapingThresholdMs, (String) null);
     }
 
     /**
@@ -143,18 +144,41 @@ public class Reaper implements Closeable
      */
     public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
     {
+        this(client, executor, reapingThresholdMs, makeLeaderLatchIfPathNotNull(client, leaderPath), true);
+    }
+
+    /**
+     * @param client             client
+     * @param executor           thread pool
+     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+     * @param leaderLatch        a pre-created leader latch to ensure only 1 reaper is active in the cluster
+     */
+    public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch)
+    {
+        this(client, executor, reapingThresholdMs, leaderLatch, false);
+    }
+
+    /**
+     * @param client             client
+     * @param executor           thread pool
+     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+     * @param leaderLatch        a pre-created leader latch to ensure only 1 reaper is active in the cluster
+     * @param ownsLeaderLatch    indicates whether or not the reaper owns the leader latch (if it exists) and thus should start/stop it
+     * */
+    private Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, LeaderLatch leaderLatch, boolean ownsLeaderLatch)
+    {
         this.client = client;
         this.executor = new CloseableScheduledExecutorService(executor);
         this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
-
-        LeaderLatch localLeaderLatch = null;
-        if ( leaderPath != null )
+        this.leaderLatch = leaderLatch;
+        if (leaderLatch != null)
         {
-            localLeaderLatch = makeLeaderLatch(client, leaderPath);
+            addListenerToLeaderLatch(leaderLatch);
         }
-        leaderLatch = localLeaderLatch;
+        this.ownsLeaderLatch = ownsLeaderLatch;
     }
 
+
     /**
      * Add a path (using Mode.REAP_INDEFINITELY) to be checked by the reaper. The path will be checked periodically
      * until the reaper is closed.
@@ -200,7 +224,7 @@ public class Reaper implements Closeable
     {
         Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
 
-        if ( leaderLatch != null )
+        if ( leaderLatch != null && ownsLeaderLatch)
         {
             leaderLatch.start();
         }
@@ -212,7 +236,7 @@ public class Reaper implements Closeable
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
             executor.close();
-            if ( leaderLatch != null )
+            if ( leaderLatch != null && ownsLeaderLatch )
             {
                 leaderLatch.close();
             }
@@ -310,11 +334,10 @@ public class Reaper implements Closeable
         return ThreadUtils.newSingleThreadScheduledExecutor("Reaper");
     }
 
-    private LeaderLatch makeLeaderLatch(CuratorFramework client, String leaderPath)
+    private void addListenerToLeaderLatch(LeaderLatch leaderLatch)
     {
         reapingIsActive.set(false);
 
-        LeaderLatch localLeaderLatch = new LeaderLatch(client, leaderPath);
         LeaderLatchListener listener = new LeaderLatchListener()
         {
             @Override
@@ -333,7 +356,18 @@ public class Reaper implements Closeable
                 reapingIsActive.set(false);
             }
         };
-        localLeaderLatch.addListener(listener);
-        return localLeaderLatch;
+        leaderLatch.addListener(listener);
+    }
+
+    private static LeaderLatch makeLeaderLatchIfPathNotNull(CuratorFramework client, String leaderPath)
+    {
+        if (leaderPath == null)
+        {
+            return null;
+        }
+        else
+        {
+            return new LeaderLatch(client, leaderPath);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/520ae54a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
index 83ec960..c47808f 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
@@ -21,6 +21,7 @@ package org.apache.curator.framework.recipes.locks;
 
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.framework.recipes.leader.LeaderSelector;
 import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
 import org.apache.curator.framework.state.ConnectionState;
@@ -48,7 +49,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class TestReaper extends BaseClassForTests
 {
     @Test
-    public void testUsingLeader() throws Exception
+    public void testUsingLeaderPath() throws Exception
     {
         final Timing timing = new Timing();
         CuratorFramework client = makeClient(timing, null);
@@ -118,6 +119,93 @@ public class TestReaper extends BaseClassForTests
     }
 
     @Test
+    public void testUsingLeaderLatch() throws Exception
+    {
+        final Timing timing = new Timing();
+        CuratorFramework client = makeClient(timing, null);
+        Reaper reaper1 = null;
+        Reaper reaper2 = null;
+        LeaderLatch leaderLatch1 = null;
+        LeaderLatch leaderLatch2 = null;
+        try
+        {
+            final AtomicInteger reaper1Count = new AtomicInteger();
+            leaderLatch1 = new LeaderLatch(client, "/reaper/leader");
+            reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch1)
+            {
+                @Override
+                protected void reap(PathHolder holder)
+                {
+                    reaper1Count.incrementAndGet();
+                    super.reap(holder);
+                }
+            };
+
+            final AtomicInteger reaper2Count = new AtomicInteger();
+            leaderLatch2 = new LeaderLatch(client, "/reaper/leader");
+            reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, leaderLatch2)
+            {
+                @Override
+                protected void reap(PathHolder holder)
+                {
+                    reaper2Count.incrementAndGet();
+                    super.reap(holder);
+                }
+            };
+
+            client.start();
+            client.create().creatingParentsIfNeeded().forPath("/one/two/three");
+
+            leaderLatch1.start();
+            leaderLatch2.start();
+
+            reaper1.start();
+            reaper2.start();
+
+            reaper1.addPath("/one/two/three");
+            reaper2.addPath("/one/two/three");
+
+            timing.sleepABit();
+
+            Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0));
+            Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0));
+
+            Reaper activeReaper;
+            LeaderLatch activeLeaderLeatch;
+            AtomicInteger inActiveReaperCount;
+            if ( reaper1Count.get() > 0 )
+            {
+                activeReaper = reaper1;
+                activeLeaderLeatch = leaderLatch1;
+                inActiveReaperCount = reaper2Count;
+            }
+            else
+            {
+                activeReaper = reaper2;
+                activeLeaderLeatch = leaderLatch2;
+                inActiveReaperCount = reaper1Count;
+            }
+            Assert.assertEquals(inActiveReaperCount.get(), 0);
+            activeReaper.close();
+            activeLeaderLeatch.close();
+            timing.sleepABit();
+            Assert.assertTrue(inActiveReaperCount.get() > 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(reaper1);
+            CloseableUtils.closeQuietly(reaper2);
+            if (leaderLatch1 != null && LeaderLatch.State.STARTED == leaderLatch1.getState()) {
+                CloseableUtils.closeQuietly(leaderLatch1);
+            }
+            if (leaderLatch2 != null && LeaderLatch.State.STARTED == leaderLatch2.getState()) {
+                CloseableUtils.closeQuietly(leaderLatch2);
+            }
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void testUsingManualLeader() throws Exception
     {
         final Timing timing = new Timing();


[2/3] curator git commit: Reaper now sets its initial reapingIsActive state based on the state of the leader latch in case the leader latch was already started before being passed into the reaper's constructor

Posted by ca...@apache.org.
Reaper now sets its initial reapingIsActive state based on the state of the leader latch in case the leader latch was already started before being passed into the reaper's constructor


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/9da79604
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/9da79604
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/9da79604

Branch: refs/heads/CURATOR-187
Commit: 9da79604bfe7e775d1d42a25f0ac7579b8ee0464
Parents: 520ae54
Author: David Kesler <dk...@yodle.com>
Authored: Mon Feb 9 13:25:51 2015 -0500
Committer: David Kesler <dk...@yodle.com>
Committed: Mon Feb 9 13:25:51 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/curator/framework/recipes/locks/Reaper.java   | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/9da79604/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index 660e3d3..efd363f 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -336,7 +336,6 @@ public class Reaper implements Closeable
 
     private void addListenerToLeaderLatch(LeaderLatch leaderLatch)
     {
-        reapingIsActive.set(false);
 
         LeaderLatchListener listener = new LeaderLatchListener()
         {
@@ -357,6 +356,8 @@ public class Reaper implements Closeable
             }
         };
         leaderLatch.addListener(listener);
+
+        reapingIsActive.set(leaderLatch.hasLeadership());
     }
 
     private static LeaderLatch makeLeaderLatchIfPathNotNull(CuratorFramework client, String leaderPath)


[3/3] curator git commit: ChildReaper now creates a leaderLatch itself if a leader path is provided and does no work (such as passing paths to its Reaper) if it is not currently the leader.

Posted by ca...@apache.org.
ChildReaper now creates a leaderLatch itself if a leader path is provided and does no work (such as passing paths to its Reaper) if it is not currently the leader.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/49eb02a0
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/49eb02a0
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/49eb02a0

Branch: refs/heads/CURATOR-187
Commit: 49eb02a04f377a3b9e2da3b3904311ddddf1aa9d
Parents: 9da7960
Author: David Kesler <dk...@yodle.com>
Authored: Mon Feb 9 13:50:39 2015 -0500
Committer: David Kesler <dk...@yodle.com>
Committed: Mon Feb 9 13:50:39 2015 -0500

----------------------------------------------------------------------
 .../framework/recipes/locks/ChildReaper.java    | 56 ++++++++++++++------
 .../recipes/locks/TestChildReaper.java          | 47 ++++++++++++++++
 2 files changed, 88 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/49eb02a0/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 6e0a7e4..56c56ab 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -20,6 +20,8 @@ package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.CloseableScheduledExecutorService;
@@ -52,6 +54,7 @@ public class ChildReaper implements Closeable
     private final Reaper.Mode mode;
     private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
+    private final LeaderLatch leaderLatch;
 
     private volatile Future<?> task;
 
@@ -109,7 +112,15 @@ public class ChildReaper implements Closeable
         this.mode = mode;
         this.executor = new CloseableScheduledExecutorService(executor);
         this.reapingThresholdMs = reapingThresholdMs;
-        this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath);
+        if (leaderPath != null)
+        {
+            leaderLatch = new LeaderLatch(client, leaderPath);
+        }
+        else
+        {
+            leaderLatch = null;
+        }
+        this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch);
         addPath(path);
     }
 
@@ -136,7 +147,10 @@ public class ChildReaper implements Closeable
             reapingThresholdMs,
             TimeUnit.MILLISECONDS
         );
-
+        if (leaderLatch != null)
+        {
+            leaderLatch.start();
+        }
         reaper.start();
     }
 
@@ -146,6 +160,10 @@ public class ChildReaper implements Closeable
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
             CloseableUtils.closeQuietly(reaper);
+            if (leaderLatch != null)
+            {
+                CloseableUtils.closeQuietly(leaderLatch);
+            }
             task.cancel(true);
         }
     }
@@ -173,32 +191,40 @@ public class ChildReaper implements Closeable
         return paths.remove(PathUtils.validatePath(path));
     }
 
-    private static ScheduledExecutorService newExecutorService()
+    public static ScheduledExecutorService newExecutorService()
     {
         return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
     }
 
     private void doWork()
     {
-        for ( String path : paths )
+        if (shouldDoWork())
         {
-            try
+            for ( String path : paths )
             {
-                List<String> children = client.getChildren().forPath(path);
-                for ( String name : children )
+                try
                 {
-                    String thisPath = ZKPaths.makePath(path, name);
-                    Stat stat = client.checkExists().forPath(thisPath);
-                    if ( (stat != null) && (stat.getNumChildren() == 0) )
+                    List<String> children = client.getChildren().forPath(path);
+                    for ( String name : children )
                     {
-                        reaper.addPath(thisPath, mode);
+                        String thisPath = ZKPaths.makePath(path, name);
+                        Stat stat = client.checkExists().forPath(thisPath);
+                        if ( (stat != null) && (stat.getNumChildren() == 0) )
+                        {
+                            reaper.addPath(thisPath, mode);
+                        }
                     }
                 }
-            }
-            catch ( Exception e )
-            {
-                log.error("Could not get children for path: " + path, e);
+                catch ( Exception e )
+                {
+                    log.error("Could not get children for path: " + path, e);
+                }
             }
         }
     }
+
+    private boolean shouldDoWork()
+    {
+        return this.leaderLatch == null || this.leaderLatch.hasLeadership();
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/49eb02a0/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
index ad6ba6c..d81bb3a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.locks;
 
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -100,6 +101,52 @@ public class TestChildReaper extends BaseClassForTests
     }
 
     @Test
+    public void     testLeaderElection() throws Exception
+    {
+        Timing                  timing = new Timing();
+        ChildReaper             reaper = null;
+        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        LeaderLatch otherLeader = null;
+        try
+        {
+            client.start();
+
+            for ( int i = 0; i < 10; ++i )
+            {
+                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
+            }
+
+            otherLeader = new LeaderLatch(client, "/test-leader");
+            otherLeader.start();
+
+            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, "/test-leader");
+            reaper.start();
+
+            timing.forWaiting().sleepABit();
+
+            //Should not have reaped anything at this point since otherLeader is still leader
+            Stat    stat = client.checkExists().forPath("/test");
+            Assert.assertEquals(stat.getNumChildren(), 10);
+
+            CloseableUtils.closeQuietly(otherLeader);
+
+            timing.forWaiting().sleepABit();
+
+            stat = client.checkExists().forPath("/test");
+            Assert.assertEquals(stat.getNumChildren(), 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(reaper);
+            if (otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED)
+            {
+                CloseableUtils.closeQuietly(otherLeader);
+            }
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void     testMultiPath() throws Exception
     {
         Timing                  timing = new Timing();