You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@curator.apache.org by ra...@apache.org on 2017/06/11 01:46:24 UTC

curator git commit: Added AsyncLocker

Repository: curator
Updated Branches:
  refs/heads/CURATOR-397 93f11ed09 -> e943763f0


Added AsyncLocker


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/e943763f
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/e943763f
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/e943763f

Branch: refs/heads/CURATOR-397
Commit: e943763f0d299b5e69e6a7f4e871024b7aa95503
Parents: 93f11ed
Author: randgalt <ra...@apache.org>
Authored: Sat Jun 10 20:46:19 2017 -0500
Committer: randgalt <ra...@apache.org>
Committed: Sat Jun 10 20:46:19 2017 -0500

----------------------------------------------------------------------
 .../org/apache/curator/x/async/AsyncLocker.java | 196 +++++++++++++++++++
 .../apache/curator/x/async/TestAsyncLocker.java |  76 +++++++
 2 files changed, 272 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/e943763f/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
new file mode 100644
index 0000000..8b65a4a
--- /dev/null
+++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncLocker.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.utils.ThreadUtils;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>
+ *     Utility for acquiring a lock asynchronously
+ * </p>
+ *
+ * <p>
+ *     Canonical usage:
+ * <code><pre>
+ *     InterProcessMutex mutex = new InterProcessMutex(...) // or any InterProcessLock
+ *     AsyncLocker.lockAsync(mutex).handle((state, e) -> {
+ *         if ( e != null )
+ *         {
+ *             // handle the error
+ *         }
+ *         else if ( state.hasTheLock() )
+ *         {
+ *             try
+ *             {
+ *                 // do work while holding the lock
+ *             }
+ *             finally
+ *             {
+ *                 state.release();
+ *             }
+ *         }
+ *     });
+ * </pre></code>
+ * </p>
+ */
+public class AsyncLocker
+{
+    /**
+     * State of the lock
+     */
+    public interface LockState
+    {
+        /**
+         * Returns true if you own the lock
+         *
+         * @return true/false
+         */
+        boolean hasTheLock();
+
+        /**
+         * Safe release of the lock. Only tries to release
+         * if you own the lock. The lock ownership is changed
+         * to <code>false</code> by this method.
+         */
+        void release();
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously using the given timeout and executor.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param timeout max timeout to acquire lock
+     * @param unit time unit of timeout
+     * @param executor executor to use to asynchronously acquire
+     * @return stage
+     */
+    public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit, Executor executor)
+    {
+        if ( executor == null )
+        {
+            return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit));
+        }
+        return CompletableFuture.supplyAsync(() -> lock(lock, timeout, unit), executor);
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously using the given executor and without a timeout.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param executor executor to use to asynchronously acquire
+     * @return stage
+     */
+    public static CompletionStage<LockState> lockAsync(InterProcessLock lock, Executor executor)
+    {
+        return lockAsync(lock, 0, null, executor);
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously using the given timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @param timeout max timeout to acquire lock
+     * @param unit time unit of timeout
+     * @return stage
+     */
+    public static CompletionStage<LockState> lockAsync(InterProcessLock lock, long timeout, TimeUnit unit)
+    {
+        return lockAsync(lock, timeout, unit, null);
+    }
+
+    /**
+     * Attempt to acquire the given lock asynchronously without timeout using the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
+     *
+     * @param lock a lock implementation (e.g. {@link org.apache.curator.framework.recipes.locks.InterProcessMutex},
+     * {@link org.apache.curator.framework.recipes.locks.InterProcessSemaphoreV2}, etc.)
+     * @return stage
+     */
+    public static CompletionStage<LockState> lockAsync(InterProcessLock lock)
+    {
+        return lockAsync(lock, 0, null, null);
+    }
+
+    private static LockState lock(InterProcessLock lock, long timeout, TimeUnit unit)
+    {
+        try
+        {
+            if ( unit != null )
+            {
+                boolean hasTheLock = lock.acquire(timeout, unit);
+                return new InternalLockState(lock, hasTheLock);
+            }
+
+            lock.acquire();
+            return new InternalLockState(lock, true);
+        }
+        catch ( Exception e )
+        {
+            ThreadUtils.checkInterrupted(e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private AsyncLocker()
+    {
+    }
+
+    private static class InternalLockState implements LockState
+    {
+        private final InterProcessLock lock;
+        private volatile boolean hasTheLock;
+
+        public InternalLockState(InterProcessLock lock, boolean hasTheLock)
+        {
+            this.lock = lock;
+            this.hasTheLock = hasTheLock;
+        }
+
+        @Override
+        public boolean hasTheLock()
+        {
+            return hasTheLock;
+        }
+
+        @Override
+        public void release()
+        {
+            if ( hasTheLock )
+            {
+                hasTheLock = false;
+                try
+                {
+                    lock.release();
+                }
+                catch ( Exception e )
+                {
+                    ThreadUtils.checkInterrupted(e);
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/e943763f/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
new file mode 100644
index 0000000..7ea2d08
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestAsyncLocker.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.recipes.locks.InterProcessMutex;
+import org.apache.curator.retry.RetryOneTime;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+public class TestAsyncLocker extends CompletableBaseClassForTests
+{
+    @Test
+    public void testBasic()
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            InterProcessMutex lock = new InterProcessMutex(client, "/one/two");
+            complete(AsyncLocker.lockAsync(lock), (state, e) -> {
+                Assert.assertNull(e);
+                Assert.assertTrue(state.hasTheLock());
+                state.release();
+            });
+        }
+    }
+
+    @Test
+    public void testContention() throws Exception
+    {
+        try ( CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)) )
+        {
+            client.start();
+
+            InterProcessMutex lock1 = new InterProcessMutex(client, "/one/two");
+            InterProcessMutex lock2 = new InterProcessMutex(client, "/one/two");
+            CountDownLatch latch = new CountDownLatch(1);
+            AsyncLocker.lockAsync(lock1).thenAccept(state -> {
+                if ( state.hasTheLock() )
+                {
+                    latch.countDown();  // don't release the lock
+                }
+            });
+            Assert.assertTrue(timing.awaitLatch(latch));
+
+            CountDownLatch latch2 = new CountDownLatch(1);
+            AsyncLocker.lockAsync(lock2, timing.forSleepingABit().milliseconds(), TimeUnit.MILLISECONDS).thenAccept(state -> {
+                if ( !state.hasTheLock() )
+                {
+                    latch2.countDown();  // lock should still be held
+                }
+            });
+            Assert.assertTrue(timing.awaitLatch(latch2));
+        }
+    }
+}