You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2015/04/22 01:07:28 UTC
[31/50] curator git commit: ChildReaper now creates a leaderLatch
itself if a leader path is provided and does no work (such as passing paths
to its Reaper) if it is not currently the leader.
ChildReaper now creates a leaderLatch itself if a leader path is provided and does no work (such as passing paths to its Reaper) if it is not currently the leader.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/49eb02a0
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/49eb02a0
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/49eb02a0
Branch: refs/heads/CURATOR-154
Commit: 49eb02a04f377a3b9e2da3b3904311ddddf1aa9d
Parents: 9da7960
Author: David Kesler <dk...@yodle.com>
Authored: Mon Feb 9 13:50:39 2015 -0500
Committer: David Kesler <dk...@yodle.com>
Committed: Mon Feb 9 13:50:39 2015 -0500
----------------------------------------------------------------------
.../framework/recipes/locks/ChildReaper.java | 56 ++++++++++++++------
.../recipes/locks/TestChildReaper.java | 47 ++++++++++++++++
2 files changed, 88 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/49eb02a0/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 6e0a7e4..56c56ab 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -20,6 +20,8 @@ package org.apache.curator.framework.recipes.locks;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
+
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.utils.CloseableScheduledExecutorService;
@@ -52,6 +54,7 @@ public class ChildReaper implements Closeable
private final Reaper.Mode mode;
private final CloseableScheduledExecutorService executor;
private final int reapingThresholdMs;
+ private final LeaderLatch leaderLatch;
private volatile Future<?> task;
@@ -109,7 +112,15 @@ public class ChildReaper implements Closeable
this.mode = mode;
this.executor = new CloseableScheduledExecutorService(executor);
this.reapingThresholdMs = reapingThresholdMs;
- this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath);
+ if (leaderPath != null)
+ {
+ leaderLatch = new LeaderLatch(client, leaderPath);
+ }
+ else
+ {
+ leaderLatch = null;
+ }
+ this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch);
addPath(path);
}
@@ -136,7 +147,10 @@ public class ChildReaper implements Closeable
reapingThresholdMs,
TimeUnit.MILLISECONDS
);
-
+ if (leaderLatch != null)
+ {
+ leaderLatch.start();
+ }
reaper.start();
}
@@ -146,6 +160,10 @@ public class ChildReaper implements Closeable
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
CloseableUtils.closeQuietly(reaper);
+ if (leaderLatch != null)
+ {
+ CloseableUtils.closeQuietly(leaderLatch);
+ }
task.cancel(true);
}
}
@@ -173,32 +191,40 @@ public class ChildReaper implements Closeable
return paths.remove(PathUtils.validatePath(path));
}
- private static ScheduledExecutorService newExecutorService()
+ public static ScheduledExecutorService newExecutorService()
{
return ThreadUtils.newFixedThreadScheduledPool(2, "ChildReaper");
}
private void doWork()
{
- for ( String path : paths )
+ if (shouldDoWork())
{
- try
+ for ( String path : paths )
{
- List<String> children = client.getChildren().forPath(path);
- for ( String name : children )
+ try
{
- String thisPath = ZKPaths.makePath(path, name);
- Stat stat = client.checkExists().forPath(thisPath);
- if ( (stat != null) && (stat.getNumChildren() == 0) )
+ List<String> children = client.getChildren().forPath(path);
+ for ( String name : children )
{
- reaper.addPath(thisPath, mode);
+ String thisPath = ZKPaths.makePath(path, name);
+ Stat stat = client.checkExists().forPath(thisPath);
+ if ( (stat != null) && (stat.getNumChildren() == 0) )
+ {
+ reaper.addPath(thisPath, mode);
+ }
}
}
- }
- catch ( Exception e )
- {
- log.error("Could not get children for path: " + path, e);
+ catch ( Exception e )
+ {
+ log.error("Could not get children for path: " + path, e);
+ }
}
}
}
+
+ private boolean shouldDoWork()
+ {
+ return this.leaderLatch == null || this.leaderLatch.hasLeadership();
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/49eb02a0/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
index ad6ba6c..d81bb3a 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestChildReaper.java
@@ -18,6 +18,7 @@
*/
package org.apache.curator.framework.recipes.locks;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.test.BaseClassForTests;
import org.apache.curator.utils.CloseableUtils;
import org.apache.curator.framework.CuratorFramework;
@@ -100,6 +101,52 @@ public class TestChildReaper extends BaseClassForTests
}
@Test
+ public void testLeaderElection() throws Exception
+ {
+ Timing timing = new Timing();
+ ChildReaper reaper = null;
+ CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+ LeaderLatch otherLeader = null;
+ try
+ {
+ client.start();
+
+ for ( int i = 0; i < 10; ++i )
+ {
+ client.create().creatingParentsIfNeeded().forPath("/test/" + Integer.toString(i));
+ }
+
+ otherLeader = new LeaderLatch(client, "/test-leader");
+ otherLeader.start();
+
+ reaper = new ChildReaper(client, "/test", Reaper.Mode.REAP_UNTIL_DELETE, ChildReaper.newExecutorService(), 1, "/test-leader");
+ reaper.start();
+
+ timing.forWaiting().sleepABit();
+
+ //Should not have reaped anything at this point since otherLeader is still leader
+ Stat stat = client.checkExists().forPath("/test");
+ Assert.assertEquals(stat.getNumChildren(), 10);
+
+ CloseableUtils.closeQuietly(otherLeader);
+
+ timing.forWaiting().sleepABit();
+
+ stat = client.checkExists().forPath("/test");
+ Assert.assertEquals(stat.getNumChildren(), 0);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(reaper);
+ if (otherLeader != null && otherLeader.getState() == LeaderLatch.State.STARTED)
+ {
+ CloseableUtils.closeQuietly(otherLeader);
+ }
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
public void testMultiPath() throws Exception
{
Timing timing = new Timing();