You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2014/04/02 20:17:42 UTC
[1/2] git commit: Added optional leader selector support
Repository: curator
Updated Branches:
refs/heads/CURATOR-76 [created] 2ef7181ea
Added optional leader selector support
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/aed3c3b4
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/aed3c3b4
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/aed3c3b4
Branch: refs/heads/CURATOR-76
Commit: aed3c3b4acb90b7ad5ebce50ae482bef2d52b793
Parents: d19aff7
Author: randgalt <ra...@apache.org>
Authored: Wed Apr 2 13:15:31 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Apr 2 13:15:31 2014 -0500
----------------------------------------------------------------------
.../curator/framework/recipes/locks/Reaper.java | 96 +++++++--
.../framework/recipes/locks/TestReaper.java | 203 ++++++++++++-------
2 files changed, 211 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/aed3c3b4/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
index 037eacd..8802372 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/Reaper.java
@@ -21,8 +21,9 @@ package org.apache.curator.framework.recipes.locks;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.recipes.leader.LeaderLatch;
+import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.utils.CloseableScheduledExecutorService;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
@@ -31,10 +32,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.IOException;
-import java.util.Set;
+import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -46,8 +48,10 @@ public class Reaper implements Closeable
private final CuratorFramework client;
private final CloseableScheduledExecutorService executor;
private final int reapingThresholdMs;
- private final Set<String> activePaths = Sets.newSetFromMap(Maps.<String, Boolean>newConcurrentMap());
+ private final Map<String, PathHolder> activePaths = Maps.newConcurrentMap();
private final AtomicReference<State> state = new AtomicReference<State>(State.LATENT);
+ private final LeaderLatch leaderLatch;
+ private final AtomicBoolean reapingIsActive = new AtomicBoolean(true);
private enum State
{
@@ -107,7 +111,7 @@ public class Reaper implements Closeable
*/
public Reaper(CuratorFramework client)
{
- this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS);
+ this(client, newExecutorService(), DEFAULT_REAPING_THRESHOLD_MS, null);
}
/**
@@ -118,7 +122,7 @@ public class Reaper implements Closeable
*/
public Reaper(CuratorFramework client, int reapingThresholdMs)
{
- this(client, newExecutorService(), reapingThresholdMs);
+ this(client, newExecutorService(), reapingThresholdMs, null);
}
/**
@@ -128,9 +132,27 @@ public class Reaper implements Closeable
*/
public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs)
{
+ this(client, executor, reapingThresholdMs, null);
+ }
+
+ /**
+ * @param client client
+ * @param executor thread pool
+ * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+ * @param leaderPath if not null, uses leader selection at this path so that only one reaper is active in the cluster
+ */
+ public Reaper(CuratorFramework client, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
+ {
this.client = client;
this.executor = new CloseableScheduledExecutorService(executor);
this.reapingThresholdMs = reapingThresholdMs / EMPTY_COUNT_THRESHOLD;
+
+ LeaderLatch localLeaderLatch = null;
+ if ( leaderPath != null )
+ {
+ localLeaderLatch = makeLeaderLatch(client, leaderPath);
+ }
+ leaderLatch = localLeaderLatch;
}
/**
@@ -153,8 +175,9 @@ public class Reaper implements Closeable
*/
public void addPath(String path, Mode mode)
{
- activePaths.add(path);
- schedule(new PathHolder(path, mode, 0), reapingThresholdMs);
+ PathHolder pathHolder = new PathHolder(path, mode, 0);
+ activePaths.put(path, pathHolder);
+ schedule(pathHolder, reapingThresholdMs);
}
/**
@@ -165,7 +188,7 @@ public class Reaper implements Closeable
*/
public boolean removePath(String path)
{
- return activePaths.remove(path);
+ return activePaths.remove(path) != null;
}
/**
@@ -176,6 +199,11 @@ public class Reaper implements Closeable
public void start() throws Exception
{
Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
+
+ if ( leaderLatch != null )
+ {
+ leaderLatch.start();
+ }
}
@Override
@@ -184,18 +212,27 @@ public class Reaper implements Closeable
if ( state.compareAndSet(State.STARTED, State.CLOSED) )
{
executor.close();
+ if ( leaderLatch != null )
+ {
+ leaderLatch.close();
+ }
}
}
@VisibleForTesting
protected Future<?> schedule(PathHolder pathHolder, int reapingThresholdMs)
{
- return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
+ if ( reapingIsActive.get() )
+ {
+ return executor.schedule(pathHolder, reapingThresholdMs, TimeUnit.MILLISECONDS);
+ }
+ return null;
}
- private void reap(PathHolder holder)
+ @VisibleForTesting
+ protected void reap(PathHolder holder)
{
- if ( !activePaths.contains(holder.path) )
+ if ( !activePaths.containsKey(holder.path) )
{
return;
}
@@ -256,14 +293,47 @@ public class Reaper implements Closeable
{
activePaths.remove(holder.path);
}
- else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.contains(holder.path) )
+ else if ( !Thread.currentThread().isInterrupted() && (state.get() == State.STARTED) && activePaths.containsKey(holder.path) )
{
+ activePaths.put(holder.path, holder);
schedule(new PathHolder(holder.path, holder.mode, newEmptyCount), reapingThresholdMs);
}
}
- private static ScheduledExecutorService newExecutorService()
+ /**
+ * Allocate an executor service for the reaper
+ *
+ * @return service
+ */
+ public static ScheduledExecutorService newExecutorService()
{
return ThreadUtils.newSingleThreadScheduledExecutor("Reaper");
}
+
+ private LeaderLatch makeLeaderLatch(CuratorFramework client, String leaderPath)
+ {
+ reapingIsActive.set(false);
+
+ LeaderLatch localLeaderLatch = new LeaderLatch(client, leaderPath);
+ LeaderLatchListener listener = new LeaderLatchListener()
+ {
+ @Override
+ public void isLeader()
+ {
+ reapingIsActive.set(true);
+ for ( PathHolder holder : activePaths.values() )
+ {
+ schedule(holder, reapingThresholdMs);
+ }
+ }
+
+ @Override
+ public void notLeader()
+ {
+ reapingIsActive.set(false);
+ }
+ };
+ localLeaderLatch.addListener(listener);
+ return localLeaderLatch;
+ }
}
http://git-wip-us.apache.org/repos/asf/curator/blob/aed3c3b4/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
index b11bc8b..39c4817 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestReaper.java
@@ -16,9 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
+
package org.apache.curator.framework.recipes.locks;
-import org.apache.curator.utils.CloseableUtils;
import junit.framework.Assert;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
@@ -28,6 +28,7 @@ import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.Timing;
+import org.apache.curator.utils.CloseableUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.testng.annotations.Test;
@@ -42,21 +43,92 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
public class TestReaper extends BaseClassForTests
{
@Test
public void testUsingLeader() throws Exception
{
- final Timing timing = new Timing();
- final CuratorFramework client = makeClient(timing, null);
- final CountDownLatch latch = new CountDownLatch(1);
- LeaderSelectorListener listener = new LeaderSelectorListener()
+ final Timing timing = new Timing();
+ CuratorFramework client = makeClient(timing, null);
+ Reaper reaper1 = null;
+ Reaper reaper2 = null;
+ try
+ {
+ final AtomicInteger reaper1Count = new AtomicInteger();
+ reaper1 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
+ {
+ @Override
+ protected void reap(PathHolder holder)
+ {
+ reaper1Count.incrementAndGet();
+ super.reap(holder);
+ }
+ };
+
+ final AtomicInteger reaper2Count = new AtomicInteger();
+ reaper2 = new Reaper(client, Reaper.newExecutorService(), 1, "/reaper/leader")
+ {
+ @Override
+ protected void reap(PathHolder holder)
+ {
+ reaper2Count.incrementAndGet();
+ super.reap(holder);
+ }
+ };
+
+ client.start();
+ client.create().creatingParentsIfNeeded().forPath("/one/two/three");
+
+ reaper1.start();
+ reaper2.start();
+
+ reaper1.addPath("/one/two/three");
+ reaper2.addPath("/one/two/three");
+
+ timing.sleepABit();
+
+ Assert.assertTrue((reaper1Count.get() == 0) || (reaper2Count.get() == 0));
+ Assert.assertTrue((reaper1Count.get() > 0) || (reaper2Count.get() > 0));
+
+ Reaper activeReaper;
+ AtomicInteger inActiveReaperCount;
+ if ( reaper1Count.get() > 0 )
+ {
+ activeReaper = reaper1;
+ inActiveReaperCount = reaper2Count;
+ }
+ else
+ {
+ activeReaper = reaper2;
+ inActiveReaperCount = reaper1Count;
+ }
+ Assert.assertEquals(inActiveReaperCount.get(), 0);
+ activeReaper.close();
+ timing.sleepABit();
+ Assert.assertTrue(inActiveReaperCount.get() > 0);
+ }
+ finally
+ {
+ CloseableUtils.closeQuietly(reaper1);
+ CloseableUtils.closeQuietly(reaper2);
+ CloseableUtils.closeQuietly(client);
+ }
+ }
+
+ @Test
+ public void testUsingManualLeader() throws Exception
+ {
+ final Timing timing = new Timing();
+ final CuratorFramework client = makeClient(timing, null);
+ final CountDownLatch latch = new CountDownLatch(1);
+ LeaderSelectorListener listener = new LeaderSelectorListener()
{
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
- Reaper reaper = new Reaper(client, 1);
+ Reaper reaper = new Reaper(client, 1);
try
{
reaper.addPath("/one/two/three", Reaper.Mode.REAP_UNTIL_DELETE);
@@ -76,7 +148,7 @@ public class TestReaper extends BaseClassForTests
{
}
};
- LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
+ LeaderSelector selector = new LeaderSelector(client, "/leader", listener);
try
{
client.start();
@@ -99,12 +171,11 @@ public class TestReaper extends BaseClassForTests
@Test
public void testSparseUseNoReap() throws Exception
{
- final int THRESHOLD = 3000;
+ final int THRESHOLD = 3000;
- Timing timing = new Timing();
- Reaper reaper = null;
- Future<Void> watcher = null;
- CuratorFramework client = makeClient(timing, null);
+ Timing timing = new Timing();
+ Reaper reaper = null;
+ CuratorFramework client = makeClient(timing, null);
try
{
client.start();
@@ -112,43 +183,36 @@ public class TestReaper extends BaseClassForTests
Assert.assertNotNull(client.checkExists().forPath("/one/two/three"));
- final Queue<Reaper.PathHolder> holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
- final ExecutorService pool = Executors.newCachedThreadPool();
+ final Queue<Reaper.PathHolder> holders = new ConcurrentLinkedQueue<Reaper.PathHolder>();
+ final ExecutorService pool = Executors.newCachedThreadPool();
ScheduledExecutorService service = new ScheduledThreadPoolExecutor(1);
- reaper = new Reaper
- (
- client,
- service,
- THRESHOLD
- )
+ reaper = new Reaper(client, service, THRESHOLD)
{
@Override
protected Future<Void> schedule(final PathHolder pathHolder, int reapingThresholdMs)
{
holders.add(pathHolder);
- final Future<?> f = super.schedule(pathHolder, reapingThresholdMs);
- pool.submit
- (
- new Callable<Void>()
+ final Future<?> f = super.schedule(pathHolder, reapingThresholdMs);
+ pool.submit(new Callable<Void>()
+ {
+ @Override
+ public Void call() throws Exception
{
- @Override
- public Void call() throws Exception
- {
- f.get();
- holders.remove(pathHolder);
- return null;
- }
+ f.get();
+ holders.remove(pathHolder);
+ return null;
}
- );
+ }
+ );
return null;
}
};
reaper.start();
reaper.addPath("/one/two/three");
- long start = System.currentTimeMillis();
- boolean emptyCountIsCorrect = false;
+ long start = System.currentTimeMillis();
+ boolean emptyCountIsCorrect = false;
while ( ((System.currentTimeMillis() - start) < timing.forWaiting().milliseconds()) && !emptyCountIsCorrect ) // need to loop as the Holder can go in/out of the Reaper's DelayQueue
{
for ( Reaper.PathHolder holder : holders )
@@ -176,10 +240,6 @@ public class TestReaper extends BaseClassForTests
}
finally
{
- if ( watcher != null )
- {
- watcher.cancel(true);
- }
CloseableUtils.closeQuietly(reaper);
CloseableUtils.closeQuietly(client);
}
@@ -197,7 +257,7 @@ public class TestReaper extends BaseClassForTests
testReapUntilDelete("test");
}
- @Test
+ @Test
public void testReapUntilGone() throws Exception
{
testReapUntilGone(null);
@@ -259,9 +319,9 @@ public class TestReaper extends BaseClassForTests
private void testReapUntilDelete(String namespace) throws Exception
{
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client = makeClient(timing, namespace);
+ Timing timing = new Timing();
+ Reaper reaper = null;
+ CuratorFramework client = makeClient(timing, namespace);
try
{
client.start();
@@ -290,13 +350,13 @@ public class TestReaper extends BaseClassForTests
private void testReapUntilGone(String namespace) throws Exception
{
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client = makeClient(timing, namespace);
+ Timing timing = new Timing();
+ Reaper reaper = null;
+ CuratorFramework client = makeClient(timing, namespace);
try
{
client.start();
-
+
reaper = new Reaper(client, 100);
reaper.start();
@@ -318,14 +378,9 @@ public class TestReaper extends BaseClassForTests
}
}
-
private CuratorFramework makeClient(Timing timing, String namespace) throws IOException
{
- CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
- .connectionTimeoutMs(timing.connection())
- .sessionTimeoutMs(timing.session())
- .connectString(server.getConnectString())
- .retryPolicy(new RetryOneTime(1));
+ CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectionTimeoutMs(timing.connection()).sessionTimeoutMs(timing.session()).connectString(server.getConnectString()).retryPolicy(new RetryOneTime(1));
if ( namespace != null )
{
builder = builder.namespace(namespace);
@@ -335,9 +390,9 @@ public class TestReaper extends BaseClassForTests
private void testRemove(String namespace) throws Exception
{
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client = makeClient(timing, namespace);
+ Timing timing = new Timing();
+ Reaper reaper = null;
+ CuratorFramework client = makeClient(timing, namespace);
try
{
client.start();
@@ -368,16 +423,16 @@ public class TestReaper extends BaseClassForTests
private void testSimulationWithLocks(String namespace) throws Exception
{
- final int LOCK_CLIENTS = 10;
- final int ITERATIONS = 250;
- final int MAX_WAIT_MS = 10;
+ final int LOCK_CLIENTS = 10;
+ final int ITERATIONS = 250;
+ final int MAX_WAIT_MS = 10;
- ExecutorService service = Executors.newFixedThreadPool(LOCK_CLIENTS);
- ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
+ ExecutorService service = Executors.newFixedThreadPool(LOCK_CLIENTS);
+ ExecutorCompletionService<Object> completionService = new ExecutorCompletionService<Object>(service);
- Timing timing = new Timing();
- Reaper reaper = null;
- final CuratorFramework client = makeClient(timing, namespace);
+ Timing timing = new Timing();
+ Reaper reaper = null;
+ final CuratorFramework client = makeClient(timing, namespace);
try
{
client.start();
@@ -388,14 +443,12 @@ public class TestReaper extends BaseClassForTests
for ( int i = 0; i < LOCK_CLIENTS; ++i )
{
- completionService.submit
- (
- new Callable<Object>()
+ completionService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
- final InterProcessMutex lock = new InterProcessMutex(client, "/a/b");
+ final InterProcessMutex lock = new InterProcessMutex(client, "/a/b");
for ( int i = 0; i < ITERATIONS; ++i )
{
lock.acquire();
@@ -411,7 +464,7 @@ public class TestReaper extends BaseClassForTests
return null;
}
}
- );
+ );
}
for ( int i = 0; i < LOCK_CLIENTS; ++i )
@@ -435,10 +488,10 @@ public class TestReaper extends BaseClassForTests
private void testWithEphemerals(String namespace) throws Exception
{
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client2 = null;
- CuratorFramework client = makeClient(timing, namespace);
+ Timing timing = new Timing();
+ Reaper reaper = null;
+ CuratorFramework client2 = null;
+ CuratorFramework client = makeClient(timing, namespace);
try
{
client.start();
@@ -479,9 +532,9 @@ public class TestReaper extends BaseClassForTests
private void testBasic(String namespace) throws Exception
{
- Timing timing = new Timing();
- Reaper reaper = null;
- CuratorFramework client = makeClient(timing, namespace);
+ Timing timing = new Timing();
+ Reaper reaper = null;
+ CuratorFramework client = makeClient(timing, namespace);
try
{
client.start();
[2/2] git commit: Expose leader path to ChildReaper
Posted by ra...@apache.org.
Expose leader path to ChildReaper
Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/2ef7181e
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/2ef7181e
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/2ef7181e
Branch: refs/heads/CURATOR-76
Commit: 2ef7181ea824e0908fa42e1fe22d7a1b9a31e954
Parents: aed3c3b
Author: randgalt <ra...@apache.org>
Authored: Wed Apr 2 13:17:28 2014 -0500
Committer: randgalt <ra...@apache.org>
Committed: Wed Apr 2 13:17:28 2014 -0500
----------------------------------------------------------------------
.../framework/recipes/locks/ChildReaper.java | 19 ++++++++++++++++---
1 file changed, 16 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/curator/blob/2ef7181e/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 0d9a53b..2035ccc 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -66,7 +66,7 @@ public class ChildReaper implements Closeable
*/
public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode)
{
- this(client, path, mode, newExecutorService(), Reaper.DEFAULT_REAPING_THRESHOLD_MS);
+ this(client, path, mode, newExecutorService(), Reaper.DEFAULT_REAPING_THRESHOLD_MS, null);
}
/**
@@ -77,7 +77,7 @@ public class ChildReaper implements Closeable
*/
public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, int reapingThresholdMs)
{
- this(client, path, mode, newExecutorService(), reapingThresholdMs);
+ this(client, path, mode, newExecutorService(), reapingThresholdMs, null);
}
/**
@@ -89,12 +89,25 @@ public class ChildReaper implements Closeable
*/
public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs)
{
+ this(client, path, mode, executor, reapingThresholdMs, null);
+ }
+
+ /**
+ * @param client the client
+ * @param path path to reap children from
+ * @param executor executor to use for background tasks
+ * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+ * @param mode reaping mode
+ * @param leaderPath if not null, uses leader selection at this path so that only one reaper is active in the cluster
+ */
+ public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
+ {
this.client = client;
this.path = path;
this.mode = mode;
this.executor = new CloseableScheduledExecutorService(executor);
this.reapingThresholdMs = reapingThresholdMs;
- this.reaper = new Reaper(client, executor, reapingThresholdMs);
+ this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderPath);
}
/**