You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/04/16 15:53:30 UTC
[ignite] branch master updated: IGNITE-12033 Move async
continuations away from striped pool
This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 38d279a IGNITE-12033 Move async continuations away from striped pool
38d279a is described below
commit 38d279afd18f21e6d26a6f3730999600372e039f
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Fri Apr 16 18:53:08 2021 +0300
IGNITE-12033 Move async continuations away from striped pool
User continuations should not run on striped pool - this can cause unexpected, hard to diagnose performance issues and deadlocks.
* Add `IgniteConfiguration#asyncContinuationExecutor`. Defaults to `null`, which means `ForkJoinPool#commonPool` should be used.
* Use default executor for Cache async operations
* Add .NET and Java tests
IEP: https://cwiki.apache.org/confluence/display/IGNITE/IEP-70%3A+Async+Continuation+Executor
---
.../jmh/cache/JmhCacheAsyncListenBenchmark.java | 162 +++++++++++++
.../ignite/configuration/IgniteConfiguration.java | 37 +++
.../apache/ignite/internal/GridKernalContext.java | 8 +
.../ignite/internal/GridKernalContextImpl.java | 10 +
.../processors/cache/IgniteCacheFutureImpl.java | 8 +-
.../processors/cache/IgniteCacheProxyImpl.java | 26 ++-
.../cache/IgniteFinishedCacheFutureImpl.java | 2 +-
.../wal/reader/StandaloneGridKernalContext.java | 6 +
.../processors/datastreamer/DataStreamerImpl.java | 4 +-
.../platform/utils/PlatformConfigurationUtils.java | 40 ++++
.../internal/util/future/IgniteFutureImpl.java | 24 +-
.../cache/CacheAsyncContinuationExecutorTest.java | 254 +++++++++++++++++++++
...eAsyncContinuationSynchronousExecutorTest.java} | 26 ++-
.../util/future/IgniteCacheFutureImplTest.java | 2 +-
.../ignite/platform/PlatformTestExecutor.java} | 16 +-
.../ignite/platform/PlatformThreadUtils.java | 9 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 4 +
.../Cache/CacheTestAsyncAwait.cs | 115 ++++++++++
.../Client/Cache/CacheTestAsyncAwait.cs | 2 +-
.../Compute/CancellationTest.cs | 2 +-
.../Compute/ComputeApiTest.cs | 6 +-
.../ComputeTestAsyncAwait.cs} | 29 ++-
.../Config/full-config.xml | 2 +-
.../Config/spring-test.xml | 4 +
.../IgniteConfigurationSerializerTest.cs | 5 +-
.../IgniteConfigurationTest.cs | 2 +
.../Apache.Ignite.Core.Tests/ProjectFilesTest.cs | 16 +-
.../Services/PlatformTestService.cs | 1 +
.../Apache.Ignite.Core.Tests/TestUtilsJni.cs | 21 ++
.../Apache.Ignite.Core/Apache.Ignite.Core.csproj | 1 +
.../Configuration/AsyncContinuationExecutor.cs | 60 +++++
.../Apache.Ignite.Core/IgniteConfiguration.cs | 17 +-
.../IgniteConfigurationSection.xsd | 12 +
.../Impl/Compute/ComputeTaskHolder.cs | 20 +-
.../Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs | 2 +-
35 files changed, 882 insertions(+), 73 deletions(-)
diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheAsyncListenBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheAsyncListenBenchmark.java
new file mode 100644
index 0000000..d494d39
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheAsyncListenBenchmark.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.cache;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.apache.ignite.internal.benchmarks.model.IntValue;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Mode;
+
+
+/**
+ * Cache async listen benchmark.
+ * Measures {@link IgniteFuture#listen(IgniteInClosure)} performance.
+ *
+ * Results on i7-9700K, Ubuntu 20.04.1, JDK 1.8.0_275:
+ *
+ * Without ForkJoinPool async continuation executor:
+ * Benchmark Mode Cnt Score Error Units
+ * JmhCacheAsyncListenBenchmark.get thrpt 10 82052.664 ± 2891.182 ops/s
+ * JmhCacheAsyncListenBenchmark.put thrpt 10 77859.584 ± 2071.196 ops/s
+ *
+ * With ForkJoinPool async continuation executor:
+ * Benchmark Mode Cnt Score Error Units
+ * JmhCacheAsyncListenBenchmark.get thrpt 10 76008.272 ± 1506.928 ops/s
+ * JmhCacheAsyncListenBenchmark.put thrpt 10 73393.986 ± 1336.420 ops/s
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class JmhCacheAsyncListenBenchmark extends JmhCacheAbstractBenchmark {
+ /** {@inheritDoc} */
+ @Override public void setup() throws Exception {
+ super.setup();
+
+ IgniteDataStreamer<Integer, IntValue> dataLdr = node.dataStreamer(cache.getName());
+
+ for (int i = 0; i < CNT; i++)
+ dataLdr.addData(i, new IntValue(i));
+
+ dataLdr.close();
+
+ System.out.println("Cache populated.");
+ }
+
+ /**
+ * Test PUT operation.
+ *
+ * @throws Exception If failed.
+ */
+ @Benchmark
+ public void put() throws Exception {
+ int key = ThreadLocalRandom.current().nextInt(CNT);
+
+ blockingListen(cache.putAsync(key, new IntValue(key)));
+ }
+
+ /**
+ * Test GET operation.
+ *
+ * @throws Exception If failed.
+ */
+ @Benchmark
+ public void get() throws Exception {
+ int key = ThreadLocalRandom.current().nextInt(CNT);
+
+ blockingListen(cache.getAsync(key));
+ }
+
+ /**
+ * Blocks until future completion using {@link IgniteFuture#listen(IgniteInClosure)}.
+ *
+ * @param future Future
+ */
+ private static void blockingListen(IgniteFuture future) throws Exception {
+ AtomicBoolean ab = new AtomicBoolean();
+
+ future.listen(f -> {
+ try {
+ synchronized (ab) {
+ ab.set(true);
+ ab.notifyAll();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ });
+
+ synchronized (ab) {
+ while (!ab.get()) {
+ ab.wait(5000);
+ }
+ }
+ }
+
+ /**
+ * Run benchmarks.
+ *
+ * @param args Arguments.
+ * @throws Exception If failed.
+ */
+ public static void main(String[] args) throws Exception {
+ run(CacheAtomicityMode.ATOMIC);
+ }
+
+ /**
+ * Run benchmarks for atomic cache.
+ *
+ * @param atomicityMode Atomicity mode.
+ * @throws Exception If failed.
+ */
+ private static void run(CacheAtomicityMode atomicityMode) throws Exception {
+ run(4, true, atomicityMode, CacheWriteSynchronizationMode.PRIMARY_SYNC);
+ }
+
+ /**
+ * Run benchmark.
+ *
+ * @param threads Amount of threads.
+ * @param client Client mode flag.
+ * @param atomicityMode Atomicity mode.
+ * @param writeSyncMode Write synchronization mode.
+ * @throws Exception If failed.
+ */
+ private static void run(int threads, boolean client, CacheAtomicityMode atomicityMode,
+ CacheWriteSynchronizationMode writeSyncMode) throws Exception {
+
+ JmhIdeBenchmarkRunner.create()
+ .forks(1)
+ .threads(threads)
+ .benchmarks(JmhCacheAsyncListenBenchmark.class.getSimpleName())
+ .jvmArguments(
+ "-Xms4g",
+ "-Xmx4g",
+ JmhIdeBenchmarkRunner.createProperty(PROP_ATOMICITY_MODE, atomicityMode),
+ JmhIdeBenchmarkRunner.createProperty(PROP_WRITE_SYNC_MODE, writeSyncMode),
+ JmhIdeBenchmarkRunner.createProperty(PROP_DATA_NODES, 2),
+ JmhIdeBenchmarkRunner.createProperty(PROP_CLIENT_MODE, client))
+ .benchmarkModes(Mode.Throughput)
+ .run();
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 3801795..aa3109d 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -21,6 +21,8 @@ import java.io.Serializable;
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
import java.util.zip.Deflater;
import javax.cache.configuration.Factory;
import javax.cache.event.CacheEntryListener;
@@ -607,6 +609,9 @@ public class IgniteConfiguration {
/** SQL configuration. */
private SqlConfiguration sqlCfg = new SqlConfiguration();
+ /** Executor for async operations continuations. */
+ private Executor asyncContinuationExecutor;
+
/** Shutdown policy for cluster. */
public ShutdownPolicy shutdown = DFLT_SHUTDOWN_POLICY;
@@ -738,6 +743,7 @@ public class IgniteConfiguration {
warmupClos = cfg.getWarmupClosure();
sqlCfg = cfg.getSqlConfiguration();
shutdown = cfg.getShutdownPolicy();
+ asyncContinuationExecutor = cfg.getAsyncContinuationExecutor();
}
/**
@@ -3641,6 +3647,37 @@ public class IgniteConfiguration {
return this;
}
+ /**
+ * Gets the continuation executor for cache async APIs.
+ * <p />
+ * When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used.
+ * <p />
+ * When async operation completes, corresponding {@link org.apache.ignite.lang.IgniteFuture} listeners
+ * will be invoked using this executor.
+ *
+ * @return Executor for async continuations.
+ */
+ public Executor getAsyncContinuationExecutor() {
+ return asyncContinuationExecutor;
+ }
+
+ /**
+ * Sets the continuation executor for cache async APIs.
+ * <p />
+ * When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used.
+ * <p />
+ * When async operation completes, corresponding {@link org.apache.ignite.lang.IgniteFuture} listeners
+ * will be invoked using this executor.
+ *
+ * @param asyncContinuationExecutor Executor for async continuations.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setAsyncContinuationExecutor(Executor asyncContinuationExecutor) {
+ this.asyncContinuationExecutor = asyncContinuationExecutor;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteConfiguration.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 281b8a0..e0e51fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.configuration.IgniteConfiguration;
@@ -784,4 +785,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
* @return Performance statistics processor.
*/
public PerformanceStatisticsProcessor performanceStatistics();
+
+ /**
+ * Executor that is in charge of processing user async continuations.
+ *
+ * @return Executor that is in charge of processing user async continuations.
+ */
+ public Executor getAsyncContinuationExecutor();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index bb8fcf8..25f4ed5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -30,7 +30,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -1333,4 +1336,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
@Override public PerformanceStatisticsProcessor performanceStatistics() {
return perfStatProc;
}
+
+ /** {@inheritDoc} */
+ @Override public Executor getAsyncContinuationExecutor() {
+ return config().getAsyncContinuationExecutor() == null
+ ? ForkJoinPool.commonPool()
+ : config().getAsyncContinuationExecutor();
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
index 6f6223f..c2dca1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
@@ -38,18 +38,18 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> {
*
* @param fut Internal future.
*/
- public IgniteCacheFutureImpl(IgniteInternalFuture<V> fut) {
- super(fut);
+ public IgniteCacheFutureImpl(IgniteInternalFuture<V> fut, Executor defaultExecutor) {
+ super(fut, defaultExecutor);
}
/** {@inheritDoc} */
@Override public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<V>, T> doneCb) {
- return new IgniteCacheFutureImpl<>(chainInternal(doneCb, null));
+ return new IgniteCacheFutureImpl<>(chainInternal(doneCb, null), defaultExecutor);
}
/** {@inheritDoc} */
@Override public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<V>, T> doneCb, Executor exec) {
- return new IgniteCacheFutureImpl<>(chainInternal(doneCb, exec));
+ return new IgniteCacheFutureImpl<>(chainInternal(doneCb, exec), defaultExecutor);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 6222f60..c481450 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -31,6 +31,7 @@ import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
@@ -1906,7 +1907,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
@Override public IgniteFuture<?> destroyAsync() {
GridCacheContext<K, V> ctx = getContextSafe();
- return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(cacheName, false, true, false, null));
+ return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(cacheName, false, true, false, null), exec());
}
/** {@inheritDoc} */
@@ -1918,7 +1919,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
@Override public IgniteFuture<?> closeAsync() {
GridCacheContext<K, V> ctx = getContextSafe();
- return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(cacheName));
+ return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(cacheName), exec());
}
/** {@inheritDoc} */
@@ -2064,7 +2065,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
assert restartFut != null;
- throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), cacheName);
+ throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut, exec()), cacheName);
}
else
throw restartingException;
@@ -2072,7 +2073,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
if (restartFut != null) {
if (X.hasCause(e, CacheStoppedException.class) || X.hasSuppressed(e, CacheStoppedException.class))
- throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), "Cache is restarting: " +
+ throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut, exec()), "Cache is restarting: " +
cacheName, e);
}
@@ -2100,7 +2101,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
/** {@inheritDoc} */
@Override protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) {
- return new IgniteCacheFutureImpl<>(fut);
+ return new IgniteCacheFutureImpl<>(fut, exec());
}
/**
@@ -2216,7 +2217,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
@Override public IgniteFuture<Boolean> rebalance() {
GridCacheContext<K, V> ctx = getContextSafe();
- return new IgniteFutureImpl<>(ctx.preloader().forceRebalance());
+ return new IgniteFutureImpl<>(ctx.preloader().forceRebalance(), exec());
}
/** {@inheritDoc} */
@@ -2228,7 +2229,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
if (fut == null)
return new IgniteFinishedFutureImpl<>();
- return new IgniteFutureImpl<>(fut);
+ return new IgniteFutureImpl<>(fut, exec());
}
/**
@@ -2256,7 +2257,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
//do nothing
}
- throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut), cacheName);
+ throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut, exec()), cacheName);
}
}
@@ -2363,6 +2364,13 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
}
/**
+ * Async continuation executor.
+ */
+ private Executor exec() {
+ return context().kernalContext().getAsyncContinuationExecutor();
+ }
+
+ /**
*
*/
private class RestartFuture extends GridFutureAdapter<Void> {
@@ -2395,7 +2403,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
}
throw new IgniteCacheRestartingException(
- new IgniteFutureImpl<>(this),
+ new IgniteFutureImpl<>(this, exec()),
"Cache is restarting: " + name
);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
index f7bc95b..f515811 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
@@ -27,6 +27,6 @@ public class IgniteFinishedCacheFutureImpl<V> extends IgniteCacheFutureImpl<V> {
* @param err Error.
*/
public IgniteFinishedCacheFutureImpl(Throwable err) {
- super(new GridFinishedFuture<V>(err));
+ super(new GridFinishedFuture<V>(err), null);
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 54ae4ab..912876b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -746,4 +747,9 @@ public class StandaloneGridKernalContext implements GridKernalContext {
@Override public PerformanceStatisticsProcessor performanceStatistics() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public Executor getAsyncContinuationExecutor() {
+ return null;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9b1f624..452789d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -370,7 +370,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
fut = new DataStreamerFuture(this);
- publicFut = new IgniteCacheFutureImpl<>(fut);
+ publicFut = new IgniteCacheFutureImpl<>(fut, ctx.getAsyncContinuationExecutor());
GridCacheAdapter cache = ctx.cache().internalCache(cacheName);
@@ -708,7 +708,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
@NotNull protected IgniteCacheFutureImpl createDataLoadFuture() {
GridFutureAdapter internalFut0 = new GridFutureAdapter();
- IgniteCacheFutureImpl fut = new IgniteCacheFutureImpl(internalFut0);
+ IgniteCacheFutureImpl fut = new IgniteCacheFutureImpl(internalFut0, ctx.getAsyncContinuationExecutor());
internalFut0.listen(rmvActiveFut);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index fbe3218..5bead46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
+import java.util.concurrent.Executor;
import javax.cache.configuration.Factory;
import javax.cache.expiry.ExpiryPolicy;
import javax.net.ssl.SSLContext;
@@ -122,6 +123,9 @@ import org.apache.ignite.util.AttributeNodeFilter;
*/
@SuppressWarnings({"unchecked", "TypeMayBeWeakened"})
public class PlatformConfigurationUtils {
+ /** */
+ private static final Executor synchronousExecutor = Runnable::run;
+
/**
* Write .Net configuration to the stream.
*
@@ -760,6 +764,8 @@ public class PlatformConfigurationUtils {
cfg.setSqlQueryHistorySize(in.readInt());
if (in.readBoolean())
cfg.setPeerClassLoadingEnabled(in.readBoolean());
+ if (in.readBoolean())
+ cfg.setAsyncContinuationExecutor(getAsyncContinuationExecutor(in.readInt()));
int sqlSchemasCnt = in.readInt();
@@ -1366,6 +1372,8 @@ public class PlatformConfigurationUtils {
w.writeInt(cfg.getSqlQueryHistorySize());
w.writeBoolean(true);
w.writeBoolean(cfg.isPeerClassLoadingEnabled());
+ w.writeBoolean(true);
+ w.writeInt(getAsyncContinuationExecutorMode(cfg.getAsyncContinuationExecutor()));
if (cfg.getSqlSchemas() == null)
w.writeInt(0);
@@ -2335,6 +2343,38 @@ public class PlatformConfigurationUtils {
}
/**
+ * Gets the executor.
+ *
+ * @param mode Mode.
+ * @return Executor.
+ */
+ private static Executor getAsyncContinuationExecutor(int mode) {
+ switch (mode) {
+ case 0: return null;
+ case 1: return synchronousExecutor;
+ default: throw new IgniteException("Invalid AsyncContinuationExecutor mode: " + mode);
+ }
+ }
+
+ /**
+ * Gets the executor mode.
+ *
+ * @param executor Executor.
+ * @return Mode.
+ */
+ private static int getAsyncContinuationExecutorMode(Executor executor) {
+ if (executor == null) {
+ return 0;
+ }
+
+ if (executor.equals(synchronousExecutor)) {
+ return 1;
+ }
+
+ return 2;
+ }
+
+ /**
* Private constructor.
*/
private PlatformConfigurationUtils() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
index ab8aa7d..307a28f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
@@ -38,13 +38,25 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
/** */
protected final IgniteInternalFuture<V> fut;
+ /** */
+ protected final Executor defaultExecutor;
+
/**
* @param fut Future.
*/
public IgniteFutureImpl(IgniteInternalFuture<V> fut) {
+ this(fut, null);
+ }
+
+ /**
+ * @param fut Future.
+ * @param defaultExecutor Default executor.
+ */
+ public IgniteFutureImpl(IgniteInternalFuture<V> fut, @Nullable Executor defaultExecutor) {
assert fut != null;
this.fut = fut;
+ this.defaultExecutor = defaultExecutor;
}
/**
@@ -68,7 +80,10 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
@Override public void listen(IgniteInClosure<? super IgniteFuture<V>> lsnr) {
A.notNull(lsnr, "lsnr");
- fut.listen(new InternalFutureListener(lsnr));
+ if (defaultExecutor != null && !isDone())
+ fut.listen(new InternalFutureListener(new AsyncFutureListener<>(lsnr, defaultExecutor)));
+ else
+ fut.listen(new InternalFutureListener(lsnr));
}
/** {@inheritDoc} */
@@ -81,7 +96,7 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
/** {@inheritDoc} */
@Override public <T> IgniteFuture<T> chain(final IgniteClosure<? super IgniteFuture<V>, T> doneCb) {
- return new IgniteFutureImpl<>(chainInternal(doneCb, null));
+ return new IgniteFutureImpl<>(chainInternal(doneCb, null), defaultExecutor);
}
/** {@inheritDoc} */
@@ -90,7 +105,7 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
A.notNull(doneCb, "doneCb");
A.notNull(exec, "exec");
- return new IgniteFutureImpl<>(chainInternal(doneCb, exec));
+ return new IgniteFutureImpl<>(chainInternal(doneCb, exec), defaultExecutor);
}
/**
@@ -115,6 +130,9 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
if (exec != null)
return fut.chain(clos, exec);
+ if (defaultExecutor != null && !isDone())
+ return fut.chain(clos, defaultExecutor);
+
return fut.chain(clos);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationExecutorTest.java
new file mode 100644
index 0000000..48bac86
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationExecutorTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.junit.Test;
+
+/**
+ * Tests {@link IgniteConfiguration#setAsyncContinuationExecutor(Executor)}.
+ */
+@SuppressWarnings("rawtypes")
+public class CacheAsyncContinuationExecutorTest extends GridCacheAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheAtomicityMode atomicityMode() {
+ return CacheAtomicityMode.ATOMIC;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int backups() {
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
+ CacheConfiguration ccfg = super.cacheConfiguration(igniteInstanceName);
+
+ // Use cache store with a write delay to make sure future does not complete before we register a listener.
+ ccfg.setCacheStoreFactory(new DelayedStoreFactory());
+ ccfg.setReadThrough(true);
+ ccfg.setWriteThrough(true);
+
+ return ccfg;
+ }
+
+ /**
+ * Gets the expected thread name prefix.
+ * @return Prefix.
+ */
+ protected String expectedThreadNamePrefix() {
+ return "ForkJoinPool.commonPool-worker";
+ }
+
+ /**
+ * Gets a value indicating whether continuation thread can execute cache operations.
+ * @return Value indicating whether continuation thread can execute cache operations.
+ */
+ protected boolean allowCacheOperationsInContinuation() {
+ return true;
+ }
+
+ /**
+ * Tests future listen with default executor.
+ */
+ @Test
+ public void testRemoteOperationListenContinuesOnDefaultExecutor() throws Exception {
+ testRemoteOperationContinuesOnDefaultExecutor(false);
+ }
+
+ /**
+ * Tests future chain with default executor.
+ */
+ @Test
+ public void testRemoteOperationChainContinuesOnDefaultExecutor() throws Exception {
+ testRemoteOperationContinuesOnDefaultExecutor(true);
+ }
+
+ /**
+ * Tests that an operation on a local key executes synchronously, and listener is called immediately on the current
+ * thread.
+ */
+ @Test
+ public void testLocalOperationListenerExecutesSynchronously() {
+ final String key = getPrimaryKey(0);
+
+ IgniteCache<String, Integer> cache = jcache(0);
+ AtomicReference<String> listenThreadName = new AtomicReference<>("");
+
+ cache.putAsync(key, 1).listen(f -> listenThreadName.set(Thread.currentThread().getName()));
+
+ assertEquals(Thread.currentThread().getName(), listenThreadName.get());
+ }
+
+ /**
+ * Tests that an operation on a remote key executes on striped pool directly when a syncronous executor is provided.
+ * This demonstrates that default safe behavior can be overridden with a faster, but unsafe old behavior
+ * for an individual operation.
+ */
+ @Test
+ public void testRemoteOperationListenerExecutesOnStripedPoolWhenCustomExecutorIsProvided() throws Exception {
+ final String key = getPrimaryKey(1);
+
+ IgniteCache<String, Integer> cache = jcache(0);
+ AtomicReference<String> listenThreadName = new AtomicReference<>("");
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ cache.putAsync(key, 1).listenAsync(f -> {
+ listenThreadName.set(Thread.currentThread().getName());
+
+ try {
+ barrier.await(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }, Runnable::run);
+
+ barrier.await(10, TimeUnit.SECONDS);
+
+ assertTrue(listenThreadName.get(), listenThreadName.get().startsWith("sys-stripe-"));
+ }
+
+ /**
+ * Tests that an operation on a local key executes synchronously, and chain is called immediately on the current
+ * thread.
+ */
+ @Test
+ public void testLocalOperationChainExecutesSynchronously() {
+ final String key = getPrimaryKey(0);
+
+ IgniteCache<String, Integer> cache = jcache(0);
+ AtomicReference<String> listenThreadName = new AtomicReference<>("");
+
+ cache.putAsync(key, 1).chain(f -> {
+ listenThreadName.set(Thread.currentThread().getName());
+
+ return new IgniteFinishedFutureImpl<>();
+ });
+
+ assertEquals(Thread.currentThread().getName(), listenThreadName.get());
+ }
+
+ /**
+ * Tests future chain / listen with default executor.
+ *
+ * This test would hang before {@link IgniteConfiguration#setAsyncContinuationExecutor(Executor)}
+ * was introduced, or if we set {@link Runnable#run()} as the executor.
+ */
+ private void testRemoteOperationContinuesOnDefaultExecutor(boolean chain) throws Exception {
+ final String key = getPrimaryKey(1);
+
+ IgniteCache<String, Integer> cache = jcache(0);
+ CyclicBarrier barrier = new CyclicBarrier(2);
+ AtomicReference<String> listenThreadName = new AtomicReference<>("");
+
+ IgniteInClosure<IgniteFuture<Void>> clos = f -> {
+ listenThreadName.set(Thread.currentThread().getName());
+
+ if (allowCacheOperationsInContinuation()) {
+ // Check that cache operations do not deadlock.
+ cache.replace(key, 2);
+ }
+
+ try {
+ barrier.await(10, TimeUnit.SECONDS);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ };
+
+ IgniteFuture<Void> fut = cache.putAsync(key, 1);
+
+ if (chain)
+ fut.chain(f -> {
+ clos.apply(f);
+ return new IgniteFinishedFutureImpl<>();
+ });
+ else
+ fut.listen(clos);
+
+ barrier.await(10, TimeUnit.SECONDS);
+
+ assertEquals(allowCacheOperationsInContinuation() ? 2 : 1, cache.get(key).intValue());
+ assertTrue(listenThreadName.get(), listenThreadName.get().startsWith(expectedThreadNamePrefix()));
+ }
+
+ /**
+ * Gets the primary key.
+ * @param nodeIdx Node index.
+ * @return Key.
+ */
+ @SuppressWarnings("OptionalGetWithoutIsPresent")
+ private String getPrimaryKey(int nodeIdx) {
+ return IntStream.range(0, 1000)
+ .mapToObj(String::valueOf)
+ .filter(x -> belongs(x, nodeIdx))
+ .findFirst()
+ .get();
+ }
+
+ /** */
+ private static class DelayedStoreFactory implements Factory<CacheStore> {
+ /** {@inheritDoc} */
+ @Override public CacheStore create() {
+ return new CacheStoreAdapter() {
+ /** {@inheritDoc} */
+ @Override public Object load(Object key) throws CacheLoaderException {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(Cache.Entry entry) {
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void delete(Object key) {
+ // No-op.
+ }
+ };
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationSynchronousExecutorTest.java
similarity index 52%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
copy to modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationSynchronousExecutorTest.java
index f7bc95b..0651f6cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationSynchronousExecutorTest.java
@@ -17,16 +17,26 @@
package org.apache.ignite.internal.processors.cache;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import java.util.concurrent.Executor;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
/**
- *
+ * Tests {@link IgniteConfiguration#setAsyncContinuationExecutor(Executor)} with synchronous executor (old behavior).
*/
-public class IgniteFinishedCacheFutureImpl<V> extends IgniteCacheFutureImpl<V> {
- /**
- * @param err Error.
- */
- public IgniteFinishedCacheFutureImpl(Throwable err) {
- super(new GridFinishedFuture<V>(err));
+public class CacheAsyncContinuationSynchronousExecutorTest extends CacheAsyncContinuationExecutorTest {
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ return super.getConfiguration(igniteInstanceName).setAsyncContinuationExecutor(Runnable::run);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected String expectedThreadNamePrefix() {
+ return "sys-stripe-";
+ }
+
+ /** {@inheritDoc} */
+ @Override protected boolean allowCacheOperationsInContinuation() {
+ return false;
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java
index 46f1706..b74a7fe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java
@@ -28,7 +28,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl;
public class IgniteCacheFutureImplTest extends IgniteFutureImplTest {
/** {@inheritDoc} */
@Override protected <V> IgniteFutureImpl<V> createFuture(IgniteInternalFuture<V> fut) {
- return new IgniteCacheFutureImpl<>(fut);
+ return new IgniteCacheFutureImpl<>(fut, Runnable::run);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformTestExecutor.java
similarity index 70%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
copy to modules/core/src/test/java/org/apache/ignite/platform/PlatformTestExecutor.java
index f7bc95b..4bf26c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformTestExecutor.java
@@ -15,18 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.processors.cache;
+package org.apache.ignite.platform;
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import java.util.concurrent.Executor;
/**
- *
+ * Test executor.
*/
-public class IgniteFinishedCacheFutureImpl<V> extends IgniteCacheFutureImpl<V> {
- /**
- * @param err Error.
- */
- public IgniteFinishedCacheFutureImpl(Throwable err) {
- super(new GridFinishedFuture<V>(err));
+public class PlatformTestExecutor implements Executor {
+ /** {@inheritDoc} */
+ @Override public void execute(Runnable runnable) {
+ runnable.run();
}
}
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java
index 0029d1e..b60ebeb 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java
@@ -48,4 +48,13 @@ public class PlatformThreadUtils {
}
}
}
+
+ /**
+ * Gets the thread name.
+ *
+ * @return Thread name.
+ */
+ public static String getThreadName() {
+ return Thread.currentThread().getName();
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 31bc4d9..719232c 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -54,6 +54,8 @@ import org.apache.ignite.internal.managers.communication.MessageDirectTypeIdConf
import org.apache.ignite.internal.processors.cache.BinaryMetadataRegistrationInsideEntryProcessorTest;
import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
import org.apache.ignite.internal.processors.cache.CacheAffinityKeyConfigurationMismatchTest;
+import org.apache.ignite.internal.processors.cache.CacheAsyncContinuationExecutorTest;
+import org.apache.ignite.internal.processors.cache.CacheAsyncContinuationSynchronousExecutorTest;
import org.apache.ignite.internal.processors.cache.CacheAtomicSingleMessageCountSelfTest;
import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteQueueTest;
import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
@@ -396,6 +398,8 @@ public class IgniteCacheTestSuite {
GridTestUtils.addTestIfNeeded(suite, InterceptorWithKeepBinaryCacheFullApiTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, BinaryMetadataRegistrationInsideEntryProcessorTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CacheAsyncContinuationExecutorTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite, CacheAsyncContinuationSynchronousExecutorTest.class, ignoredTests);
suite.add(IgniteGetNonPlainKeyReadThroughSelfTest.class);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncAwait.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncAwait.cs
new file mode 100644
index 0000000..f27600e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncAwait.cs
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ReSharper disable MethodHasAsyncOverload
+namespace Apache.Ignite.Core.Tests.Cache
+{
+ using System.Threading;
+ using System.Threading.Tasks;
+ using Apache.Ignite.Core.Common;
+ using Apache.Ignite.Core.Configuration;
+ using NUnit.Framework;
+
+ /// <summary>
+ /// Tests thick cache operations with async/await.
+ /// </summary>
+ public class CacheTestAsyncAwait : TestBase
+ {
+ /// <summary>
+ /// Initializes a new instance of <see cref="CacheTestAsyncAwait"/> class.
+ /// </summary>
+ public CacheTestAsyncAwait() : base(2)
+ {
+ // No-op.
+ }
+
+ /// <summary>
+ /// Tests that async continuations are executed on a ThreadPool thread, not on response handler thread.
+ /// </summary>
+ [Test]
+ public async Task TestAsyncAwaitContinuationIsExecutedWithConfiguredExecutor()
+ {
+ var cache = Ignite.GetOrCreateCache<int, int>(TestUtils.TestName);
+ var key = TestUtils.GetPrimaryKey(Ignite2, cache.Name);
+
+ // This causes deadlock if async continuation is executed on the striped thread.
+ await cache.PutAsync(key, 1);
+ cache.Replace(key, 2);
+
+ Assert.AreEqual(2, cache.Get(key));
+ StringAssert.DoesNotContain("sys-stripe-", TestUtilsJni.GetJavaThreadName());
+ }
+
+ /// <summary>
+ /// Tests that local operation executes synchronously and completes on the same thread.
+ /// </summary>
+ [Test]
+ public async Task TestLocalOperationExecutesSynchronously()
+ {
+ var cache = Ignite.GetOrCreateCache<int, int>(TestUtils.TestName);
+ var key = TestUtils.GetPrimaryKey(Ignite, cache.Name);
+ var origThread = Thread.CurrentThread;
+
+ await cache.PutAsync(key, key);
+
+ Assert.AreEqual(origThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId);
+ }
+
+ /// <summary>
+ /// Tests that explicitly configured synchronous executor runs continuations on the striped pool.
+ /// </summary>
+ [Test]
+ public async Task TestSynchronousExecutorRunsContinuationsOnStripedPool()
+ {
+ var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration(name: "client"))
+ {
+ AsyncContinuationExecutor = AsyncContinuationExecutor.UnsafeSynchronous,
+ ClientMode = true
+ };
+
+ using (var client = Ignition.Start(cfg))
+ {
+ var cache = client.GetOrCreateCache<int, int>(TestUtils.TestName);
+
+ await cache.PutAsync(1, 1);
+
+ StringAssert.StartsWith("sys-stripe-", TestUtilsJni.GetJavaThreadName());
+
+ Assert.AreEqual(AsyncContinuationExecutor.UnsafeSynchronous,
+ client.GetConfiguration().AsyncContinuationExecutor);
+
+ // Jump away from striped pool to avoid deadlock on node stop.
+ await Task.Yield();
+ }
+ }
+
+ /// <summary>
+ /// Tests that invalid executor configuration is rejected.
+ /// </summary>
+ [Test]
+ public void TestInvalidExecutorConfigurationFailsOnStart()
+ {
+ var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
+ {
+ AsyncContinuationExecutor = AsyncContinuationExecutor.Custom
+ };
+
+ var ex = Assert.Throws<IgniteException>(() => Ignition.Start(cfg));
+ Assert.AreEqual("Invalid AsyncContinuationExecutor mode: 2", ex.Message);
+ }
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
index a3485d0..e254fc8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
@@ -22,7 +22,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
using NUnit.Framework;
/// <summary>
- /// Tests cache operations with async/await.
+ /// Tests thin cache operations with async/await.
/// </summary>
public class CacheTestAsyncAwait : ClientTestBase
{
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs
index 3e85d497..dbaa94b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs
@@ -112,7 +112,7 @@ namespace Apache.Ignite.Core.Tests.Compute
cts.Cancel();
- Assert.IsTrue(task.IsCanceled);
+ TestUtils.WaitForTrueCondition(() => task.IsCanceled);
// Pass cancelled token
Assert.IsTrue(runner(Compute, cts.Token).IsCanceled);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
index 820435f..ec5750d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
@@ -720,11 +720,11 @@ namespace Apache.Ignite.Core.Tests.Compute
// Cancel while executing
var task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token);
cts.Cancel();
- Assert.IsTrue(task.IsCanceled);
+ TestUtils.WaitForTrueCondition(() => task.IsCanceled);
// Use cancelled token
- task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token);
- Assert.IsTrue(task.IsCanceled);
+ var task2 = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token);
+ Assert.IsTrue(task2.IsCanceled);
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTestAsyncAwait.cs
similarity index 54%
copy from modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
copy to modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTestAsyncAwait.cs
index a3485d0..06e4f66 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTestAsyncAwait.cs
@@ -15,29 +15,36 @@
* limitations under the License.
*/
-namespace Apache.Ignite.Core.Tests.Client.Cache
+namespace Apache.Ignite.Core.Tests.Compute
{
using System.Threading.Tasks;
- using Apache.Ignite.Core.Tests.Client;
using NUnit.Framework;
/// <summary>
- /// Tests cache operations with async/await.
+ /// Tests compute async continuation behavior.
/// </summary>
- public class CacheTestAsyncAwait : ClientTestBase
+ public class ComputeTestAsyncAwait : TestBase
{
/// <summary>
- /// Tests that async continuations are executed on a ThreadPool thread, not on response handler thread.
+ /// Tests that RunAsync continuation does not capture Ignite threads.
/// </summary>
[Test]
- public async Task TestAsyncAwaitContinuationIsExecutedOnThreadPool()
+ public async Task TestComputeRunAsyncContinuation()
{
- var cache = GetClientCache<int>();
- await cache.PutAsync(1, 1).ConfigureAwait(false);
+ await Ignite.GetCompute().RunAsync(new ComputeAction());
- // This causes deadlock if async continuation is executed on response handler thread.
- cache.PutAsync(2, 2).Wait();
- Assert.AreEqual(2, cache.Get(2));
+ StringAssert.StartsWith("Thread-", TestUtilsJni.GetJavaThreadName());
+ }
+
+ /// <summary>
+ /// Tests that ExecuteAsync continuation does not capture Ignite threads.
+ /// </summary>
+ [Test]
+ public async Task TestComputeExecuteAsyncContinuation()
+ {
+ await Ignite.GetCompute().ExecuteAsync(new StringLengthEmptyTask(), "abc");
+
+ StringAssert.StartsWith("Thread-", TestUtilsJni.GetJavaThreadName());
}
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
index bcc8347..f294afa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
@@ -24,7 +24,7 @@
isLateAffinityAssignment='false' springConfigUrl='c:\myconfig.xml' autoGenerateIgniteInstanceName='true'
peerAssemblyLoadingMode='CurrentAppDomain' longQueryWarningTimeout='1:2:3' isActiveOnStart='false'
consistentId='someId012' redirectJavaConsoleOutput='false' authenticationEnabled='true' mvccVacuumFrequency='10000' mvccVacuumThreadCount='4'
- sqlQueryHistorySize='123' javaPeerClassLoadingEnabled='true'>
+ sqlQueryHistorySize='123' javaPeerClassLoadingEnabled='true' asyncContinuationExecutor='UnsafeSynchronous'>
<localhost>127.1.1.1</localhost>
<binaryConfiguration compactFooter='false' keepDeserialized='true'>
<nameMapper type='Apache.Ignite.Core.Tests.IgniteConfigurationSerializerTest+NameMapper' bar='testBar' />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml
index 4488f93..acd5a9b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml
@@ -47,5 +47,9 @@
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration"/>
</property>
+
+ <property name="asyncContinuationExecutor">
+ <bean class="org.apache.ignite.platform.PlatformTestExecutor"/>
+ </property>
</bean>
</beans>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index a98cad9..74d2688 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -386,6 +386,8 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(2, ec.Count);
Assert.AreEqual(new[] {"exec1", "exec2"}, ec.Select(e => e.Name));
Assert.AreEqual(new[] {1, 2}, ec.Select(e => e.Size));
+
+ Assert.AreEqual(AsyncContinuationExecutor.UnsafeSynchronous, cfg.AsyncContinuationExecutor);
}
/// <summary>
@@ -1081,7 +1083,8 @@ namespace Apache.Ignite.Core.Tests
Name = "exec-1",
Size = 11
}
- }
+ },
+ AsyncContinuationExecutor = AsyncContinuationExecutor.ThreadPool
};
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
index e49b6c2..5d57aa8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
@@ -315,6 +315,7 @@ namespace Apache.Ignite.Core.Tests
CheckDefaultProperties(resCfg.ClientConnectorConfiguration);
Assert.AreEqual(false, resCfg.JavaPeerClassLoadingEnabled);
+ Assert.AreEqual(AsyncContinuationExecutor.Custom, resCfg.AsyncContinuationExecutor);
}
}
@@ -555,6 +556,7 @@ namespace Apache.Ignite.Core.Tests
Assert.AreEqual(IgniteConfiguration.DefaultAuthenticationEnabled, cfg.AuthenticationEnabled);
Assert.AreEqual(IgniteConfiguration.DefaultMvccVacuumFrequency, cfg.MvccVacuumFrequency);
Assert.AreEqual(IgniteConfiguration.DefaultMvccVacuumThreadCount, cfg.MvccVacuumThreadCount);
+ Assert.AreEqual(AsyncContinuationExecutor.ThreadPool, cfg.AsyncContinuationExecutor);
// Thread pools.
Assert.AreEqual(IgniteConfiguration.DefaultManagementThreadPoolSize, cfg.ManagementThreadPoolSize);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
index 5fdf70d..2dcc94a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
@@ -37,7 +37,7 @@ namespace Apache.Ignite.Core.Tests
{
var projFiles = TestUtils.GetDotNetSourceDir()
.GetFiles("*.csproj", SearchOption.AllDirectories)
- .Where(x => !x.FullName.ToLower().Contains("dotnetcore") &&
+ .Where(x => !x.FullName.ToLower().Contains("dotnetcore") &&
!x.FullName.Contains("Benchmark") &&
!x.FullName.Contains("templates") &&
!x.FullName.Contains("examples"))
@@ -95,7 +95,7 @@ namespace Apache.Ignite.Core.Tests
{
var excluded = new[]
{
- "ProjectFilesTest.cs",
+ "ProjectFilesTest.cs",
"CopyOnWriteConcurrentDictionary.cs",
"IgniteArgumentCheck.cs",
"DelegateConverter.cs",
@@ -109,7 +109,7 @@ namespace Apache.Ignite.Core.Tests
"HandleRegistry.cs",
"BinaryObjectHeader.cs"
};
-
+
var csFiles = TestUtils.GetDotNetSourceDir().GetFiles("*.cs", SearchOption.AllDirectories);
foreach (var csFile in csFiles)
@@ -142,10 +142,10 @@ namespace Apache.Ignite.Core.Tests
public void TestAllCsharpFilesAreIncludedInProject()
{
var projFiles = TestUtils.GetDotNetSourceDir().GetFiles("*.csproj", SearchOption.AllDirectories)
- .Where(x =>
+ .Where(x =>
!x.Name.Contains("DotNetCore") &&
- !x.Name.Contains("Benchmark") &&
- !x.FullName.Contains("templates") &&
+ !x.Name.Contains("Benchmark") &&
+ !x.FullName.Contains("templates") &&
!x.FullName.Contains("examples"));
var excludedFiles = new[]
@@ -153,7 +153,9 @@ namespace Apache.Ignite.Core.Tests
"IgnitionStartTest.cs",
"Common\\TestFixtureSetUp.cs",
"Common\\TestFixtureTearDown.cs",
- "Client\\Cache\\CacheTestAsyncAwait.cs"
+ "Client\\Cache\\CacheTestAsyncAwait.cs",
+ "Cache\\CacheTestAsyncAwait.cs",
+ "Compute\\ComputeTestAsyncAwait.cs"
};
Assert.Multiple(() =>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs
index 72be97e..1b6303b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs
@@ -395,6 +395,7 @@ namespace Apache.Ignite.Core.Tests.Services
/** <inheritDoc /> */
public ServicesTest.PlatformComputeBinarizable[] testBinarizableArray(ServicesTest.PlatformComputeBinarizable[] x)
{
+ // ReSharper disable once CoVariantArrayConversion
return (ServicesTest.PlatformComputeBinarizable[])testBinarizableArrayOfObjects(x);
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs
index ac379a0..d76c9e0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs
@@ -95,6 +95,15 @@ namespace Apache.Ignite.Core.Tests
CallVoidMethod(ClassPlatformProcessUtils, "destroyProcess", "()V");
}
+ /// <summary>
+ /// Gets the Java thread name.
+ /// </summary>
+ /// <returns></returns>
+ public static string GetJavaThreadName()
+ {
+ return CallStringMethod(ClassPlatformThreadUtils, "getThreadName", "()Ljava/lang/String;");
+ }
+
/** */
private static unsafe void CallStringMethod(string className, string methodName, string methodSig, string arg)
{
@@ -122,5 +131,17 @@ namespace Apache.Ignite.Core.Tests
env.CallStaticVoidMethod(cls, methodId);
}
}
+
+ /** */
+ private static unsafe string CallStringMethod(string className, string methodName, string methodSig)
+ {
+ var env = Jvm.Get().AttachCurrentThread();
+ using (var cls = env.FindClass(className))
+ {
+ var methodId = env.GetStaticMethodId(cls, methodName, methodSig);
+ var res = env.CallStaticObjectMethod(cls, methodId);
+ return env.JStringToString(res.Target);
+ }
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index a0eb1e4..b61aad3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -81,6 +81,7 @@
<Compile Include="Cluster\IBaselineNode.cs" />
<Compile Include="Common\IgniteIllegalStateException.cs" />
<Compile Include="Common\IgniteProductVersion.cs" />
+ <Compile Include="Configuration\AsyncContinuationExecutor.cs" />
<Compile Include="Configuration\CheckpointWriteOrder.cs" />
<Compile Include="Configuration\DataPageEvictionMode.cs" />
<Compile Include="Configuration\DiskPageCompression.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/AsyncContinuationExecutor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/AsyncContinuationExecutor.cs
new file mode 100644
index 0000000..36953fb
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/AsyncContinuationExecutor.cs
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Configuration
+{
+ /// <summary>
+ /// Defines async continuations behavior.
+ /// </summary>
+ public enum AsyncContinuationExecutor
+ {
+ /// <summary>
+ /// Executes async continuations on the thread pool (default).
+ /// </summary>
+ ThreadPool = 0,
+
+ /// <summary>
+ /// Executes async continuations synchronously on the same thread that completes the previous operation.
+ /// <para />
+ /// WARNING: can cause deadlocks and performance issues when not used correctly.
+ /// <para />
+ /// Ignite performs cache operations using a special "striped" thread pool
+ /// (see <see cref="IgniteConfiguration.StripedThreadPoolSize"/>). Using this synchronous mode means that
+ /// async continuations (any code coming after <c>await cache.DoAsync()</c>, or code in <c>ContinueWith()</c>)
+ /// will run on the striped pool:
+ /// <ul>
+ /// <li>
+ /// Cache operations can't execute while user code runs on the striped thread.
+ /// </li>
+ /// <li>
+ /// Attempting other cache operations on the striped thread can cause a deadlock.
+ /// </li>
+ /// </ul>
+ /// <para />
+ /// This mode can improve performance, because continuations do not have to be scheduled on another thread.
+ /// However, special care is required to release striped threads as soon as possible.
+ /// </summary>
+ UnsafeSynchronous = 1,
+
+ /// <summary>
+ /// Indicates that custom executor is configured on the Java side.
+ /// <para />
+ /// This value should not be used explicitly.
+ /// </summary>
+ Custom = 2
+ }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
index 3ef2b7f..e6437d2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
@@ -221,6 +221,9 @@ namespace Apache.Ignite.Core
/** */
private bool? _clientMode;
+ /** */
+ private AsyncContinuationExecutor? _asyncContinuationExecutor;
+
/// <summary>
/// Default network retry count.
/// </summary>
@@ -342,6 +345,7 @@ namespace Apache.Ignite.Core
writer.WriteTimeSpanAsLongNullable(_sysWorkerBlockedTimeout);
writer.WriteIntNullable(_sqlQueryHistorySize);
writer.WriteBooleanNullable(_javaPeerClassLoadingEnabled);
+ writer.WriteIntNullable((int?) _asyncContinuationExecutor);
if (SqlSchemas == null)
writer.WriteInt(0);
@@ -747,6 +751,7 @@ namespace Apache.Ignite.Core
_sysWorkerBlockedTimeout = r.ReadTimeSpanNullable();
_sqlQueryHistorySize = r.ReadIntNullable();
_javaPeerClassLoadingEnabled = r.ReadBooleanNullable();
+ _asyncContinuationExecutor = (AsyncContinuationExecutor?) r.ReadIntNullable();
int sqlSchemasCnt = r.ReadInt();
@@ -1729,10 +1734,20 @@ namespace Apache.Ignite.Core
/// and peer class loading in Java are two distinct and independent features.
/// <para />
/// </summary>
- public bool JavaPeerClassLoadingEnabled
+ public bool JavaPeerClassLoadingEnabled
{
get { return _javaPeerClassLoadingEnabled ?? default(bool); }
set { _javaPeerClassLoadingEnabled = value; }
}
+
+ /// <summary>
+ /// Gets or sets the async continuation behavior.
+ /// See <see cref="Apache.Ignite.Core.Configuration.AsyncContinuationExecutor"/> members for more details.
+ /// </summary>
+ public AsyncContinuationExecutor AsyncContinuationExecutor
+ {
+ get { return _asyncContinuationExecutor ?? default(AsyncContinuationExecutor); }
+ set { _asyncContinuationExecutor = value; }
+ }
}
}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index 0b35ca4..950b738 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -128,6 +128,13 @@
</xs:restriction>
</xs:simpleType>
+ <xs:simpleType name="asyncContinuationExecutor" final="restriction">
+ <xs:restriction base="xs:string">
+ <xs:enumeration value="ThreadPool" />
+ <xs:enumeration value="UnsafeSynchronous" />
+ </xs:restriction>
+ </xs:simpleType>
+
<xs:element name="igniteConfiguration">
<xs:annotation>
<xs:documentation>Ignite configuration root.</xs:documentation>
@@ -2513,6 +2520,11 @@
<xs:documentation>Whether Java console output should be redirected to Console.Out and Console.Error.</xs:documentation>
</xs:annotation>
</xs:attribute>
+ <xs:attribute name="asyncContinuationExecutor" type="asyncContinuationExecutor">
+ <xs:annotation>
+ <xs:documentation>Async continuation behavior.</xs:documentation>
+ </xs:annotation>
+ </xs:attribute>
</xs:complexType>
</xs:element>
</xs:schema>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
index 489fe44..e2bed73 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl.Compute
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
using System.Linq;
+ using System.Threading;
using Apache.Ignite.Core.Cluster;
using Apache.Ignite.Core.Common;
using Apache.Ignite.Core.Compute;
@@ -59,7 +60,7 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="stream">Stream.</param>
/// <returns>Policy.</returns>
int JobResultRemote(ComputeJobHolder jobId, PlatformMemoryStream stream);
-
+
/// <summary>
/// Perform task reduce.
/// </summary>
@@ -70,7 +71,7 @@ namespace Apache.Ignite.Core.Impl.Compute
/// </summary>
/// <param name="taskHandle">Task handle.</param>
void Complete(long taskHandle);
-
+
/// <summary>
/// Complete task with error.
/// </summary>
@@ -85,7 +86,7 @@ namespace Apache.Ignite.Core.Impl.Compute
internal class ComputeTaskHolder<TA, T, TR> : IComputeTaskHolder
{
/** Empty results. */
- private static readonly IList<IComputeJobResult<T>> EmptyRes =
+ private static readonly IList<IComputeJobResult<T>> EmptyRes =
new ReadOnlyCollection<IComputeJobResult<T>>(new List<IComputeJobResult<T>>());
/** Compute instance. */
@@ -102,7 +103,7 @@ namespace Apache.Ignite.Core.Impl.Compute
/** Task future. */
private readonly Future<TR> _fut = new Future<TR>();
-
+
/** Jobs whose results are cached. */
private ISet<object> _resJobs;
@@ -111,7 +112,7 @@ namespace Apache.Ignite.Core.Impl.Compute
/** Handles for jobs which are not serialized right away. */
private volatile List<long> _jobHandles;
-
+
/// <summary>
/// Constructor.
/// </summary>
@@ -241,7 +242,7 @@ namespace Apache.Ignite.Core.Impl.Compute
Finish(default(TR), e);
stream.Reset();
-
+
writer.WriteBoolean(false); // Map failed.
writer.WriteString(e.Message); // Write error message.
}
@@ -345,7 +346,7 @@ namespace Apache.Ignite.Core.Impl.Compute
throw;
}
}
-
+
/** <inheritDoc /> */
public void Reduce()
{
@@ -489,7 +490,8 @@ namespace Apache.Ignite.Core.Impl.Compute
/// <param name="err">Error.</param>
private void Finish(TR res, Exception err)
{
- _fut.OnDone(res, err);
+ // Always complete the future on a ThreadPool thread to avoid capturing Ignite "pub-" thread.
+ ThreadPool.QueueUserWorkItem(_ => _fut.OnDone(res, err));
}
/// <summary>
@@ -503,7 +505,7 @@ namespace Apache.Ignite.Core.Impl.Compute
var handleRegistry = _compute.Marshaller.Ignite.HandleRegistry;
if (handles != null)
- foreach (var handle in handles)
+ foreach (var handle in handles)
handleRegistry.Release(handle, true);
handleRegistry.Release(taskHandle, true);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs
index 4bada09..86515c2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs
@@ -230,7 +230,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged.Jni
/// <summary>
/// Calls the static object method.
/// </summary>
- private GlobalRef CallStaticObjectMethod(GlobalRef cls, IntPtr methodId, long* argsPtr = null)
+ public GlobalRef CallStaticObjectMethod(GlobalRef cls, IntPtr methodId, long* argsPtr = null)
{
var res = _callStaticObjectMethod(_envPtr, cls.Target, methodId, argsPtr);