You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ca...@apache.org on 2015/04/22 01:07:29 UTC

[32/50] curator git commit: Adding the notion of a 'lock schema' to ChildReaper that enables it to reap both the direct children it's watching and subnodes of those children. This is necessary with InterProcessSemaphoreV2 as it creates multiple subnodes

Adding the notion of a 'lock schema' to ChildReaper that enables it to reap both the direct children it's watching and subnodes of those children. This is necessary with InterProcessSemaphoreV2, as it creates multiple subnodes beneath its lock nodes and is otherwise unreapable with ChildReaper.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/72aea4a3
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/72aea4a3
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/72aea4a3

Branch: refs/heads/CURATOR-154
Commit: 72aea4a30b36201fe2a673358c1e062d6b5109a7
Parents: 49eb02a
Author: David Kesler <dk...@yodle.com>
Authored: Mon Feb 9 16:34:20 2015 -0500
Committer: David Kesler <dk...@yodle.com>
Committed: Mon Feb 9 16:34:20 2015 -0500

----------------------------------------------------------------------
 .../framework/recipes/locks/ChildReaper.java    | 35 +++++++++++++++--
 .../recipes/locks/InterProcessSemaphoreV2.java  |  8 ++++
 .../framework/recipes/locks/LockSchema.java     | 22 +++++++++++
 .../locks/TestInterProcessSemaphore.java        | 40 ++++++++++++++++++++
 4 files changed, 101 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
index 56c56ab..7935f0b 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/ChildReaper.java
@@ -55,6 +55,7 @@ public class ChildReaper implements Closeable
     private final CloseableScheduledExecutorService executor;
     private final int reapingThresholdMs;
     private final LeaderLatch leaderLatch;
+    private final LockSchema lockSchema;
 
     private volatile Future<?> task;
 
@@ -108,6 +109,21 @@ public class ChildReaper implements Closeable
      */
     public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath)
     {
+        this(client, path, mode, executor, reapingThresholdMs, leaderPath, new LockSchema());
+    }
+
+
+    /**
+     * @param client the client
+     * @param path path to reap children from
+     * @param executor executor to use for background tasks
+     * @param reapingThresholdMs threshold in milliseconds that determines that a path can be deleted
+     * @param mode reaping mode
+     * @param leaderPath if not null, uses a leader selection so that only 1 reaper is active in the cluster
+     * @param lockSchema a set of the possible subnodes of the children of path that must be reaped in addition to the child nodes
+     */
+    public ChildReaper(CuratorFramework client, String path, Reaper.Mode mode, ScheduledExecutorService executor, int reapingThresholdMs, String leaderPath, LockSchema lockSchema)
+    {
         this.client = client;
         this.mode = mode;
         this.executor = new CloseableScheduledExecutorService(executor);
@@ -121,6 +137,7 @@ public class ChildReaper implements Closeable
             leaderLatch = null;
         }
         this.reaper = new Reaper(client, executor, reapingThresholdMs, leaderLatch);
+        this.lockSchema = lockSchema;
         addPath(path);
     }
 
@@ -207,12 +224,13 @@ public class ChildReaper implements Closeable
                     List<String> children = client.getChildren().forPath(path);
                     for ( String name : children )
                     {
-                        String thisPath = ZKPaths.makePath(path, name);
-                        Stat stat = client.checkExists().forPath(thisPath);
-                        if ( (stat != null) && (stat.getNumChildren() == 0) )
+                        String childPath = ZKPaths.makePath(path, name);
+                        addPathToReaperIfEmpty(childPath);
+                        for ( String subNode : lockSchema.getPaths() )
                         {
-                            reaper.addPath(thisPath, mode);
+                            addPathToReaperIfEmpty(ZKPaths.makePath(childPath, subNode));
                         }
+
                     }
                 }
                 catch ( Exception e )
@@ -223,6 +241,15 @@ public class ChildReaper implements Closeable
         }
     }
 
+    private void addPathToReaperIfEmpty(String path) throws Exception
+    {
+        Stat stat = client.checkExists().forPath(path);
+        if ( (stat != null) && (stat.getNumChildren() == 0) )
+        {
+            reaper.addPath(path, mode);
+        }
+    }
+
     private boolean shouldDoWork()
     {
         return this.leaderLatch == null || this.leaderLatch.hasLeadership();

http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
index 2e14ee1..55647ad 100644
--- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/InterProcessSemaphoreV2.java
@@ -21,6 +21,8 @@ package org.apache.curator.framework.recipes.locks;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+
 import org.apache.curator.utils.CloseableUtils;
 import org.apache.curator.RetryLoop;
 import org.apache.curator.framework.CuratorFramework;
@@ -92,6 +94,12 @@ public class InterProcessSemaphoreV2
     private static final String LOCK_PARENT = "locks";
     private static final String LEASE_PARENT = "leases";
     private static final String LEASE_BASE_NAME = "lease-";
+    public static final LockSchema LOCK_SCHEMA = new LockSchema(
+            Sets.newHashSet(
+                    LOCK_PARENT,
+                    LEASE_PARENT
+            )
+    );
 
     /**
      * @param client    the client

http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
new file mode 100644
index 0000000..5794705
--- /dev/null
+++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/locks/LockSchema.java
@@ -0,0 +1,22 @@
+package org.apache.curator.framework.recipes.locks;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import com.google.common.collect.Sets;
+
+public class LockSchema {
+    private final Set<String> paths;
+
+    public LockSchema() {
+        paths = new HashSet<String>();
+    }
+
+    public LockSchema(Set<String> paths) {
+        this.paths = Sets.newHashSet(paths);
+    }
+
+    public Set<String> getPaths() {
+        return Sets.newHashSet(paths);
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/72aea4a3/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
----------------------------------------------------------------------
diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
index dd3f98f..631b7c7 100644
--- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
+++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/locks/TestInterProcessSemaphore.java
@@ -531,4 +531,44 @@ public class TestInterProcessSemaphore extends BaseClassForTests
             CloseableUtils.closeQuietly(client);
         }
     }
+
+    @Test
+    public void testChildReaperCleansUpLockNodes() throws Exception
+    {
+        Timing timing = new Timing();
+        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
+        client.start();
+
+        ChildReaper childReaper = null;
+        try
+        {
+            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, "/test/lock", 1);
+            semaphore.returnLease(semaphore.acquire(timing.forWaiting().seconds(), TimeUnit.SECONDS));
+
+            Assert.assertTrue(client.getChildren().forPath("/test").size() > 0);
+
+            childReaper = new ChildReaper(
+                    client,
+                    "/test",
+                    Reaper.Mode.REAP_UNTIL_GONE,
+                    ChildReaper.newExecutorService(),
+                    1,
+                    "/test-leader",
+                    InterProcessSemaphoreV2.LOCK_SCHEMA
+            );
+            childReaper.start();
+
+            timing.forWaiting().sleepABit();
+
+            List<String> children = client.getChildren().forPath("/test");
+
+            Assert.assertEquals(children.size(), 0, "All children of /test should have been reaped");
+        }
+        finally
+        {
+            CloseableUtils.closeQuietly(childReaper);
+            CloseableUtils.closeQuietly(client);
+        }
+
+    }
 }