You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by pt...@apache.org on 2021/04/16 15:53:30 UTC

[ignite] branch master updated: IGNITE-12033 Move async continuations away from striped pool

This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 38d279a  IGNITE-12033 Move async continuations away from striped pool
38d279a is described below

commit 38d279afd18f21e6d26a6f3730999600372e039f
Author: Pavel Tupitsyn <pt...@apache.org>
AuthorDate: Fri Apr 16 18:53:08 2021 +0300

    IGNITE-12033 Move async continuations away from striped pool
    
    User continuations should not run on striped pool - this can cause unexpected, hard to diagnose performance issues and deadlocks.
    
    * Add `IgniteConfiguration#asyncContinuationExecutor`. Defaults to `null`, which means `ForkJoinPool#commonPool` should be used.
    * Use default executor for Cache async operations
    * Add .NET and Java tests
    
    IEP: https://cwiki.apache.org/confluence/display/IGNITE/IEP-70%3A+Async+Continuation+Executor
---
 .../jmh/cache/JmhCacheAsyncListenBenchmark.java    | 162 +++++++++++++
 .../ignite/configuration/IgniteConfiguration.java  |  37 +++
 .../apache/ignite/internal/GridKernalContext.java  |   8 +
 .../ignite/internal/GridKernalContextImpl.java     |  10 +
 .../processors/cache/IgniteCacheFutureImpl.java    |   8 +-
 .../processors/cache/IgniteCacheProxyImpl.java     |  26 ++-
 .../cache/IgniteFinishedCacheFutureImpl.java       |   2 +-
 .../wal/reader/StandaloneGridKernalContext.java    |   6 +
 .../processors/datastreamer/DataStreamerImpl.java  |   4 +-
 .../platform/utils/PlatformConfigurationUtils.java |  40 ++++
 .../internal/util/future/IgniteFutureImpl.java     |  24 +-
 .../cache/CacheAsyncContinuationExecutorTest.java  | 254 +++++++++++++++++++++
 ...eAsyncContinuationSynchronousExecutorTest.java} |  26 ++-
 .../util/future/IgniteCacheFutureImplTest.java     |   2 +-
 .../ignite/platform/PlatformTestExecutor.java}     |  16 +-
 .../ignite/platform/PlatformThreadUtils.java       |   9 +
 .../ignite/testsuites/IgniteCacheTestSuite.java    |   4 +
 .../Cache/CacheTestAsyncAwait.cs                   | 115 ++++++++++
 .../Client/Cache/CacheTestAsyncAwait.cs            |   2 +-
 .../Compute/CancellationTest.cs                    |   2 +-
 .../Compute/ComputeApiTest.cs                      |   6 +-
 .../ComputeTestAsyncAwait.cs}                      |  29 ++-
 .../Config/full-config.xml                         |   2 +-
 .../Config/spring-test.xml                         |   4 +
 .../IgniteConfigurationSerializerTest.cs           |   5 +-
 .../IgniteConfigurationTest.cs                     |   2 +
 .../Apache.Ignite.Core.Tests/ProjectFilesTest.cs   |  16 +-
 .../Services/PlatformTestService.cs                |   1 +
 .../Apache.Ignite.Core.Tests/TestUtilsJni.cs       |  21 ++
 .../Apache.Ignite.Core/Apache.Ignite.Core.csproj   |   1 +
 .../Configuration/AsyncContinuationExecutor.cs     |  60 +++++
 .../Apache.Ignite.Core/IgniteConfiguration.cs      |  17 +-
 .../IgniteConfigurationSection.xsd                 |  12 +
 .../Impl/Compute/ComputeTaskHolder.cs              |  20 +-
 .../Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs   |   2 +-
 35 files changed, 882 insertions(+), 73 deletions(-)

