You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2021/03/17 21:30:14 UTC

[ignite] branch master updated: IGNITE-2399: Implement acquireAndExecute In IgniteSemaphore (#8820)

This is an automated email from the ASF dual-hosted git repository.

vkulichenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 65b7039  IGNITE-2399: Implement acquireAndExecute In IgniteSemaphore (#8820)
65b7039 is described below

commit 65b7039947e335d742a0743a39388c5f80794d09
Author: Atri Sharma <at...@gmail.com>
AuthorDate: Thu Mar 18 02:59:50 2021 +0530

    IGNITE-2399: Implement acquireAndExecute In IgniteSemaphore (#8820)
---
 .../java/org/apache/ignite/IgniteSemaphore.java    | 13 +++
 .../datastructures/GridCacheSemaphoreImpl.java     | 35 ++++++++
 .../IgniteSemaphoreAbstractSelfTest.java           | 95 ++++++++++++++++++++++
 .../ignite/internal/util/IgniteUtilsSelfTest.java  | 10 +++
 4 files changed, 153 insertions(+)

diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
index db748b4..d5ab7f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -20,6 +20,9 @@ package org.apache.ignite;
 import java.io.Closeable;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+
 /**
  * This interface provides a rich API for working with distributed semaphore.
  * <p>
@@ -245,6 +248,16 @@ public interface IgniteSemaphore extends Closeable {
     public void acquire(int permits) throws IgniteInterruptedException;
 
     /**
+     * Acquires the given semaphore, executes the given callable and schedules the release of permits asynchronously
+     *
+     * @param callable the callable to execute
+     * @param numPermits the number of permits to acquire
+     * @throws Exception if the callable throws an exception
+     */
+    public <T> IgniteFuture<T> acquireAndExecute(IgniteCallable<T> callable,
+                                                 int numPermits) throws Exception;
+
+    /**
      * Releases the given number of permits, returning them to the semaphore.
      *
      * <p>Releases the given number of permits, increasing the number of available permits by that amount. If any
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index de7d9f2..4f2985b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.AbstractQueuedSynchronizer;
@@ -40,11 +41,16 @@ import org.apache.ignite.internal.IgnitionEx;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 
 import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
 import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -663,6 +669,35 @@ public final class GridCacheSemaphoreImpl extends AtomicDataStructureProxy<GridC
     }
 
     /** {@inheritDoc} */
+    @Override public <T> IgniteFuture<T> acquireAndExecute(IgniteCallable<T> callable,
+                                                           int numPermits) {
+        acquire(numPermits);
+
+        Future<T> passedInCallableFuture = ctx.kernalContext().getExecutorService().submit(callable);
+
+        final GridFutureAdapter<T> fut = new GridFutureAdapter<T>() {
+            @Override public T get() {
+                try {
+                    return passedInCallableFuture.get();
+                } catch (Exception e) {
+                    throw new RuntimeException(e.getMessage());
+                }
+            }
+        };
+
+        IgniteFuture<T> future = new IgniteFutureImpl<>(fut);
+
+        future.listen(new IgniteInClosure<IgniteFuture<T>>() {
+            /** {@inheritDoc} */
+            @Override public void apply(IgniteFuture<T> igniteFuture) {
+                release(numPermits);
+            }
+        });
+
+        return future;
+    }
+
+    /** {@inheritDoc} */
     @Override public int availablePermits() {
         ctx.kernalContext().gateway().readLock();
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
index 5d8fc06..3b95a8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
@@ -25,6 +25,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
@@ -35,6 +38,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
 import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
 import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.PA;
@@ -304,6 +309,96 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr
     }
 
     /**
+     * Test to verify the {@link IgniteSemaphore#acquireAndExecute(IgniteCallable, int)}.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAcquireAndExecute() throws Exception {
+        IgniteSemaphore semaphore = ignite(0).semaphore("testAcquireAndExecute", 1, true, true);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        IgniteCallable<Integer> callable = new IgniteCallable<Integer>() {
+            @Override public Integer call() {
+                assert (semaphore.availablePermits() == 0);
+
+                return 5;
+            }
+        };
+
+        IgniteFuture igniteFuture = semaphore.acquireAndExecute(callable, 1);
+
+        Runnable runnable = new Runnable() {
+            /** {@inheritDoc} */
+            @Override public void run() {
+                IgniteFutureImpl impl = (IgniteFutureImpl<Integer>) igniteFuture;
+
+                GridFutureAdapter fut = (GridFutureAdapter) (impl.internalFuture());
+                fut.onDone(true);
+            }
+        };
+
+        executorService.submit(runnable);
+
+        Thread.sleep(1000);
+        igniteFuture.get(7000, MILLISECONDS);
+
+        assertTrue(igniteFuture.isDone());
+
+        assertTrue(semaphore.availablePermits() == 1);
+
+        executorService.shutdown();
+    }
+
+    /**
+     * Test to verify the {@link IgniteSemaphore#acquireAndExecute(IgniteCallable, int)}'s behaviour in case of a failure.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testAcquireAndExecuteIfFailure() {
+        IgniteSemaphore semaphore = ignite(0).semaphore("testAcquireAndExecuteIfFailure", 1, true, true);
+        ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+        IgniteCallable<Integer> callable = new IgniteCallable<Integer>() {
+            @Override public Integer call() {
+                throw new RuntimeException("Foobar");
+            }
+        };
+
+        GridTestUtils.assertThrows(log, new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                IgniteFuture igniteFuture = semaphore.acquireAndExecute(callable, 1);
+
+                Runnable runnable = new Runnable() {
+                    /** {@inheritDoc} */
+                    @Override public void run() {
+                        try {
+                            Thread.sleep(1000);
+                            IgniteFutureImpl impl = (IgniteFutureImpl<Integer>) igniteFuture;
+
+                            GridFutureAdapter fut = (GridFutureAdapter) (impl.internalFuture());
+                            fut.onDone(true);
+                        } catch (InterruptedException e) {
+                            throw new RuntimeException(e.getMessage());
+                        }
+                    }
+                };
+
+                executorService.submit(runnable);
+
+                ((IgniteFutureImpl)igniteFuture).internalFuture().get();
+
+                assertTrue(igniteFuture.isDone());
+
+                return null;
+            }
+        }, RuntimeException.class, "Foobar");
+
+        executorService.shutdown();
+    }
+
+    /**
      * @throws Exception If failed.
      */
     private void checkSemaphoreSerialization() throws Exception {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 3f1cf07..0c06fc3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -119,6 +119,16 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
         return new String(chs);
     }
 
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrids(2);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
     /**
      *
      */