You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2021/03/17 21:30:14 UTC
[ignite] branch master updated: IGNITE-2399: Implement acquireAndExecute In IgniteSemaphore (#8820)
This is an automated email from the ASF dual-hosted git repository.
vkulichenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 65b7039 IGNITE-2399: Implement acquireAndExecute In IgniteSemaphore (#8820)
65b7039 is described below
commit 65b7039947e335d742a0743a39388c5f80794d09
Author: Atri Sharma <at...@gmail.com>
AuthorDate: Thu Mar 18 02:59:50 2021 +0530
IGNITE-2399: Implement acquireAndExecute In IgniteSemaphore (#8820)
---
.../java/org/apache/ignite/IgniteSemaphore.java | 13 +++
.../datastructures/GridCacheSemaphoreImpl.java | 35 ++++++++
.../IgniteSemaphoreAbstractSelfTest.java | 95 ++++++++++++++++++++++
.../ignite/internal/util/IgniteUtilsSelfTest.java | 10 +++
4 files changed, 153 insertions(+)
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
index db748b4..d5ab7f2 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSemaphore.java
@@ -20,6 +20,9 @@ package org.apache.ignite;
import java.io.Closeable;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+
/**
* This interface provides a rich API for working with distributed semaphore.
* <p>
@@ -245,6 +248,16 @@ public interface IgniteSemaphore extends Closeable {
public void acquire(int permits) throws IgniteInterruptedException;
/**
+ * Acquires the given number of permits from this semaphore, executes the given callable and schedules the
+ * release of the permits asynchronously.
+ *
+ * @param callable the callable to execute
+ * @param numPermits the number of permits to acquire
+ * @param <T> the type of the result produced by the callable
+ * @return a future for the callable's result
+ * @throws Exception if the callable throws an exception
+ */
+ public <T> IgniteFuture<T> acquireAndExecute(IgniteCallable<T> callable,
+ int numPermits) throws Exception;
+
+ /**
* Releases the given number of permits, returning them to the semaphore.
*
* <p>Releases the given number of permits, increasing the number of available permits by that amount. If any
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
index de7d9f2..4f2985b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheSemaphoreImpl.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
@@ -40,11 +41,16 @@ import org.apache.ignite.internal.IgnitionEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
import static org.apache.ignite.internal.processors.cache.GridCacheUtils.retryTopologySafe;
import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
@@ -663,6 +669,35 @@ public final class GridCacheSemaphoreImpl extends AtomicDataStructureProxy<GridC
}
/** {@inheritDoc} */
+ @Override public <T> IgniteFuture<T> acquireAndExecute(IgniteCallable<T> callable,
+ int numPermits) {
+ acquire(numPermits);
+
+ Future<T> passedInCallableFuture = ctx.kernalContext().getExecutorService().submit(callable);
+
+ final GridFutureAdapter<T> fut = new GridFutureAdapter<T>() {
+ @Override public T get() {
+ try {
+ return passedInCallableFuture.get();
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ };
+
+ IgniteFuture<T> future = new IgniteFutureImpl<>(fut);
+
+ future.listen(new IgniteInClosure<IgniteFuture<T>>() {
+ /** {@inheritDoc} */
+ @Override public void apply(IgniteFuture<T> igniteFuture) {
+ release(numPermits);
+ }
+ });
+
+ return future;
+ }
+
+ /** {@inheritDoc} */
@Override public int availablePermits() {
ctx.kernalContext().gateway().readLock();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
index 5d8fc06..3b95a8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/datastructures/IgniteSemaphoreAbstractSelfTest.java
@@ -25,6 +25,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
@@ -35,6 +38,8 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.future.IgniteFutureImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicateX;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.PA;
@@ -304,6 +309,96 @@ public abstract class IgniteSemaphoreAbstractSelfTest extends IgniteAtomicsAbstr
}
/**
+ * Test to verify the {@link IgniteSemaphore#acquireAndExecute(IgniteCallable, int)}.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAcquireAndExecute() throws Exception {
+ IgniteSemaphore semaphore = ignite(0).semaphore("testAcquireAndExecute", 1, true, true);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ IgniteCallable<Integer> callable = new IgniteCallable<Integer>() {
+ @Override public Integer call() {
+ assert (semaphore.availablePermits() == 0);
+
+ return 5;
+ }
+ };
+
+ IgniteFuture igniteFuture = semaphore.acquireAndExecute(callable, 1);
+
+ Runnable runnable = new Runnable() {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ IgniteFutureImpl impl = (IgniteFutureImpl<Integer>) igniteFuture;
+
+ GridFutureAdapter fut = (GridFutureAdapter) (impl.internalFuture());
+ fut.onDone(true);
+ }
+ };
+
+ executorService.submit(runnable);
+
+ Thread.sleep(1000);
+ igniteFuture.get(7000, MILLISECONDS);
+
+ assertTrue(igniteFuture.isDone());
+
+ assertTrue(semaphore.availablePermits() == 1);
+
+ executorService.shutdown();
+ }
+
+ /**
+ * Test to verify the {@link IgniteSemaphore#acquireAndExecute(IgniteCallable, int)}'s behaviour in case of a failure.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testAcquireAndExecuteIfFailure() {
+ IgniteSemaphore semaphore = ignite(0).semaphore("testAcquireAndExecuteIfFailure", 1, true, true);
+ ExecutorService executorService = Executors.newSingleThreadExecutor();
+
+ IgniteCallable<Integer> callable = new IgniteCallable<Integer>() {
+ @Override public Integer call() {
+ throw new RuntimeException("Foobar");
+ }
+ };
+
+ GridTestUtils.assertThrows(log, new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ IgniteFuture igniteFuture = semaphore.acquireAndExecute(callable, 1);
+
+ Runnable runnable = new Runnable() {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ try {
+ Thread.sleep(1000);
+ IgniteFutureImpl impl = (IgniteFutureImpl<Integer>) igniteFuture;
+
+ GridFutureAdapter fut = (GridFutureAdapter) (impl.internalFuture());
+ fut.onDone(true);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e.getMessage());
+ }
+ }
+ };
+
+ executorService.submit(runnable);
+
+ ((IgniteFutureImpl)igniteFuture).internalFuture().get();
+
+ assertTrue(igniteFuture.isDone());
+
+ return null;
+ }
+ }, RuntimeException.class, "Foobar");
+
+ executorService.shutdown();
+ }
+
+ /**
* @throws Exception If failed.
*/
private void checkSemaphoreSerialization() throws Exception {
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
index 3f1cf07..0c06fc3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsSelfTest.java
@@ -119,6 +119,16 @@ public class IgniteUtilsSelfTest extends GridCommonAbstractTest {
return new String(chs);
}
+ /** {@inheritDoc} <p>Calls {@code startGrids(2)}. */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrids(2); // NOTE(review): presumably starts two grid nodes shared by the tests of this class - confirm.
+ }
+
+ /** {@inheritDoc} <p>Calls {@code stopAllGrids()}. */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids(); // NOTE(review): presumably stops the nodes started in beforeTestsStarted() - confirm.
+ }
+
/**
*
*/