diff --git a/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheAsyncListenBenchmark.java b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheAsyncListenBenchmark.java
new file mode 100644
index 0000000..d494d39
--- /dev/null
+++ b/modules/benchmarks/src/main/java/org/apache/ignite/internal/benchmarks/jmh/cache/JmhCacheAsyncListenBenchmark.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.benchmarks.jmh.cache;
+
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.internal.benchmarks.jmh.runner.JmhIdeBenchmarkRunner;
+import org.apache.ignite.internal.benchmarks.model.IntValue;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.Mode;
+
+
+/**
+ * Cache async listen benchmark.
+ * Measures {@link IgniteFuture#listen(IgniteInClosure)} performance.
+ *
+ * Results on i7-9700K, Ubuntu 20.04.1, JDK 1.8.0_275:
+ *
+ * Without ForkJoinPool async continuation executor:
+ * Benchmark                          Mode  Cnt      Score      Error  Units
+ * JmhCacheAsyncListenBenchmark.get  thrpt   10  82052.664 ± 2891.182  ops/s
+ * JmhCacheAsyncListenBenchmark.put  thrpt   10  77859.584 ± 2071.196  ops/s
+ *
+ * With ForkJoinPool async continuation executor:
+ * Benchmark                          Mode  Cnt      Score      Error  Units
+ * JmhCacheAsyncListenBenchmark.get  thrpt   10  76008.272 ± 1506.928  ops/s
+ * JmhCacheAsyncListenBenchmark.put  thrpt   10  73393.986 ± 1336.420  ops/s
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class JmhCacheAsyncListenBenchmark extends JmhCacheAbstractBenchmark {
+    /** {@inheritDoc} */
+    @Override public void setup() throws Exception {
+        super.setup();
+
+        IgniteDataStreamer<Integer, IntValue> dataLdr = node.dataStreamer(cache.getName());
+
+        for (int i = 0; i < CNT; i++)
+            dataLdr.addData(i, new IntValue(i));
+
+        dataLdr.close();
+
+        System.out.println("Cache populated.");
+    }
+
+    /**
+     * Test PUT operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Benchmark
+    public void put() throws Exception {
+        int key = ThreadLocalRandom.current().nextInt(CNT);
+
+        blockingListen(cache.putAsync(key, new IntValue(key)));
+    }
+
+    /**
+     * Test GET operation.
+     *
+     * @throws Exception If failed.
+     */
+    @Benchmark
+    public void get() throws Exception {
+        int key = ThreadLocalRandom.current().nextInt(CNT);
+
+        blockingListen(cache.getAsync(key));
+    }
+
+    /**
+     * Blocks until future completion using {@link IgniteFuture#listen(IgniteInClosure)}.
+     *
+     * @param future Future
+     */
+    private static void blockingListen(IgniteFuture future) throws Exception {
+        AtomicBoolean ab = new AtomicBoolean();
+
+        future.listen(f -> {
+            try {
+                synchronized (ab) {
+                    ab.set(true);
+                    ab.notifyAll();
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        synchronized (ab) {
+            while (!ab.get()) {
+                ab.wait(5000);
+            }
+        }
+    }
+
+    /**
+     * Run benchmarks.
+     *
+     * @param args Arguments.
+     * @throws Exception If failed.
+     */
+    public static void main(String[] args) throws Exception {
+        run(CacheAtomicityMode.ATOMIC);
+    }
+
+    /**
+     * Run benchmarks for atomic cache.
+     *
+     * @param atomicityMode Atomicity mode.
+     * @throws Exception If failed.
+     */
+    private static void run(CacheAtomicityMode atomicityMode) throws Exception {
+        run(4, true, atomicityMode, CacheWriteSynchronizationMode.PRIMARY_SYNC);
+    }
+
+    /**
+     * Run benchmark.
+     *
+     * @param threads Amount of threads.
+     * @param client Client mode flag.
+     * @param atomicityMode Atomicity mode.
+     * @param writeSyncMode Write synchronization mode.
+     * @throws Exception If failed.
+     */
+    private static void run(int threads, boolean client, CacheAtomicityMode atomicityMode,
+        CacheWriteSynchronizationMode writeSyncMode) throws Exception {
+
+        JmhIdeBenchmarkRunner.create()
+                .forks(1)
+                .threads(threads)
+                .benchmarks(JmhCacheAsyncListenBenchmark.class.getSimpleName())
+                .jvmArguments(
+                    "-Xms4g",
+                    "-Xmx4g",
+                    JmhIdeBenchmarkRunner.createProperty(PROP_ATOMICITY_MODE, atomicityMode),
+                    JmhIdeBenchmarkRunner.createProperty(PROP_WRITE_SYNC_MODE, writeSyncMode),
+                    JmhIdeBenchmarkRunner.createProperty(PROP_DATA_NODES, 2),
+                    JmhIdeBenchmarkRunner.createProperty(PROP_CLIENT_MODE, client))
+                .benchmarkModes(Mode.Throughput)
+                .run();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 3801795..aa3109d 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -21,6 +21,8 @@ import java.io.Serializable;
 import java.lang.management.ManagementFactory;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ForkJoinPool;
 import java.util.zip.Deflater;
 import javax.cache.configuration.Factory;
 import javax.cache.event.CacheEntryListener;
@@ -607,6 +609,9 @@ public class IgniteConfiguration {
     /** SQL configuration. */
     private SqlConfiguration sqlCfg = new SqlConfiguration();
 
+    /** Executor for async operations continuations. */
+    private Executor asyncContinuationExecutor;
+
     /** Shutdown policy for cluster. */
     public ShutdownPolicy shutdown = DFLT_SHUTDOWN_POLICY;
 
@@ -738,6 +743,7 @@ public class IgniteConfiguration {
         warmupClos = cfg.getWarmupClosure();
         sqlCfg = cfg.getSqlConfiguration();
         shutdown = cfg.getShutdownPolicy();
+        asyncContinuationExecutor = cfg.getAsyncContinuationExecutor();
     }
 
     /**
@@ -3641,6 +3647,37 @@ public class IgniteConfiguration {
         return this;
     }
 
+    /**
+     * Gets the continuation executor for cache async APIs.
+     * <p />
+     * When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used.
+     * <p />
+     * When async operation completes, corresponding {@link org.apache.ignite.lang.IgniteFuture} listeners
+     * will be invoked using this executor.
+     *
+     * @return Executor for async continuations.
+     */
+    public Executor getAsyncContinuationExecutor() {
+        return asyncContinuationExecutor;
+    }
+
+    /**
+     * Sets the continuation executor for cache async APIs.
+     * <p />
+     * When <code>null</code> (default), {@link ForkJoinPool#commonPool()} is used.
+     * <p />
+     * When async operation completes, corresponding {@link org.apache.ignite.lang.IgniteFuture} listeners
+     * will be invoked using this executor.
+     *
+     * @param asyncContinuationExecutor Executor for async continuations.
+     * @return {@code this} for chaining.
+     */
+    public IgniteConfiguration setAsyncContinuationExecutor(Executor asyncContinuationExecutor) {
+        this.asyncContinuationExecutor = asyncContinuationExecutor;
+
+        return this;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteConfiguration.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 281b8a0..e0e51fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.configuration.IgniteConfiguration;
@@ -784,4 +785,11 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      * @return Performance statistics processor.
      */
     public PerformanceStatisticsProcessor performanceStatistics();
+
+    /**
+     * Executor that is in charge of processing user async continuations.
+     *
+     * @return Executor that is in charge of processing user async continuations.
+     */
+    public Executor getAsyncContinuationExecutor();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index bb8fcf8..25f4ed5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -30,7 +30,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -1333,4 +1336,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     @Override public PerformanceStatisticsProcessor performanceStatistics() {
         return perfStatProc;
     }
+
+    /** {@inheritDoc} */
+    @Override public Executor getAsyncContinuationExecutor() {
+        return config().getAsyncContinuationExecutor() == null
+                ? ForkJoinPool.commonPool()
+                : config().getAsyncContinuationExecutor();
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
index 6f6223f..c2dca1f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheFutureImpl.java
@@ -38,18 +38,18 @@ public class IgniteCacheFutureImpl<V> extends IgniteFutureImpl<V> {
      *
      * @param fut Internal future.
      */
-    public IgniteCacheFutureImpl(IgniteInternalFuture<V> fut) {
-        super(fut);
+    public IgniteCacheFutureImpl(IgniteInternalFuture<V> fut, Executor defaultExecutor) {
+        super(fut, defaultExecutor);
     }
 
     /** {@inheritDoc} */
     @Override public <T> IgniteFuture<T> chain(IgniteClosure<? super IgniteFuture<V>, T> doneCb) {
-        return new IgniteCacheFutureImpl<>(chainInternal(doneCb, null));
+        return new IgniteCacheFutureImpl<>(chainInternal(doneCb, null), defaultExecutor);
     }
 
     /** {@inheritDoc} */
     @Override public <T> IgniteFuture<T> chainAsync(IgniteClosure<? super IgniteFuture<V>, T> doneCb, Executor exec) {
-        return new IgniteCacheFutureImpl<>(chainInternal(doneCb, exec));
+        return new IgniteCacheFutureImpl<>(chainInternal(doneCb, exec), defaultExecutor);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
index 6222f60..c481450 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxyImpl.java
@@ -31,6 +31,7 @@ import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
@@ -1906,7 +1907,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     @Override public IgniteFuture<?> destroyAsync() {
         GridCacheContext<K, V> ctx = getContextSafe();
 
-        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(cacheName, false, true, false, null));
+        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicDestroyCache(cacheName, false, true, false, null), exec());
     }
 
     /** {@inheritDoc} */
@@ -1918,7 +1919,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     @Override public IgniteFuture<?> closeAsync() {
         GridCacheContext<K, V> ctx = getContextSafe();
 
-        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(cacheName));
+        return new IgniteFutureImpl<>(ctx.kernalContext().cache().dynamicCloseCache(cacheName), exec());
     }
 
     /** {@inheritDoc} */
@@ -2064,7 +2065,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
                 assert restartFut != null;
 
-                throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), cacheName);
+                throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut, exec()), cacheName);
             }
             else
                 throw restartingException;
@@ -2072,7 +2073,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
         if (restartFut != null) {
             if (X.hasCause(e, CacheStoppedException.class) || X.hasSuppressed(e, CacheStoppedException.class))
-                throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut), "Cache is restarting: " +
+                throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(restartFut, exec()), "Cache is restarting: " +
                         cacheName, e);
         }
 
@@ -2100,7 +2101,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
 
     /** {@inheritDoc} */
     @Override protected <R> IgniteFuture<R> createFuture(IgniteInternalFuture<R> fut) {
-        return new IgniteCacheFutureImpl<>(fut);
+        return new IgniteCacheFutureImpl<>(fut, exec());
     }
 
     /**
@@ -2216,7 +2217,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     @Override public IgniteFuture<Boolean> rebalance() {
         GridCacheContext<K, V> ctx = getContextSafe();
 
-        return new IgniteFutureImpl<>(ctx.preloader().forceRebalance());
+        return new IgniteFutureImpl<>(ctx.preloader().forceRebalance(), exec());
     }
 
     /** {@inheritDoc} */
@@ -2228,7 +2229,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
         if (fut == null)
             return new IgniteFinishedFutureImpl<>();
 
-        return new IgniteFutureImpl<>(fut);
+        return new IgniteFutureImpl<>(fut, exec());
     }
 
     /**
@@ -2256,7 +2257,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
                 //do nothing
             }
 
-            throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut), cacheName);
+            throw new IgniteCacheRestartingException(new IgniteFutureImpl<>(currentFut, exec()), cacheName);
         }
     }
 
@@ -2363,6 +2364,13 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
     }
 
     /**
+     * Async continuation executor.
+     */
+    private Executor exec() {
+        return context().kernalContext().getAsyncContinuationExecutor();
+    }
+
+    /**
      *
      */
     private class RestartFuture extends GridFutureAdapter<Void> {
@@ -2395,7 +2403,7 @@ public class IgniteCacheProxyImpl<K, V> extends AsyncSupportAdapter<IgniteCache<
             }
 
             throw new IgniteCacheRestartingException(
-                new IgniteFutureImpl<>(this),
+                new IgniteFutureImpl<>(this, exec()),
                 "Cache is restarting: " + name
             );
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
index f7bc95b..f515811 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
@@ -27,6 +27,6 @@ public class IgniteFinishedCacheFutureImpl<V> extends IgniteCacheFutureImpl<V> {
      * @param err Error.
      */
     public IgniteFinishedCacheFutureImpl(Throwable err) {
-        super(new GridFinishedFuture<V>(err));
+        super(new GridFinishedFuture<V>(err), null);
     }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
index 54ae4ab..912876b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -746,4 +747,9 @@ public class StandaloneGridKernalContext implements GridKernalContext {
     @Override public PerformanceStatisticsProcessor performanceStatistics() {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Override public Executor getAsyncContinuationExecutor() {
+        return null;
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
index 9b1f624..452789d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastreamer/DataStreamerImpl.java
@@ -370,7 +370,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
 
         fut = new DataStreamerFuture(this);
 
-        publicFut = new IgniteCacheFutureImpl<>(fut);
+        publicFut = new IgniteCacheFutureImpl<>(fut, ctx.getAsyncContinuationExecutor());
 
         GridCacheAdapter cache = ctx.cache().internalCache(cacheName);
 
@@ -708,7 +708,7 @@ public class DataStreamerImpl<K, V> implements IgniteDataStreamer<K, V>, Delayed
     @NotNull protected IgniteCacheFutureImpl createDataLoadFuture() {
         GridFutureAdapter internalFut0 = new GridFutureAdapter();
 
-        IgniteCacheFutureImpl fut = new IgniteCacheFutureImpl(internalFut0);
+        IgniteCacheFutureImpl fut = new IgniteCacheFutureImpl(internalFut0, ctx.getAsyncContinuationExecutor());
 
         internalFut0.listen(rmvActiveFut);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
index fbe3218..5bead46 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformConfigurationUtils.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
+import java.util.concurrent.Executor;
 import javax.cache.configuration.Factory;
 import javax.cache.expiry.ExpiryPolicy;
 import javax.net.ssl.SSLContext;
@@ -122,6 +123,9 @@ import org.apache.ignite.util.AttributeNodeFilter;
  */
 @SuppressWarnings({"unchecked", "TypeMayBeWeakened"})
 public class PlatformConfigurationUtils {
+    /** */
+    private static final Executor synchronousExecutor = Runnable::run;
+
     /**
      * Write .Net configuration to the stream.
      *
@@ -760,6 +764,8 @@ public class PlatformConfigurationUtils {
             cfg.setSqlQueryHistorySize(in.readInt());
         if (in.readBoolean())
             cfg.setPeerClassLoadingEnabled(in.readBoolean());
+        if (in.readBoolean())
+            cfg.setAsyncContinuationExecutor(getAsyncContinuationExecutor(in.readInt()));
 
         int sqlSchemasCnt = in.readInt();
 
@@ -1366,6 +1372,8 @@ public class PlatformConfigurationUtils {
         w.writeInt(cfg.getSqlQueryHistorySize());
         w.writeBoolean(true);
         w.writeBoolean(cfg.isPeerClassLoadingEnabled());
+        w.writeBoolean(true);
+        w.writeInt(getAsyncContinuationExecutorMode(cfg.getAsyncContinuationExecutor()));
 
         if (cfg.getSqlSchemas() == null)
             w.writeInt(0);
@@ -2335,6 +2343,38 @@ public class PlatformConfigurationUtils {
     }
 
     /**
+     * Gets the executor.
+     *
+     * @param mode Mode.
+     * @return Executor.
+     */
+    private static Executor getAsyncContinuationExecutor(int mode) {
+        switch (mode) {
+            case 0: return null;
+            case 1: return synchronousExecutor;
+            default: throw new IgniteException("Invalid AsyncContinuationExecutor mode: " + mode);
+        }
+    }
+
+    /**
+     * Gets the executor mode.
+     *
+     * @param executor Executor.
+     * @return Mode.
+     */
+    private static int getAsyncContinuationExecutorMode(Executor executor) {
+        if (executor == null) {
+            return 0;
+        }
+
+        if (executor.equals(synchronousExecutor)) {
+            return 1;
+        }
+
+        return 2;
+    }
+
+    /**
      * Private constructor.
      */
     private PlatformConfigurationUtils() {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
index ab8aa7d..307a28f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/IgniteFutureImpl.java
@@ -38,13 +38,25 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
     /** */
     protected final IgniteInternalFuture<V> fut;
 
+    /** */
+    protected final Executor defaultExecutor;
+
     /**
      * @param fut Future.
      */
     public IgniteFutureImpl(IgniteInternalFuture<V> fut) {
+        this(fut, null);
+    }
+
+    /**
+     * @param fut Future.
+     * @param defaultExecutor Default executor.
+     */
+    public IgniteFutureImpl(IgniteInternalFuture<V> fut, @Nullable Executor defaultExecutor) {
         assert fut != null;
 
         this.fut = fut;
+        this.defaultExecutor = defaultExecutor;
     }
 
     /**
@@ -68,7 +80,10 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
     @Override public void listen(IgniteInClosure<? super IgniteFuture<V>> lsnr) {
         A.notNull(lsnr, "lsnr");
 
-        fut.listen(new InternalFutureListener(lsnr));
+        if (defaultExecutor != null && !isDone())
+            fut.listen(new InternalFutureListener(new AsyncFutureListener<>(lsnr, defaultExecutor)));
+        else
+            fut.listen(new InternalFutureListener(lsnr));
     }
 
     /** {@inheritDoc} */
@@ -81,7 +96,7 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
 
     /** {@inheritDoc} */
     @Override public <T> IgniteFuture<T> chain(final IgniteClosure<? super IgniteFuture<V>, T> doneCb) {
-        return new IgniteFutureImpl<>(chainInternal(doneCb, null));
+        return new IgniteFutureImpl<>(chainInternal(doneCb, null), defaultExecutor);
     }
 
     /** {@inheritDoc} */
@@ -90,7 +105,7 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
         A.notNull(doneCb, "doneCb");
         A.notNull(exec, "exec");
 
-        return new IgniteFutureImpl<>(chainInternal(doneCb, exec));
+        return new IgniteFutureImpl<>(chainInternal(doneCb, exec), defaultExecutor);
     }
 
     /**
@@ -115,6 +130,9 @@ public class IgniteFutureImpl<V> implements IgniteFuture<V> {
         if (exec != null)
             return fut.chain(clos, exec);
 
+        if (defaultExecutor != null && !isDone())
+            return fut.chain(clos, defaultExecutor);
+
         return fut.chain(clos);
     }
 
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationExecutorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationExecutorTest.java
new file mode 100644
index 0000000..48bac86
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationExecutorTest.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
+
+import javax.cache.Cache;
+import javax.cache.configuration.Factory;
+import javax.cache.integration.CacheLoaderException;
+
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.store.CacheStore;
+import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.junit.Test;
+
+/**
+ * Tests {@link IgniteConfiguration#setAsyncContinuationExecutor(Executor)}.
+ */
+@SuppressWarnings("rawtypes")
+public class CacheAsyncContinuationExecutorTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected CacheAtomicityMode atomicityMode() {
+        return CacheAtomicityMode.ATOMIC;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int backups() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override protected CacheConfiguration cacheConfiguration(String igniteInstanceName) throws Exception {
+        CacheConfiguration ccfg = super.cacheConfiguration(igniteInstanceName);
+
+        // Use cache store with a write delay to make sure future does not complete before we register a listener.
+        ccfg.setCacheStoreFactory(new DelayedStoreFactory());
+        ccfg.setReadThrough(true);
+        ccfg.setWriteThrough(true);
+
+        return ccfg;
+    }
+
+    /**
+     * Gets the expected thread name prefix.
+     * @return Prefix.
+     */
+    protected String expectedThreadNamePrefix() {
+        return "ForkJoinPool.commonPool-worker";
+    }
+
+    /**
+     * Gets a value indicating whether continuation thread can execute cache operations.
+     * @return Value indicating whether continuation thread can execute cache operations.
+     */
+    protected boolean allowCacheOperationsInContinuation() {
+        return true;
+    }
+
+    /**
+     * Tests future listen with default executor.
+     */
+    @Test
+    public void testRemoteOperationListenContinuesOnDefaultExecutor() throws Exception {
+        testRemoteOperationContinuesOnDefaultExecutor(false);
+    }
+
+    /**
+     * Tests future chain with default executor.
+     */
+    @Test
+    public void testRemoteOperationChainContinuesOnDefaultExecutor() throws Exception {
+        testRemoteOperationContinuesOnDefaultExecutor(true);
+    }
+
+    /**
+     * Tests that an operation on a local key executes synchronously, and listener is called immediately on the current
+     * thread.
+     */
+    @Test
+    public void testLocalOperationListenerExecutesSynchronously() {
+        final String key = getPrimaryKey(0);
+
+        IgniteCache<String, Integer> cache = jcache(0);
+        AtomicReference<String> listenThreadName = new AtomicReference<>("");
+
+        cache.putAsync(key, 1).listen(f -> listenThreadName.set(Thread.currentThread().getName()));
+
+        assertEquals(Thread.currentThread().getName(), listenThreadName.get());
+    }
+
+    /**
+     * Tests that an operation on a remote key executes on striped pool directly when a syncronous executor is provided.
+     * This demonstrates that default safe behavior can be overridden with a faster, but unsafe old behavior
+     * for an individual operation.
+     */
+    @Test
+    public void testRemoteOperationListenerExecutesOnStripedPoolWhenCustomExecutorIsProvided() throws Exception {
+        final String key = getPrimaryKey(1);
+
+        IgniteCache<String, Integer> cache = jcache(0);
+        AtomicReference<String> listenThreadName = new AtomicReference<>("");
+        CyclicBarrier barrier = new CyclicBarrier(2);
+
+        cache.putAsync(key, 1).listenAsync(f -> {
+            listenThreadName.set(Thread.currentThread().getName());
+
+            try {
+                barrier.await(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }, Runnable::run);
+
+        barrier.await(10, TimeUnit.SECONDS);
+
+        assertTrue(listenThreadName.get(), listenThreadName.get().startsWith("sys-stripe-"));
+    }
+
+    /**
+     * Tests that an operation on a local key executes synchronously, and chain is called immediately on the current
+     * thread.
+     */
+    @Test
+    public void testLocalOperationChainExecutesSynchronously() {
+        final String key = getPrimaryKey(0);
+
+        IgniteCache<String, Integer> cache = jcache(0);
+        AtomicReference<String> listenThreadName = new AtomicReference<>("");
+
+        cache.putAsync(key, 1).chain(f -> {
+            listenThreadName.set(Thread.currentThread().getName());
+
+            return new IgniteFinishedFutureImpl<>();
+        });
+
+        assertEquals(Thread.currentThread().getName(), listenThreadName.get());
+    }
+
+    /**
+     * Tests future chain / listen with default executor.
+     *
+     * This test would hang before {@link IgniteConfiguration#setAsyncContinuationExecutor(Executor)}
+     * was introduced, or if we set {@link Runnable#run()} as the executor.
+     */
+    private void testRemoteOperationContinuesOnDefaultExecutor(boolean chain) throws Exception {
+        final String key = getPrimaryKey(1);
+
+        IgniteCache<String, Integer> cache = jcache(0);
+        CyclicBarrier barrier = new CyclicBarrier(2);
+        AtomicReference<String> listenThreadName = new AtomicReference<>("");
+
+        IgniteInClosure<IgniteFuture<Void>> clos = f -> {
+            listenThreadName.set(Thread.currentThread().getName());
+
+            if (allowCacheOperationsInContinuation()) {
+                // Check that cache operations do not deadlock.
+                cache.replace(key, 2);
+            }
+
+            try {
+                barrier.await(10, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        };
+
+        IgniteFuture<Void> fut = cache.putAsync(key, 1);
+
+        if (chain)
+            fut.chain(f -> {
+                clos.apply(f);
+                return new IgniteFinishedFutureImpl<>();
+            });
+        else
+            fut.listen(clos);
+
+        barrier.await(10, TimeUnit.SECONDS);
+
+        assertEquals(allowCacheOperationsInContinuation() ? 2 : 1, cache.get(key).intValue());
+        assertTrue(listenThreadName.get(), listenThreadName.get().startsWith(expectedThreadNamePrefix()));
+    }
+
+    /**
+     * Gets the primary key.
+     * @param nodeIdx Node index.
+     * @return Key.
+     */
+    @SuppressWarnings("OptionalGetWithoutIsPresent")
+    private String getPrimaryKey(int nodeIdx) {
+        return IntStream.range(0, 1000)
+                .mapToObj(String::valueOf)
+                .filter(x -> belongs(x, nodeIdx))
+                .findFirst()
+                .get();
+    }
+
+    /** */
+    private static class DelayedStoreFactory implements Factory<CacheStore> {
+        /** {@inheritDoc} */
+        @Override public CacheStore create() {
+            return new CacheStoreAdapter() {
+                /** {@inheritDoc} */
+                @Override public Object load(Object key) throws CacheLoaderException {
+                    return null;
+                }
+
+                /** {@inheritDoc} */
+                @Override public void write(Cache.Entry entry) {
+                    try {
+                        Thread.sleep(100);
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                }
+
+                /** {@inheritDoc} */
+                @Override public void delete(Object key) {
+                    // No-op.
+                }
+            };
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationSynchronousExecutorTest.java
similarity index 52%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
copy to modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationSynchronousExecutorTest.java
index f7bc95b..0651f6cc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheAsyncContinuationSynchronousExecutorTest.java
@@ -17,16 +17,26 @@
 
 package org.apache.ignite.internal.processors.cache;
 
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import java.util.concurrent.Executor;
+
+import org.apache.ignite.configuration.IgniteConfiguration;
 
 /**
- *
+ * Tests {@link IgniteConfiguration#setAsyncContinuationExecutor(Executor)} with synchronous executor (old behavior).
  */
-public class IgniteFinishedCacheFutureImpl<V> extends IgniteCacheFutureImpl<V> {
-    /**
-     * @param err Error.
-     */
-    public IgniteFinishedCacheFutureImpl(Throwable err) {
-        super(new GridFinishedFuture<V>(err));
+public class CacheAsyncContinuationSynchronousExecutorTest extends CacheAsyncContinuationExecutorTest {
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName).setAsyncContinuationExecutor(Runnable::run);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected String expectedThreadNamePrefix() {
+        return "sys-stripe-";
+    }
+
+    /** {@inheritDoc} */
+    @Override protected boolean allowCacheOperationsInContinuation() {
+        return false;
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java
index 46f1706..b74a7fe 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/future/IgniteCacheFutureImplTest.java
@@ -28,7 +28,7 @@ import org.apache.ignite.internal.processors.cache.IgniteCacheFutureImpl;
 public class IgniteCacheFutureImplTest extends IgniteFutureImplTest {
     /** {@inheritDoc} */
     @Override protected <V> IgniteFutureImpl<V> createFuture(IgniteInternalFuture<V> fut) {
-        return new IgniteCacheFutureImpl<>(fut);
+        return new IgniteCacheFutureImpl<>(fut, Runnable::run);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformTestExecutor.java
similarity index 70%
copy from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
copy to modules/core/src/test/java/org/apache/ignite/platform/PlatformTestExecutor.java
index f7bc95b..4bf26c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteFinishedCacheFutureImpl.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformTestExecutor.java
@@ -15,18 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.processors.cache;
+package org.apache.ignite.platform;
 
-import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import java.util.concurrent.Executor;
 
 /**
- *
+ * Test executor.
  */
-public class IgniteFinishedCacheFutureImpl<V> extends IgniteCacheFutureImpl<V> {
-    /**
-     * @param err Error.
-     */
-    public IgniteFinishedCacheFutureImpl(Throwable err) {
-        super(new GridFinishedFuture<V>(err));
+public class PlatformTestExecutor implements Executor {
+    /** {@inheritDoc} */
+    @Override public void execute(Runnable runnable) {
+        runnable.run();
     }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java b/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java
index 0029d1e..b60ebeb 100644
--- a/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/platform/PlatformThreadUtils.java
@@ -48,4 +48,13 @@ public class PlatformThreadUtils {
             }
         }
     }
+
+    /**
+     * Gets the thread name.
+     *
+     * @return Thread name.
+     */
+    public static String getThreadName() {
+        return Thread.currentThread().getName();
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
index 31bc4d9..719232c 100755
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite.java
@@ -54,6 +54,8 @@ import org.apache.ignite.internal.managers.communication.MessageDirectTypeIdConf
 import org.apache.ignite.internal.processors.cache.BinaryMetadataRegistrationInsideEntryProcessorTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityCallSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheAffinityKeyConfigurationMismatchTest;
+import org.apache.ignite.internal.processors.cache.CacheAsyncContinuationExecutorTest;
+import org.apache.ignite.internal.processors.cache.CacheAsyncContinuationSynchronousExecutorTest;
 import org.apache.ignite.internal.processors.cache.CacheAtomicSingleMessageCountSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteQueueTest;
 import org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
@@ -396,6 +398,8 @@ public class IgniteCacheTestSuite {
         GridTestUtils.addTestIfNeeded(suite, InterceptorWithKeepBinaryCacheFullApiTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, BinaryMetadataRegistrationInsideEntryProcessorTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CacheAsyncContinuationExecutorTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, CacheAsyncContinuationSynchronousExecutorTest.class, ignoredTests);
 
         suite.add(IgniteGetNonPlainKeyReadThroughSelfTest.class);
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncAwait.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncAwait.cs
new file mode 100644
index 0000000..f27600e
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/CacheTestAsyncAwait.cs
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// ReSharper disable MethodHasAsyncOverload
+namespace Apache.Ignite.Core.Tests.Cache
+{
+    using System.Threading;
+    using System.Threading.Tasks;
+    using Apache.Ignite.Core.Common;
+    using Apache.Ignite.Core.Configuration;
+    using NUnit.Framework;
+
+    /// <summary>
+    /// Tests thick cache operations with async/await.
+    /// </summary>
+    public class CacheTestAsyncAwait : TestBase
+    {
+        /// <summary>
+        /// Initializes a new instance of <see cref="CacheTestAsyncAwait"/> class.
+        /// </summary>
+        public CacheTestAsyncAwait() : base(2)
+        {
+            // No-op.
+        }
+
+        /// <summary>
+        /// Tests that async continuations are executed on a ThreadPool thread, not on response handler thread.
+        /// </summary>
+        [Test]
+        public async Task TestAsyncAwaitContinuationIsExecutedWithConfiguredExecutor()
+        {
+            var cache = Ignite.GetOrCreateCache<int, int>(TestUtils.TestName);
+            var key = TestUtils.GetPrimaryKey(Ignite2, cache.Name);
+
+            // This causes deadlock if async continuation is executed on the striped thread.
+            await cache.PutAsync(key, 1);
+            cache.Replace(key, 2);
+
+            Assert.AreEqual(2, cache.Get(key));
+            StringAssert.DoesNotContain("sys-stripe-", TestUtilsJni.GetJavaThreadName());
+        }
+
+        /// <summary>
+        /// Tests that local operation executes synchronously and completes on the same thread.
+        /// </summary>
+        [Test]
+        public async Task TestLocalOperationExecutesSynchronously()
+        {
+            var cache = Ignite.GetOrCreateCache<int, int>(TestUtils.TestName);
+            var key = TestUtils.GetPrimaryKey(Ignite, cache.Name);
+            var origThread = Thread.CurrentThread;
+
+            await cache.PutAsync(key, key);
+
+            Assert.AreEqual(origThread.ManagedThreadId, Thread.CurrentThread.ManagedThreadId);
+        }
+
+        /// <summary>
+        /// Tests that explicitly configured synchronous executor runs continuations on the striped pool.
+        /// </summary>
+        [Test]
+        public async Task TestSynchronousExecutorRunsContinuationsOnStripedPool()
+        {
+            var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration(name: "client"))
+            {
+                AsyncContinuationExecutor = AsyncContinuationExecutor.UnsafeSynchronous,
+                ClientMode = true
+            };
+
+            using (var client = Ignition.Start(cfg))
+            {
+                var cache = client.GetOrCreateCache<int, int>(TestUtils.TestName);
+
+                await cache.PutAsync(1, 1);
+
+                StringAssert.StartsWith("sys-stripe-", TestUtilsJni.GetJavaThreadName());
+
+                Assert.AreEqual(AsyncContinuationExecutor.UnsafeSynchronous,
+                    client.GetConfiguration().AsyncContinuationExecutor);
+
+                // Jump away from striped pool to avoid deadlock on node stop.
+                await Task.Yield();
+            }
+        }
+
+        /// <summary>
+        /// Tests that invalid executor configuration is rejected.
+        /// </summary>
+        [Test]
+        public void TestInvalidExecutorConfigurationFailsOnStart()
+        {
+            var cfg = new IgniteConfiguration(TestUtils.GetTestConfiguration())
+            {
+                AsyncContinuationExecutor = AsyncContinuationExecutor.Custom
+            };
+
+            var ex = Assert.Throws<IgniteException>(() => Ignition.Start(cfg));
+            Assert.AreEqual("Invalid AsyncContinuationExecutor mode: 2", ex.Message);
+        }
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
index a3485d0..e254fc8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
@@ -22,7 +22,7 @@ namespace Apache.Ignite.Core.Tests.Client.Cache
     using NUnit.Framework;
 
     /// <summary>
-    /// Tests cache operations with async/await.
+    /// Tests thin cache operations with async/await.
     /// </summary>
     public class CacheTestAsyncAwait : ClientTestBase
     {
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs
index 3e85d497..dbaa94b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/CancellationTest.cs
@@ -112,7 +112,7 @@ namespace Apache.Ignite.Core.Tests.Compute
 
                 cts.Cancel();
 
-                Assert.IsTrue(task.IsCanceled);
+                TestUtils.WaitForTrueCondition(() => task.IsCanceled);
 
                 // Pass cancelled token
                 Assert.IsTrue(runner(Compute, cts.Token).IsCanceled);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
index 820435f..ec5750d 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeApiTest.cs
@@ -720,11 +720,11 @@ namespace Apache.Ignite.Core.Tests.Compute
                 // Cancel while executing
                 var task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token);
                 cts.Cancel();
-                Assert.IsTrue(task.IsCanceled);
+                TestUtils.WaitForTrueCondition(() => task.IsCanceled);
 
                 // Use cancelled token
-                task = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token);
-                Assert.IsTrue(task.IsCanceled);
+                var task2 = _grid1.GetCompute().RunAsync(new ComputeAction(), cts.Token);
+                Assert.IsTrue(task2.IsCanceled);
             }
         }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTestAsyncAwait.cs
similarity index 54%
copy from modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
copy to modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTestAsyncAwait.cs
index a3485d0..06e4f66 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Client/Cache/CacheTestAsyncAwait.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Compute/ComputeTestAsyncAwait.cs
@@ -15,29 +15,36 @@
  * limitations under the License.
  */
 
-namespace Apache.Ignite.Core.Tests.Client.Cache
+namespace Apache.Ignite.Core.Tests.Compute
 {
     using System.Threading.Tasks;
-    using Apache.Ignite.Core.Tests.Client;
     using NUnit.Framework;
 
     /// <summary>
-    /// Tests cache operations with async/await.
+    /// Tests compute async continuation behavior.
     /// </summary>
-    public class CacheTestAsyncAwait : ClientTestBase
+    public class ComputeTestAsyncAwait : TestBase
     {
         /// <summary>
-        /// Tests that async continuations are executed on a ThreadPool thread, not on response handler thread.
+        /// Tests that RunAsync continuation does not capture Ignite threads.
         /// </summary>
         [Test]
-        public async Task TestAsyncAwaitContinuationIsExecutedOnThreadPool()
+        public async Task TestComputeRunAsyncContinuation()
         {
-            var cache = GetClientCache<int>();
-            await cache.PutAsync(1, 1).ConfigureAwait(false);
+            await Ignite.GetCompute().RunAsync(new ComputeAction());
 
-            // This causes deadlock if async continuation is executed on response handler thread.
-            cache.PutAsync(2, 2).Wait();
-            Assert.AreEqual(2, cache.Get(2));
+            StringAssert.StartsWith("Thread-", TestUtilsJni.GetJavaThreadName());
+        }
+
+        /// <summary>
+        /// Tests that ExecuteAsync continuation does not capture Ignite threads.
+        /// </summary>
+        [Test]
+        public async Task TestComputeExecuteAsyncContinuation()
+        {
+            await Ignite.GetCompute().ExecuteAsync(new StringLengthEmptyTask(), "abc");
+
+            StringAssert.StartsWith("Thread-", TestUtilsJni.GetJavaThreadName());
         }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
index bcc8347..f294afa 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/full-config.xml
@@ -24,7 +24,7 @@
               isLateAffinityAssignment='false' springConfigUrl='c:\myconfig.xml' autoGenerateIgniteInstanceName='true'
               peerAssemblyLoadingMode='CurrentAppDomain' longQueryWarningTimeout='1:2:3' isActiveOnStart='false'
               consistentId='someId012' redirectJavaConsoleOutput='false' authenticationEnabled='true' mvccVacuumFrequency='10000' mvccVacuumThreadCount='4'
-              sqlQueryHistorySize='123' javaPeerClassLoadingEnabled='true'>
+              sqlQueryHistorySize='123' javaPeerClassLoadingEnabled='true' asyncContinuationExecutor='UnsafeSynchronous'>
     <localhost>127.1.1.1</localhost>
     <binaryConfiguration compactFooter='false' keepDeserialized='true'>
         <nameMapper type='Apache.Ignite.Core.Tests.IgniteConfigurationSerializerTest+NameMapper' bar='testBar' />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml
index 4488f93..acd5a9b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/spring-test.xml
@@ -47,5 +47,9 @@
         <property name="dataStorageConfiguration">
             <bean class="org.apache.ignite.configuration.DataStorageConfiguration"/>
         </property>
+
+        <property name="asyncContinuationExecutor">
+            <bean class="org.apache.ignite.platform.PlatformTestExecutor"/>
+        </property>
     </bean>
 </beans>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
index a98cad9..74d2688 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationSerializerTest.cs
@@ -386,6 +386,8 @@ namespace Apache.Ignite.Core.Tests
             Assert.AreEqual(2, ec.Count);
             Assert.AreEqual(new[] {"exec1", "exec2"}, ec.Select(e => e.Name));
             Assert.AreEqual(new[] {1, 2}, ec.Select(e => e.Size));
+
+            Assert.AreEqual(AsyncContinuationExecutor.UnsafeSynchronous, cfg.AsyncContinuationExecutor);
         }
 
         /// <summary>
@@ -1081,7 +1083,8 @@ namespace Apache.Ignite.Core.Tests
                         Name = "exec-1",
                         Size = 11
                     }
-                }
+                },
+                AsyncContinuationExecutor = AsyncContinuationExecutor.ThreadPool
             };
         }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
index e49b6c2..5d57aa8 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/IgniteConfigurationTest.cs
@@ -315,6 +315,7 @@ namespace Apache.Ignite.Core.Tests
                 CheckDefaultProperties(resCfg.ClientConnectorConfiguration);
 
                 Assert.AreEqual(false, resCfg.JavaPeerClassLoadingEnabled);
+                Assert.AreEqual(AsyncContinuationExecutor.Custom, resCfg.AsyncContinuationExecutor);
             }
         }
 
@@ -555,6 +556,7 @@ namespace Apache.Ignite.Core.Tests
             Assert.AreEqual(IgniteConfiguration.DefaultAuthenticationEnabled, cfg.AuthenticationEnabled);
             Assert.AreEqual(IgniteConfiguration.DefaultMvccVacuumFrequency, cfg.MvccVacuumFrequency);
             Assert.AreEqual(IgniteConfiguration.DefaultMvccVacuumThreadCount, cfg.MvccVacuumThreadCount);
+            Assert.AreEqual(AsyncContinuationExecutor.ThreadPool, cfg.AsyncContinuationExecutor);
 
             // Thread pools.
             Assert.AreEqual(IgniteConfiguration.DefaultManagementThreadPoolSize, cfg.ManagementThreadPoolSize);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
index 5fdf70d..2dcc94a 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/ProjectFilesTest.cs
@@ -37,7 +37,7 @@ namespace Apache.Ignite.Core.Tests
         {
             var projFiles = TestUtils.GetDotNetSourceDir()
                 .GetFiles("*.csproj", SearchOption.AllDirectories)
-                .Where(x => !x.FullName.ToLower().Contains("dotnetcore") && 
+                .Where(x => !x.FullName.ToLower().Contains("dotnetcore") &&
                             !x.FullName.Contains("Benchmark") &&
                             !x.FullName.Contains("templates") &&
                             !x.FullName.Contains("examples"))
@@ -95,7 +95,7 @@ namespace Apache.Ignite.Core.Tests
         {
             var excluded = new[]
             {
-                "ProjectFilesTest.cs", 
+                "ProjectFilesTest.cs",
                 "CopyOnWriteConcurrentDictionary.cs",
                 "IgniteArgumentCheck.cs",
                 "DelegateConverter.cs",
@@ -109,7 +109,7 @@ namespace Apache.Ignite.Core.Tests
                 "HandleRegistry.cs",
                 "BinaryObjectHeader.cs"
             };
-            
+
             var csFiles = TestUtils.GetDotNetSourceDir().GetFiles("*.cs", SearchOption.AllDirectories);
 
             foreach (var csFile in csFiles)
@@ -142,10 +142,10 @@ namespace Apache.Ignite.Core.Tests
         public void TestAllCsharpFilesAreIncludedInProject()
         {
             var projFiles = TestUtils.GetDotNetSourceDir().GetFiles("*.csproj", SearchOption.AllDirectories)
-                .Where(x => 
+                .Where(x =>
                     !x.Name.Contains("DotNetCore") &&
-                    !x.Name.Contains("Benchmark") && 
-                    !x.FullName.Contains("templates") && 
+                    !x.Name.Contains("Benchmark") &&
+                    !x.FullName.Contains("templates") &&
                     !x.FullName.Contains("examples"));
 
             var excludedFiles = new[]
@@ -153,7 +153,9 @@ namespace Apache.Ignite.Core.Tests
                 "IgnitionStartTest.cs",
                 "Common\\TestFixtureSetUp.cs",
                 "Common\\TestFixtureTearDown.cs",
-                "Client\\Cache\\CacheTestAsyncAwait.cs"
+                "Client\\Cache\\CacheTestAsyncAwait.cs",
+                "Cache\\CacheTestAsyncAwait.cs",
+                "Compute\\ComputeTestAsyncAwait.cs"
             };
 
             Assert.Multiple(() =>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs
index 72be97e..1b6303b 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Services/PlatformTestService.cs
@@ -395,6 +395,7 @@ namespace Apache.Ignite.Core.Tests.Services
         /** <inheritDoc /> */
         public ServicesTest.PlatformComputeBinarizable[] testBinarizableArray(ServicesTest.PlatformComputeBinarizable[] x)
         {
+            // ReSharper disable once CoVariantArrayConversion
             return (ServicesTest.PlatformComputeBinarizable[])testBinarizableArrayOfObjects(x);
         }
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs
index ac379a0..d76c9e0 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestUtilsJni.cs
@@ -95,6 +95,15 @@ namespace Apache.Ignite.Core.Tests
             CallVoidMethod(ClassPlatformProcessUtils, "destroyProcess", "()V");
         }
 
+        /// <summary>
+        /// Gets the Java thread name.
+        /// </summary>
+        /// <returns></returns>
+        public static string GetJavaThreadName()
+        {
+            return CallStringMethod(ClassPlatformThreadUtils, "getThreadName", "()Ljava/lang/String;");
+        }
+
         /** */
         private static unsafe void CallStringMethod(string className, string methodName, string methodSig, string arg)
         {
@@ -122,5 +131,17 @@ namespace Apache.Ignite.Core.Tests
                 env.CallStaticVoidMethod(cls, methodId);
             }
         }
+
+        /** */
+        private static unsafe string CallStringMethod(string className, string methodName, string methodSig)
+        {
+            var env = Jvm.Get().AttachCurrentThread();
+            using (var cls = env.FindClass(className))
+            {
+                var methodId = env.GetStaticMethodId(cls, methodName, methodSig);
+                var res = env.CallStaticObjectMethod(cls, methodId);
+                return env.JStringToString(res.Target);
+            }
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
index a0eb1e4..b61aad3 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj
@@ -81,6 +81,7 @@
     <Compile Include="Cluster\IBaselineNode.cs" />
     <Compile Include="Common\IgniteIllegalStateException.cs" />
     <Compile Include="Common\IgniteProductVersion.cs" />
+    <Compile Include="Configuration\AsyncContinuationExecutor.cs" />
     <Compile Include="Configuration\CheckpointWriteOrder.cs" />
     <Compile Include="Configuration\DataPageEvictionMode.cs" />
     <Compile Include="Configuration\DiskPageCompression.cs" />
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/AsyncContinuationExecutor.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/AsyncContinuationExecutor.cs
new file mode 100644
index 0000000..36953fb
--- /dev/null
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Configuration/AsyncContinuationExecutor.cs
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace Apache.Ignite.Core.Configuration
+{
+    /// <summary>
+    /// Defines async continuations behavior.
+    /// </summary>
+    public enum AsyncContinuationExecutor
+    {
+        /// <summary>
+        /// Executes async continuations on the thread pool (default).
+        /// </summary>
+        ThreadPool = 0,
+
+        /// <summary>
+        /// Executes async continuations synchronously on the same thread that completes the previous operation.
+        /// <para />
+        /// WARNING: can cause deadlocks and performance issues when not used correctly.
+        /// <para />
+        /// Ignite performs cache operations using a special "striped" thread pool
+        /// (see <see cref="IgniteConfiguration.StripedThreadPoolSize"/>). Using this synchronous mode means that
+        /// async continuations (any code coming after <c>await cache.DoAsync()</c>, or code in <c>ContinueWith()</c>)
+        /// will run on the striped pool:
+        /// <ul>
+        /// <li>
+        /// Cache operations can't execute while user code runs on the striped thread.
+        /// </li>
+        /// <li>
+        /// Attempting other cache operations on the striped thread can cause a deadlock.
+        /// </li>
+        /// </ul>
+        /// <para />
+        /// This mode can improve performance, because continuations do not have to be scheduled on another thread.
+        /// However, special care is required to release striped threads as soon as possible.
+        /// </summary>
+        UnsafeSynchronous = 1,
+
+        /// <summary>
+        /// Indicates that custom executor is configured on the Java side.
+        /// <para />
+        /// This value should not be used explicitly.
+        /// </summary>
+        Custom = 2
+    }
+}
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
index 3ef2b7f..e6437d2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfiguration.cs
@@ -221,6 +221,9 @@ namespace Apache.Ignite.Core
         /** */
         private bool? _clientMode;
 
+        /** */
+        private AsyncContinuationExecutor? _asyncContinuationExecutor;
+
         /// <summary>
         /// Default network retry count.
         /// </summary>
@@ -342,6 +345,7 @@ namespace Apache.Ignite.Core
             writer.WriteTimeSpanAsLongNullable(_sysWorkerBlockedTimeout);
             writer.WriteIntNullable(_sqlQueryHistorySize);
             writer.WriteBooleanNullable(_javaPeerClassLoadingEnabled);
+            writer.WriteIntNullable((int?) _asyncContinuationExecutor);
 
             if (SqlSchemas == null)
                 writer.WriteInt(0);
@@ -747,6 +751,7 @@ namespace Apache.Ignite.Core
             _sysWorkerBlockedTimeout = r.ReadTimeSpanNullable();
             _sqlQueryHistorySize = r.ReadIntNullable();
             _javaPeerClassLoadingEnabled = r.ReadBooleanNullable();
+            _asyncContinuationExecutor = (AsyncContinuationExecutor?) r.ReadIntNullable();
 
             int sqlSchemasCnt = r.ReadInt();
 
@@ -1729,10 +1734,20 @@ namespace Apache.Ignite.Core
         /// and peer class loading in Java are two distinct and independent features.
         /// <para />
         /// </summary>
-        public bool JavaPeerClassLoadingEnabled 
+        public bool JavaPeerClassLoadingEnabled
         {
             get { return _javaPeerClassLoadingEnabled ?? default(bool); }
             set { _javaPeerClassLoadingEnabled = value; }
         }
+
+        /// <summary>
+        /// Gets or sets the async continuation behavior.
+        /// See <see cref="Apache.Ignite.Core.Configuration.AsyncContinuationExecutor"/> members for more details.
+        /// </summary>
+        public AsyncContinuationExecutor AsyncContinuationExecutor
+        {
+            get { return _asyncContinuationExecutor ?? default(AsyncContinuationExecutor); }
+            set { _asyncContinuationExecutor = value; }
+        }
     }
 }
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
index 0b35ca4..950b738 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/IgniteConfigurationSection.xsd
@@ -128,6 +128,13 @@
         </xs:restriction>
     </xs:simpleType>
 
+    <xs:simpleType name="asyncContinuationExecutor" final="restriction">
+        <xs:restriction base="xs:string">
+            <xs:enumeration value="ThreadPool" />
+            <xs:enumeration value="UnsafeSynchronous" />
+        </xs:restriction>
+    </xs:simpleType>
+
     <xs:element name="igniteConfiguration">
         <xs:annotation>
             <xs:documentation>Ignite configuration root.</xs:documentation>
@@ -2513,6 +2520,11 @@
                 <xs:documentation>Whether Java console output should be redirected to Console.Out and Console.Error.</xs:documentation>
               </xs:annotation>
             </xs:attribute>
+            <xs:attribute name="asyncContinuationExecutor" type="asyncContinuationExecutor">
+              <xs:annotation>
+                <xs:documentation>Async continuation behavior.</xs:documentation>
+              </xs:annotation>
+            </xs:attribute>
         </xs:complexType>
     </xs:element>
 </xs:schema>
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
index 489fe44..e2bed73 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Compute/ComputeTaskHolder.cs
@@ -23,6 +23,7 @@ namespace Apache.Ignite.Core.Impl.Compute
     using System.Diagnostics;
     using System.Diagnostics.CodeAnalysis;
     using System.Linq;
+    using System.Threading;
     using Apache.Ignite.Core.Cluster;
     using Apache.Ignite.Core.Common;
     using Apache.Ignite.Core.Compute;
@@ -59,7 +60,7 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// <param name="stream">Stream.</param>
         /// <returns>Policy.</returns>
         int JobResultRemote(ComputeJobHolder jobId, PlatformMemoryStream stream);
-        
+
         /// <summary>
         /// Perform task reduce.
         /// </summary>
@@ -70,7 +71,7 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// </summary>
         /// <param name="taskHandle">Task handle.</param>
         void Complete(long taskHandle);
-        
+
         /// <summary>
         /// Complete task with error.
         /// </summary>
@@ -85,7 +86,7 @@ namespace Apache.Ignite.Core.Impl.Compute
     internal class ComputeTaskHolder<TA, T, TR> : IComputeTaskHolder
     {
         /** Empty results. */
-        private static readonly IList<IComputeJobResult<T>> EmptyRes =     
+        private static readonly IList<IComputeJobResult<T>> EmptyRes =
             new ReadOnlyCollection<IComputeJobResult<T>>(new List<IComputeJobResult<T>>());
 
         /** Compute instance. */
@@ -102,7 +103,7 @@ namespace Apache.Ignite.Core.Impl.Compute
 
         /** Task future. */
         private readonly Future<TR> _fut = new Future<TR>();
-                
+
         /** Jobs whose results are cached. */
         private ISet<object> _resJobs;
 
@@ -111,7 +112,7 @@ namespace Apache.Ignite.Core.Impl.Compute
 
         /** Handles for jobs which are not serialized right away. */
         private volatile List<long> _jobHandles;
-        
+
         /// <summary>
         /// Constructor.
         /// </summary>
@@ -241,7 +242,7 @@ namespace Apache.Ignite.Core.Impl.Compute
                 Finish(default(TR), e);
 
                 stream.Reset();
-                
+
                 writer.WriteBoolean(false); // Map failed.
                 writer.WriteString(e.Message); // Write error message.
             }
@@ -345,7 +346,7 @@ namespace Apache.Ignite.Core.Impl.Compute
                 throw;
             }
         }
-        
+
         /** <inheritDoc /> */
         public void Reduce()
         {
@@ -489,7 +490,8 @@ namespace Apache.Ignite.Core.Impl.Compute
         /// <param name="err">Error.</param>
         private void Finish(TR res, Exception err)
         {
-            _fut.OnDone(res, err);
+            // Always complete the future on a ThreadPool thread to avoid capturing Ignite "pub-" thread.
+            ThreadPool.QueueUserWorkItem(_ => _fut.OnDone(res, err));
         }
 
         /// <summary>
@@ -503,7 +505,7 @@ namespace Apache.Ignite.Core.Impl.Compute
             var handleRegistry = _compute.Marshaller.Ignite.HandleRegistry;
 
             if (handles != null)
-                foreach (var handle in handles) 
+                foreach (var handle in handles)
                     handleRegistry.Release(handle, true);
 
             handleRegistry.Release(taskHandle, true);
diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs
index 4bada09..86515c2 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/Jni/Env.cs
@@ -230,7 +230,7 @@ namespace Apache.Ignite.Core.Impl.Unmanaged.Jni
         /// <summary>
         /// Calls the static object method.
         /// </summary>
-        private GlobalRef CallStaticObjectMethod(GlobalRef cls, IntPtr methodId, long* argsPtr = null)
+        public GlobalRef CallStaticObjectMethod(GlobalRef cls, IntPtr methodId, long* argsPtr = null)
         {
             var res = _callStaticObjectMethod(_envPtr, cls.Target, methodId, argsPtr);