You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2015/04/22 01:07:28 UTC

[31/50] curator git commit: ChildReaper now creates a leaderLatch itself if a leader path is provided and does no work (such as passing paths to its Reaper) if it is not currently the leader.

When a leader path is provided, ChildReaper now creates its own LeaderLatch and does no work (such as passing paths to its Reaper) unless it currently holds leadership; without a leader path it always does its work.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/49eb02a0
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/49eb02a0
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/49eb02a0

Branch: refs/heads/CURATOR-154
Commit: 49eb02a04f377a3b9e2da3b3904311ddddf1aa9d
Parents: 9da7960
Author: David Kesler <dk...@yodle.com>
Authored: Mon Feb 9 13:50:39 2015 -0500
Committer: David Kesler <dk...@yodle.com>
Committed: Mon Feb 9 13:50:39 2015 -0500

----------------------------------------------------------------------
 .../framework/recipes/locks/ChildReaper.java    | 56 ++++++++++++++------
 .../recipes/locks/TestChildReaper.java          | 47 ++++++++++++++++
 2 files changed, 88 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/49eb02a0/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 6e0a7e4..56c56ab 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -20,6 +20,8 @@ package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
+
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.utils.CloseableScheduledExecutorService;
@@ -52,6 +54,7 @@ public class ChildReaper implements Closeable
     private final Reaper.Mode mode;
     private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
+    private final LeaderLatch leaderLatch;
 
     private volatile Future<?> task;
 
@@ -109,7 +112,15 @@ public class ChildReaper implements Closeable
         this.mode = mode;
         this.executor = new CloseableScheduledExecutorService(executor);
         this.reapingThresholdMs = reapingThresholdMs;
-        this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath);
+        if (leaderPath != null)
+        {
+            leaderLatch = new LeaderLatch(client, leaderPath);
+        }
+        else
+        {
+            leaderLatch = null;
+        }
+        this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch);
         addPath(path);
     }
 
@@ -136,7 +147,10 @@ public class ChildReaper implements Closeable
             reapingThresholdMs,
             TimeUnit.MILLISECONDS
         );
-
+        if (leaderLatch != null)
+        {
+            leaderLatch.start();
+        }
         reaper.start();
     }
 
@@ -146,6 +160,10 @@ public class ChildReaper implements Closeable
         if ( state.compareAndSet(State.STARTED, State.CLOSED) )
         {
             CloseableUtils.closeQuietly(reaper);
+            if (leaderLatch != null)
+            {
+                CloseableUtils.closeQuietly(leaderLatch);
+            }
             task.cancel(true);
         }
     }
@@ -173,32 +191,40 @@ public class ChildReaper implements Closeable
         return paths.remove(PathUtils.validatePath(path));
     }
 
-    private static ScheduledExecutorService newExecutorService()
+    public static ScheduledExecutorService newExecutorService()
     {
         return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
     }
 
     private void doWork()
     {
-        for ( String path : paths )
+        if (shouldDoWork())
         {
-            try
+            for ( String path : paths )
             {
-                List<String> children = client.getChildren().forPath(path);
-                for ( String name : children )
+                try
                 {
-                    String thisPath = ZKPaths.makePath(path, name);
-                    Stat stat = client.checkExists().forPath(thisPath);
-                    if ( (stat != null) && (stat.getNumChildren() == 0) )
+                    List<String> children = client.getChildren().forPath(path);
+                    for ( String name : children )
                     {
-                        reaper.addPath(thisPath, mode);
+                        String thisPath = ZKPaths.makePath(path, name);
+                        Stat stat = client.checkExists().forPath(thisPath);
+                        if ( (stat != null) && (stat.getNumChildren() == 0) )
+                        {
+                            reaper.addPath(thisPath, mode);
+                        }
                     }
                 }
-            }
-            catch ( Exception e )
-            {
-                log.error("Could not get children for path: " + path, e);
+                catch ( Exception e )
+                {
+                    log.error("Could not get children for path: " + path, e);
+                }
             }
         }
     }
+
+    private boolean shouldDoWork()
+    {
+        return this.leaderLatch == null || this.leaderLatch.hasLeadership();
+    }
 }

http://git-wip-us.apache.org/repos/asf/curator/blob/49eb02a0/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
index ad6ba6c..d81bb3a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.curator.framework.recipes.locks;
 
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
 import org.apache.curator.test.BaseClassForTests;
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.framework.CuratorFramework;
@@ -100,6 +101,52 @@ public class TestChildReaper extends BaseClassForTests
     }
 
     @Test
+    public void     testLeaderElection() throws Exception
+    {
+        Timing                  timing = new Timing();
+        ChildReaper             reaper = null;
+        CuratorFramework        client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        LeaderLatch otherLeader = null;
+        try
+        {
+            client.start();
+
+            for ( int i = 0; i < 10; ++i )
+            {
+                client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
+            }
+
+            otherLeader = new LeaderLatch(client, "/test-leader");
+            otherLeader.start();
+
+            reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, "/test-leader");
+            reaper.start();
+
+            timing.forWaiting().sleepABit();
+
+            //Should not have reaped anything at this point since otherLeader is still leader
+            Stat    stat = client.checkExists().forPath("/test");
+            Assert.assertEquals(stat.getNumChildren(), 10);
+
+            CloseableUtils.closeQuietly(otherLeader);
+
+            timing.forWaiting().sleepABit();
+
+            stat = client.checkExists().forPath("/test");
+            Assert.assertEquals(stat.getNumChildren(), 0);
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(reaper);
+            if (otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED)
+            {
+                CloseableUtils.closeQuietly(otherLeader);
+            }
+            CloseableUtils.closeQuietly(client);
+        }
+    }
+
+    @Test
     public void     testMultiPath() throws Exception
     {
         Timing                  timing = new Timing();