You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:16:17 UTC

[37/50] [abbrv] ignite git commit: IGNITE-3220 I/O bottleneck on server/client cluster configuration Communications optimizations: - possibility to open separate in/out connections - possibility to have multiple connections between nodes - implemented NI

IGNITE-3220 I/O bottleneck on server/client cluster configuration
Communications optimizations:
- possibility to open separate in/out connections
- possibility to have multiple connections between nodes
- implemented NIO sessions balancing between NIO threads
- reduced amount of work and blocking calls in NIO threads
Other:
- implemented StripedExecutor for cache messages handling
- added 'io test' messages for IO performance testing

(cherry picked from commit 10ade28)


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/05dd08b9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/05dd08b9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/05dd08b9

Branch: refs/heads/master
Commit: 05dd08b993e2d7f88176c051463b178431714f85
Parents: 57eb47f
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 9 12:28:47 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 16 14:01:53 2016 +0300

----------------------------------------------------------------------
 .../ignite/examples/ExampleNodeStartup.java     |    2 +-
 .../examples/datagrid/CachePutGetExample.java   |    2 +-
 .../scalar/examples/ScalarJvmCloudExample.scala |    2 +-
 .../rest/ClientMemcachedProtocolSelfTest.java   |    4 +-
 .../rest/protocols/tcp/MockNioSession.java      |   25 +-
 .../apache/ignite/IgniteSystemProperties.java   |    3 +
 .../cache/store/CacheLoadOnlyStoreAdapter.java  |    6 +-
 .../configuration/IgniteConfiguration.java      |   50 +-
 .../internal/GridEventConsumeHandler.java       |    2 +-
 .../ignite/internal/GridJobContextImpl.java     |    4 +-
 .../ignite/internal/GridKernalContext.java      |    9 +
 .../ignite/internal/GridKernalContextImpl.java  |   16 +-
 .../internal/GridPerformanceSuggestions.java    |    2 +-
 .../org/apache/ignite/internal/GridTopic.java   |    5 +-
 .../ignite/internal/IgniteInternalFuture.java   |   11 +
 .../apache/ignite/internal/IgniteKernal.java    |   85 +-
 .../org/apache/ignite/internal/IgnitionEx.java  |   32 +-
 .../GridClientConnectionManagerAdapter.java     |    1 +
 .../client/router/impl/GridTcpRouterImpl.java   |    1 +
 .../managers/communication/GridIoManager.java   |  207 ++-
 .../managers/communication/GridIoMessage.java   |   13 +
 .../communication/GridIoMessageFactory.java     |   12 +-
 .../communication/IgniteIoTestMessage.java      |  235 +++
 .../processors/cache/GridCacheAdapter.java      |   26 +-
 .../processors/cache/GridCacheMessage.java      |    7 +
 .../processors/cache/GridCacheUtils.java        |   35 +
 .../processors/cache/IgniteCacheProxy.java      |    8 +
 .../distributed/GridDistributedLockRequest.java |    5 +
 .../GridDistributedTxFinishResponse.java        |    6 +
 .../GridDistributedUnlockRequest.java           |    5 +
 .../distributed/dht/GridDhtCacheAdapter.java    |    3 +-
 .../distributed/dht/GridDhtLockResponse.java    |    9 +-
 .../dht/atomic/GridDhtAtomicCache.java          |    5 +-
 .../GridDhtAtomicSingleUpdateRequest.java       |    5 +
 .../dht/atomic/GridDhtAtomicUpdateRequest.java  |    5 +
 .../atomic/GridNearAtomicFullUpdateRequest.java |    5 +
 .../GridNearAtomicSingleUpdateRequest.java      |    5 +
 .../distributed/near/GridNearGetRequest.java    |    5 +
 .../local/atomic/GridLocalAtomicCache.java      |    3 +
 .../query/GridCacheDistributedQueryManager.java |    2 +-
 .../cache/query/GridCacheQueryRequest.java      |    6 +-
 .../transactions/IgniteTxLocalAdapter.java      |    8 +-
 .../datastreamer/DataStreamProcessor.java       |   22 +-
 .../internal/processors/igfs/IgfsContext.java   |    4 +-
 .../processors/igfs/IgfsDataManager.java        |    6 +-
 .../internal/processors/igfs/IgfsImpl.java      |    2 +-
 .../internal/processors/odbc/OdbcProcessor.java |    1 +
 .../platform/compute/PlatformCompute.java       |    6 +
 .../tcp/GridTcpMemcachedNioListener.java        |   15 +-
 .../protocols/tcp/GridTcpRestNioListener.java   |    2 +-
 .../rest/protocols/tcp/GridTcpRestProtocol.java |    1 +
 .../service/GridServiceProcessor.java           |    6 +-
 .../ignite/internal/util/IgniteUtils.java       |   62 +-
 .../ignite/internal/util/StripedExecutor.java   |  667 +++++++++
 .../util/future/GridFinishedFuture.java         |   24 +
 .../internal/util/future/GridFutureAdapter.java |   15 +-
 .../util/future/GridFutureChainListener.java    |   30 +-
 .../internal/util/ipc/IpcToNioAdapter.java      |    2 +-
 .../nio/GridAbstractCommunicationClient.java    |   12 +-
 .../util/nio/GridCommunicationClient.java       |    9 +-
 .../nio/GridConnectionBytesVerifyFilter.java    |   15 +-
 .../util/nio/GridNioAsyncNotifyFilter.java      |   10 +-
 .../internal/util/nio/GridNioCodecFilter.java   |   17 +-
 .../ignite/internal/util/nio/GridNioFilter.java |   16 +-
 .../internal/util/nio/GridNioFilterAdapter.java |   10 +-
 .../internal/util/nio/GridNioFilterChain.java   |   14 +-
 .../ignite/internal/util/nio/GridNioFuture.java |    4 +-
 .../util/nio/GridNioRecoveryDescriptor.java     |  124 +-
 .../ignite/internal/util/nio/GridNioServer.java | 1404 +++++++++++++++---
 .../internal/util/nio/GridNioSession.java       |   25 +-
 .../internal/util/nio/GridNioSessionImpl.java   |   65 +-
 .../ignite/internal/util/nio/GridNioWorker.java |   48 +
 .../util/nio/GridSelectorNioSessionImpl.java    |  221 ++-
 .../util/nio/GridShmemCommunicationClient.java  |    7 +-
 .../util/nio/GridTcpNioCommunicationClient.java |   55 +-
 .../internal/util/nio/SessionWriteRequest.java  |   85 ++
 .../internal/util/nio/ssl/GridNioSslFilter.java |   10 +-
 .../util/nio/ssl/GridNioSslHandler.java         |    4 +-
 .../util/tostring/GridToStringBuilder.java      |    2 +-
 .../communication/tcp/TcpCommunicationSpi.java  | 1340 ++++++++++++-----
 .../tcp/TcpCommunicationSpiMBean.java           |   40 +
 .../ignite/spi/discovery/tcp/ServerImpl.java    |   14 +-
 .../ignite/stream/socket/SocketStreamer.java    |    1 +
 .../ignite/thread/IgniteThreadFactory.java      |    8 +-
 .../IgniteSlowClientDetectionSelfTest.java      |    1 +
 ...unicationBalanceMultipleConnectionsTest.java |   28 +
 .../IgniteCommunicationBalanceTest.java         |  339 +++++
 .../communication/IgniteIoTestMessagesTest.java |   95 ++
 .../IgniteVariousConnectionNumberTest.java      |  166 +++
 .../cache/CrossCacheTxRandomOperationsTest.java |   30 +-
 ...idAbstractCacheInterceptorRebalanceTest.java |    4 +-
 ...CacheOffHeapMultiThreadedUpdateSelfTest.java |    6 +-
 ...eAtomicMessageRecovery10ConnectionsTest.java |   28 +
 ...cMessageRecoveryNoPairedConnectionsTest.java |   47 +
 ...acheConnectionRecovery10ConnectionsTest.java |   35 +
 .../distributed/IgniteCacheCreatePutTest.java   |    2 +-
 .../IgniteCacheMessageRecoveryAbstractTest.java |   24 +-
 .../IgniteCacheMessageWriteTimeoutTest.java     |   17 +-
 .../dht/IgniteCacheMultiTxLockSelfTest.java     |    6 +-
 ...erNoStripedPoolMultiNodeFullApiSelfTest.java |   35 +
 ...edNoStripedPoolMultiNodeFullApiSelfTest.java |   35 +
 .../TxDeadlockDetectionNoHangsTest.java         |    2 +-
 .../TxOptimisticDeadlockDetectionTest.java      |   29 +-
 .../GridServiceProcessorProxySelfTest.java      |    2 +-
 .../util/future/GridFutureAdapterSelfTest.java  |  122 +-
 .../nio/impl/GridNioFilterChainSelfTest.java    |   32 +-
 .../loadtests/nio/GridNioBenchmarkClient.java   |    4 +-
 .../p2p/GridP2PRecursionTaskSelfTest.java       |    2 +-
 .../spi/GridTcpSpiForwardingSelfTest.java       |   18 +-
 .../GridTcpCommunicationSpiAbstractTest.java    |   28 +-
 ...mmunicationSpiConcurrentConnectSelfTest.java |   82 +-
 .../GridTcpCommunicationSpiConfigSelfTest.java  |    5 +-
 ...cpCommunicationSpiMultithreadedSelfTest.java |   23 +-
 ...dTcpCommunicationSpiRecoveryAckSelfTest.java |    9 +-
 ...tionSpiRecoveryFailureDetectionSelfTest.java |    1 +
 ...ationSpiRecoveryNoPairedConnectionsTest.java |   28 +
 ...GridTcpCommunicationSpiRecoverySelfTest.java |   67 +-
 ...CommunicationRecoveryAckClosureSelfTest.java |    9 +-
 .../junits/GridTestKernalContext.java           |    4 +-
 .../IgniteCacheFullApiSelfTestSuite.java        |    6 +
 .../ignite/testsuites/IgniteCacheTestSuite.java |   17 +-
 .../IgniteSpiCommunicationSelfTestSuite.java    |    2 +
 .../hadoop/jobtracker/HadoopJobTracker.java     |    4 +-
 .../HadoopExternalCommunication.java            |    5 +-
 .../communication/HadoopIpcToNioAdapter.java    |    2 +-
 .../communication/HadoopMarshallerFilter.java   |    6 +-
 .../ignite/stream/kafka/KafkaStreamer.java      |    2 +-
 .../ignite/tools/classgen/ClassesGenerator.java |    8 +-
 .../ignite/yardstick/IgniteBenchmarkUtils.java  |    6 +-
 .../yardstick/cache/CacheEntryEventProbe.java   |    2 +-
 .../yardstick/cache/IgniteIoTestBenchmark.java  |   73 +
 .../io/IgniteIoTestAbstractBenchmark.java       |   61 +
 .../io/IgniteIoTestSendAllBenchmark.java        |   32 +
 .../io/IgniteIoTestSendRandomBenchmark.java     |   35 +
 134 files changed, 5935 insertions(+), 998 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
index ad12297..dd8a72b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
@@ -33,4 +33,4 @@ public class ExampleNodeStartup {
     public static void main(String[] args) throws IgniteException {
         Ignition.start("examples/config/example-ignite.xml");
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java
index 82a76b8..b9bae5b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java
@@ -105,4 +105,4 @@ public class CachePutGetExample {
         for (Map.Entry<Integer, String> e : vals.entrySet())
             System.out.println("Got entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala
index 1014726..814bb2e 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala
@@ -50,7 +50,7 @@ object ScalarJvmCloudExample {
             val pool = Executors.newFixedThreadPool(NODES.size)
 
             // Concurrently startup all nodes.
-            NODES.foreach(name => pool.submit(new Runnable {
+            NODES.foreach(name => pool.execute(new Runnable {
                 @impl def run() {
                     // All defaults.
                     val cfg = new IgniteConfiguration

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java
index 0f56c73..c03c06e 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java
@@ -111,6 +111,8 @@ public class ClientMemcachedProtocolSelfTest extends AbstractRestProcessorSelfTe
 
         Map<String, Object> map = client.getBulk("getKey1", "getKey2");
 
+        info("Map: " + map);
+
         Assert.assertEquals(2, map.size());
 
         Assert.assertEquals("getVal1", map.get("getKey1"));
@@ -443,4 +445,4 @@ public class ClientMemcachedProtocolSelfTest extends AbstractRestProcessorSelfTe
             return res;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
index c82c73e..9bc4e7f 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.rest.protocols.tcp;
 
 import java.net.InetSocketAddress;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
 import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
 import org.apache.ignite.internal.util.nio.GridNioFuture;
@@ -111,6 +112,11 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
     }
 
     /** {@inheritDoc} */
+    @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
     @Override public GridNioFuture<Object> resumeReads() {
         return null;
     }
@@ -131,12 +137,27 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
     }
 
     /** {@inheritDoc} */
-    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+    @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
         // No-op.
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+    @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
         return null;
     }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void systemMessage(Object msg) {
+        // No-op.
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index d0c0d5e..9650a31 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -477,6 +477,9 @@ public final class IgniteSystemProperties {
     @Deprecated
     public static final String IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES = "IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES";
 
+    /** */
+    public static final String IGNITE_IO_BALANCE_PERIOD = "IGNITE_IO_BALANCE_PERIOD";
+
     /**
      * When set to {@code true} fields are written by BinaryMarshaller in sorted order. Otherwise
      * the natural order is used.

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
index 7494e37..d3f381e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
@@ -153,14 +153,14 @@ public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K
                 buf.add(iter.next());
 
                 if (buf.size() == batchSize) {
-                    exec.submit(new Worker(c, buf, args));
+                    exec.execute(new Worker(c, buf, args));
 
                     buf = new ArrayList<>(batchSize);
                 }
             }
 
             if (!buf.isEmpty())
-                exec.submit(new Worker(c, buf, args));
+                exec.execute(new Worker(c, buf, args));
         }
         catch (RejectedExecutionException ignored) {
             // Because of custom RejectedExecutionHandler.
@@ -330,4 +330,4 @@ public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K
             }
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 75145a3..dcd8a80 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -146,7 +146,7 @@ public class IgniteConfiguration {
     public static final int AVAILABLE_PROC_CNT = Runtime.getRuntime().availableProcessors();
 
     /** Default core size of public thread pool. */
-    public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT) * 2;
+    public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT);
 
     /** Default keep alive time for public thread pool. */
     @Deprecated
@@ -236,6 +236,12 @@ public class IgniteConfiguration {
     /** Async Callback pool size. */
     private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT;
 
+    /**
+     * Use striped pool for internal requests processing when possible
+     * (e.g. cache requests per-partition striping).
+     */
+    private int stripedPoolSize = DFLT_PUBLIC_THREAD_CNT;
+
     /** System pool size. */
     private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
 
@@ -553,6 +559,7 @@ public class IgniteConfiguration {
         sndRetryDelay = cfg.getNetworkSendRetryDelay();
         sslCtxFactory = cfg.getSslContextFactory();
         storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories();
+        stripedPoolSize = cfg.getStripedPoolSize();
         svcCfgs = cfg.getServiceConfiguration();
         sysPoolSize = cfg.getSystemThreadPoolSize();
         timeSrvPortBase = cfg.getTimeServerPortBase();
@@ -712,6 +719,47 @@ public class IgniteConfiguration {
     }
 
     /**
+     * Returns striped pool size that should be used for cache requests
+     * processing.
+     * <p>
+     * If set to non-positive value then requests get processed in system pool.
+     * <p>
+     * Striped pool is better for typical cache operations.
+     *
+     * @return Positive value if striped pool should be initialized
+     *      with configured number of threads (stripes) and used for requests processing
+     *      or non-positive value to process requests in system pool.
+     *
+     * @see #getPublicThreadPoolSize()
+     * @see #getSystemThreadPoolSize()
+     */
+    public int getStripedPoolSize() {
+        return stripedPoolSize;
+    }
+
+    /**
+     * Sets striped pool size that should be used for cache requests
+     * processing.
+     * <p>
+     * If set to non-positive value then requests get processed in system pool.
+     * <p>
+     * Striped pool is better for typical cache operations.
+     *
+     * @param stripedPoolSize Positive value if striped pool should be initialized
+     *      with passed in number of threads (stripes) and used for requests processing
+     *      or non-positive value to process requests in system pool.
+     * @return {@code this} for chaining.
+     *
+     * @see #getPublicThreadPoolSize()
+     * @see #getSystemThreadPoolSize()
+     */
+    public IgniteConfiguration setStripedPoolSize(int stripedPoolSize) {
+        this.stripedPoolSize = stripedPoolSize;
+
+        return this;
+    }
+
+    /**
      * Should return a thread pool size to be used in grid.
      * This executor service will be in charge of processing {@link ComputeJob GridJobs}
      * and user messages sent to node.

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 715f8a5..68d34ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -181,7 +181,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
                         notificationQueue.add(new T3<>(nodeId, routineId, evt));
 
                         if (!notificationInProgress) {
-                            ctx.getSystemExecutorService().submit(new Runnable() {
+                            ctx.getSystemExecutorService().execute(new Runnable() {
                                 @Override public void run() {
                                     if (!ctx.continuous().lockStopping())
                                         return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
index 804d228..dbfa0b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
@@ -217,7 +217,7 @@ public class GridJobContextImpl implements ComputeJobContext, Externalizable {
 
                                 assert execSvc != null;
 
-                                execSvc.submit(new Runnable() {
+                                execSvc.execute(new Runnable() {
                                     @Override public void run() {
                                         callcc0();
                                     }
@@ -300,4 +300,4 @@ public class GridJobContextImpl implements ComputeJobContext, Externalizable {
     @Override public String toString() {
         return S.toString(GridJobContextImpl.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index ae29223..927944f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.plugin.PluginNotFoundException;
 import org.apache.ignite.plugin.PluginProvider;
@@ -511,6 +512,14 @@ public interface GridKernalContext extends Iterable<GridComponent> {
     public ExecutorService getSystemExecutorService();
 
     /**
+     * Executor service that is in charge of processing internal system messages
+     * in stripes (dedicated threads).
+     *
+     * @return Thread pool implementation to be used in grid for internal system messages.
+     */
+    public StripedExecutor getStripedExecutorService();
+
+    /**
      * Executor service that is in charge of processing internal and Visor
      * {@link org.apache.ignite.compute.ComputeJob GridJobs}.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 94c6448..a2ad1b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
 import org.apache.ignite.internal.util.IgniteExceptionRegistry;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -300,6 +301,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
+    protected StripedExecutor stripedExecSvc;
+
+    /** */
+    @GridToStringExclude
     private ExecutorService p2pExecSvc;
 
     /** */
@@ -381,6 +386,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
      * @param marshCachePool Marshaller cache pool.
      * @param execSvc Public executor service.
      * @param sysExecSvc System executor service.
+     * @param stripedExecSvc Striped executor.
      * @param p2pExecSvc P2P executor service.
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
@@ -400,6 +406,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         ExecutorService marshCachePool,
         ExecutorService execSvc,
         ExecutorService sysExecSvc,
+        StripedExecutor stripedExecSvc,
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
@@ -407,7 +414,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
-        List<PluginProvider> plugins) throws IgniteCheckedException {
+        List<PluginProvider> plugins
+    ) throws IgniteCheckedException {
         assert grid != null;
         assert cfg != null;
         assert gw != null;
@@ -419,6 +427,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
         this.marshCachePool = marshCachePool;
         this.execSvc = execSvc;
         this.sysExecSvc = sysExecSvc;
+        this.stripedExecSvc = stripedExecSvc;
         this.p2pExecSvc = p2pExecSvc;
         this.mgmtExecSvc = mgmtExecSvc;
         this.igfsExecSvc = igfsExecSvc;
@@ -948,6 +957,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
+    @Override public StripedExecutor getStripedExecutorService() {
+        return stripedExecSvc;
+    }
+
+    /** {@inheritDoc} */
     @Override public ExecutorService getManagementExecutorService() {
         return mgmtExecSvc;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
index b040a97..5e8e520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
@@ -89,4 +89,4 @@ public class GridPerformanceSuggestions {
     @Override public String toString() {
         return S.toString(GridPerformanceSuggestions.class, this);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index b5608db..24ddcd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -97,7 +97,10 @@ public enum GridTopic {
     TOPIC_QUERY,
 
     /** */
-    TOPIC_TX;
+    TOPIC_TX,
+
+    /** */
+    TOPIC_IO_TEST;
 
     /** Enum values. */
     private static final GridTopic[] VALS = values();

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index b80a755..789556d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal;
 
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.IgniteCheckedException;
@@ -133,6 +134,16 @@ public interface IgniteInternalFuture<R> {
     public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb);
 
     /**
+     * Make a chained future to convert result of this future (when complete) into a new format.
+     * It is guaranteed that done callback will be called only ONCE.
+     *
+     * @param doneCb Done callback that is applied to this future when it finishes to produce chained future result.
+     * @param exec Executor to run callback.
+     * @return Chained future that finishes after this future completes and done callback is called.
+     */
+    public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, Executor exec);
+
+    /**
      * @return Error value if future has already been completed with error.
      */
     public Throwable error();

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 57aab00..7935e06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -61,10 +61,10 @@ import org.apache.ignite.IgniteDataStreamer;
 import org.apache.ignite.IgniteEvents;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.IgniteMessaging;
 import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.IgniteLock;
 import org.apache.ignite.IgniteScheduler;
 import org.apache.ignite.IgniteSemaphore;
 import org.apache.ignite.IgniteServices;
@@ -115,7 +115,6 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
 import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
 import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
 import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
 import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
 import org.apache.ignite.internal.processors.job.GridJobProcessor;
 import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
@@ -139,6 +138,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessor;
 import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
 import org.apache.ignite.internal.processors.task.GridTaskProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.future.GridCompoundFuture;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -175,6 +175,7 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
 import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII;
@@ -182,7 +183,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALL
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.IgniteSystemProperties.snapshot;
 import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
@@ -199,7 +199,6 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
@@ -208,11 +207,12 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JMX_PORT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JVM_ARGS;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JVM_PID;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LANG_RUNTIME;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PEER_CLASSLOADING;
 import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PHY_RAM;
@@ -663,6 +663,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
      * @param utilityCachePool Utility cache pool.
      * @param execSvc Executor service.
      * @param sysExecSvc System executor service.
+     * @param stripedExecSvc Striped executor.
      * @param p2pExecSvc P2P executor service.
      * @param mgmtExecSvc Management executor service.
      * @param igfsExecSvc IGFS executor service.
@@ -673,11 +674,13 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
     @SuppressWarnings({"CatchGenericClass", "unchecked"})
-    public void start(final IgniteConfiguration cfg,
+    public void start(
+        final IgniteConfiguration cfg,
         ExecutorService utilityCachePool,
         ExecutorService marshCachePool,
         final ExecutorService execSvc,
         final ExecutorService sysExecSvc,
+        final StripedExecutor stripedExecSvc,
         ExecutorService p2pExecSvc,
         ExecutorService mgmtExecSvc,
         ExecutorService igfsExecSvc,
@@ -685,7 +688,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         ExecutorService affExecSvc,
         @Nullable ExecutorService idxExecSvc,
         IgniteStripedThreadPoolExecutor callbackExecSvc,
-        GridAbsClosure errHnd)
+        GridAbsClosure errHnd
+    )
         throws IgniteCheckedException
     {
         gw.compareAndSet(null, new GridKernalGatewayImpl(cfg.getGridName()));
@@ -785,6 +789,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 marshCachePool,
                 execSvc,
                 sysExecSvc,
+                stripedExecSvc,
                 p2pExecSvc,
                 mgmtExecSvc,
                 igfsExecSvc,
@@ -792,7 +797,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 affExecSvc,
                 idxExecSvc,
                 callbackExecSvc,
-                plugins);
+                plugins
+            );
 
             cfg.getMarshaller().setContext(ctx.marshallerContext());
 
@@ -986,24 +992,51 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
 
             starveTask = ctx.timeout().schedule(new Runnable() {
                 /** Last completed task count. */
-                private long lastCompletedCnt;
+                private long lastCompletedCntPub;
+
+                /** Last completed task count. */
+                private long lastCompletedCntSys;
 
                 @Override public void run() {
-                    if (!(execSvc instanceof ThreadPoolExecutor))
-                        return;
+                    if (execSvc instanceof ThreadPoolExecutor) {
+                        ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
+
+                        lastCompletedCntPub = checkPoolStarvation(exec, lastCompletedCntPub, "public");
+                    }
 
-                    ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
+                    if (sysExecSvc instanceof ThreadPoolExecutor) {
+                        ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc;
 
+                        lastCompletedCntSys = checkPoolStarvation(exec, lastCompletedCntSys, "system");
+                    }
+
+                    if (stripedExecSvc != null)
+                        stripedExecSvc.checkStarvation();
+                }
+
+                /**
+                 * @param exec Thread pool executor to check.
+                 * @param lastCompletedCnt Last completed tasks count.
+                 * @param pool Pool name for message.
+                 * @return Current completed tasks count.
+                 */
+                private long checkPoolStarvation(
+                    ThreadPoolExecutor exec,
+                    long lastCompletedCnt,
+                    String pool
+                ) {
                     long completedCnt = exec.getCompletedTaskCount();
 
                     // If all threads are active and no task has completed since last time and there is
                     // at least one waiting request, then it is possible starvation.
                     if (exec.getPoolSize() == exec.getActiveCount() && completedCnt == lastCompletedCnt &&
                         !exec.getQueue().isEmpty())
-                        LT.warn(log, "Possible thread pool starvation detected (no task completed in last " +
-                            interval + "ms, is executorService pool size large enough?)");
+                        LT.warn(
+                            log,
+                            "Possible thread pool starvation detected (no task completed in last " +
+                                interval + "ms, is " + pool + " thread pool size large enough?)");
 
-                    lastCompletedCnt = completedCnt;
+                    return completedCnt;
                 }
             }, interval, interval);
         }
@@ -1128,6 +1161,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
             }, longOpDumpTimeout, longOpDumpTimeout);
         }
 
+        ctx.performance().add("Disable assertions (remove '-ea' from JVM options)", !U.assertionsEnabled());
+
         ctx.performance().logSuggestions(log, gridName);
 
         U.quietAndInfo(log, "To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}");
@@ -3509,6 +3544,26 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
         }
     }
 
+    /**
+     * @param node Node.
+     * @param payload Message payload.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
+        return ctx.io().sendIoTest(node, payload, procFromNioThread);
+    }
+
+    /**
+     * @param nodes Nodes.
+     * @param payload Message payload.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
+        return ctx.io().sendIoTest(nodes, payload, procFromNioThread);
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgniteKernal.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index b3a9eec..f32a753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsUtils;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.StripedExecutor;
 import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
 import org.apache.ignite.internal.util.typedef.CA;
 import org.apache.ignite.internal.util.typedef.F;
@@ -1459,6 +1460,9 @@ public class IgnitionEx {
         /** System executor service. */
         private ThreadPoolExecutor sysExecSvc;
 
+        /** */
+        private StripedExecutor stripedExecSvc;
+
         /** Management executor service. */
         private ThreadPoolExecutor mgmtExecSvc;
 
@@ -1652,8 +1656,6 @@ public class IgnitionEx {
 
             execSvc.allowCoreThreadTimeOut(true);
 
-            // Note that since we use 'LinkedBlockingQueue', number of
-            // maximum threads has no effect.
             validateThreadPoolSize(cfg.getSystemThreadPoolSize(), "system");
 
             sysExecSvc = new IgniteThreadPoolExecutor(
@@ -1666,6 +1668,9 @@ public class IgnitionEx {
 
             sysExecSvc.allowCoreThreadTimeOut(true);
 
+            if (cfg.getStripedPoolSize() > 0)
+                stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getGridName(), "sys", log);
+
             // Note that since we use 'LinkedBlockingQueue', number of
             // maximum threads has no effect.
             // Note, that we do not pre-start threads here as management pool may
@@ -1791,13 +1796,26 @@ public class IgnitionEx {
                 // Init here to make grid available to lifecycle listeners.
                 grid = grid0;
 
-                grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
-                    igfsExecSvc, restExecSvc, affExecSvc, idxExecSvc, callbackExecSvc,
+                grid0.start(
+                    myCfg,
+                    utilityCacheExecSvc,
+                    marshCacheExecSvc,
+                    execSvc,
+                    sysExecSvc,
+                    stripedExecSvc,
+                    p2pExecSvc,
+                    mgmtExecSvc,
+                    igfsExecSvc,
+                    restExecSvc,
+                    affExecSvc,
+                    idxExecSvc,
+                    callbackExecSvc,
                     new CA() {
                         @Override public void apply() {
                             startLatch.countDown();
                         }
-                    });
+                    }
+                );
 
                 state = STARTED;
 
@@ -2415,6 +2433,10 @@ public class IgnitionEx {
 
             sysExecSvc = null;
 
+            U.shutdownNow(getClass(), stripedExecSvc, log);
+
+            stripedExecSvc = null;
+
             U.shutdownNow(getClass(), mgmtExecSvc, log);
 
             mgmtExecSvc = null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index 6ea7c22..12baee0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -200,6 +200,7 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
                     .socketSendBufferSize(0)
                     .idleTimeout(Long.MAX_VALUE)
                     .gridName(routerClient ? "routerClient" : "gridClient")
+                    .serverName("tcp-client")
                     .daemon(cfg.isDaemon())
                     .build();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
index 06a4929..3566830 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
@@ -258,6 +258,7 @@ public class GridTcpRouterImpl implements GridTcpRouter, GridTcpRouterMBean, Lif
                 .logger(log)
                 .selectorCount(Runtime.getRuntime().availableProcessors())
                 .gridName(gridName)
+                .serverName("router")
                 .tcpNoDelay(tcpNoDelay)
                 .directBuffer(false)
                 .byteOrder(ByteOrder.nativeOrder())

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3df29cf..7ef7bc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -26,15 +26,17 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Queue;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedDeque;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
@@ -44,6 +46,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteComponentType;
 import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -55,6 +58,8 @@ import org.apache.ignite.internal.processors.pool.PoolProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
 import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridTuple3;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.F;
@@ -83,6 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
+import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
@@ -176,6 +182,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** Stopping flag. */
     private boolean stopping;
 
+    /** */
+    private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference<>();
+
+    /** */
+    private final AtomicLong ioTestId = new AtomicLong();
+
     /**
      * @param ctx Grid kernal context.
      */
@@ -297,6 +309,114 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
         if (log.isDebugEnabled())
             log.debug(startInfo());
+
+        addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                ClusterNode node = ctx.discovery().node(nodeId);
+
+                if (node == null)
+                    return;
+
+                IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg;
+
+                if (msg0.request()) {
+                    IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null);
+
+                    res.flags(msg0.flags());
+
+                    try {
+                        send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e);
+                    }
+                }
+                else {
+                    IoTestFuture fut = ioTestMap().get(msg0.id());
+
+                    if (fut == null)
+                        U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']');
+                    else
+                        fut.onResponse();
+                }
+            }
+        });
+    }
+
+    /**
+     * @param nodes Nodes.
+     * @param payload Payload.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
+        long id = ioTestId.getAndIncrement();
+
+        IoTestFuture fut = new IoTestFuture(id, nodes.size());
+
+        IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+
+        msg.processFromNioThread(procFromNioThread);
+
+        ioTestMap().put(id, fut);
+
+        for (int i = 0; i < nodes.size(); i++) {
+            ClusterNode node = nodes.get(i);
+
+            try {
+                send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+            }
+            catch (IgniteCheckedException e) {
+                ioTestMap().remove(msg.id());
+
+                return new GridFinishedFuture(e);
+            }
+        }
+
+        return fut;
+    }
+
+    /**
+     * @param node Node.
+     * @param payload Payload.
+     * @param procFromNioThread If {@code true} message is processed from NIO thread.
+     * @return Response future.
+     */
+    public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
+        long id = ioTestId.getAndIncrement();
+
+        IoTestFuture fut = new IoTestFuture(id, 1);
+
+        IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+
+        msg.processFromNioThread(procFromNioThread);
+
+        ioTestMap().put(id, fut);
+
+        try {
+            send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+        }
+        catch (IgniteCheckedException e) {
+            ioTestMap().remove(msg.id());
+
+            return new GridFinishedFuture(e);
+        }
+
+        return fut;
+    }
+
+    /**
+     * @return IO test futures map.
+     */
+    private ConcurrentHashMap<Long, IoTestFuture> ioTestMap() {
+        ConcurrentHashMap<Long, IoTestFuture> map = ioTestMap.get();
+
+        if (map == null) {
+            if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
+                map = ioTestMap.get();
+        }
+
+        return map;
     }
 
     /** {@inheritDoc} */
@@ -514,16 +634,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                 return;
             }
 
-            // Check discovery.
-            ClusterNode node = ctx.discovery().node(nodeId);
-
-            if (node == null) {
-                if (log.isDebugEnabled())
-                    log.debug("Ignoring message from dead node [senderId=" + nodeId + ", msg=" + msg + ']');
-
-                return; // We can't receive messages from non-discovered ones.
-            }
-
             if (msg.topic() == null) {
                 int topicOrd = msg.topicOrdinal();
 
@@ -678,8 +788,31 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
                     msgC.run();
                 }
             }
+
+            @Override public String toString() {
+                return "Message closure [msg=" + msg + ']';
+            }
         };
 
+        if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
+            IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
+
+            if (msg0.processFromNioThread()) {
+                c.run();
+
+                return;
+            }
+        }
+
+        if (ctx.config().getStripedPoolSize() > 0 &&
+            plc == GridIoPolicy.SYSTEM_POOL &&
+            msg.partition() != Integer.MIN_VALUE
+            ) {
+            ctx.getStripedExecutorService().execute(msg.partition(), c);
+
+            return;
+        }
+
         try {
             pools.poolForPolicy(plc).execute(c);
         }
@@ -2460,4 +2593,56 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             return S.toString(DelayedMessage.class, this, super.toString());
         }
     }
+
+    /**
+     *
+     */
+    private class IoTestFuture extends GridFutureAdapter<Object> {
+        /** */
+        private final long id;
+
+        /** */
+        private int cntr;
+
+        /**
+         * @param id ID.
+         * @param cntr Counter.
+         */
+        IoTestFuture(long id, int cntr) {
+            assert cntr > 0 : cntr;
+
+            this.id = id;
+            this.cntr = cntr;
+        }
+
+        /**
+         *
+         */
+        void onResponse() {
+            boolean complete;
+
+            synchronized (this) {
+                complete = --cntr == 0;
+            }
+
+            if (complete)
+                onDone();
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+            if (super.onDone(res, err)) {
+                ioTestMap().remove(id);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(IoTestFuture.class, this);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index b28ced2..b1a26e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.communication;
 import java.io.Externalizable;
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -321,6 +322,18 @@ public class GridIoMessage implements Message {
         return 7;
     }
 
+    /**
+     * Get single partition for this message (if applicable).
+     *
+     * @return Partition ID.
+     */
+    public int partition() {
+        if (msg instanceof GridCacheMessage)
+            return ((GridCacheMessage)msg).partition();
+        else
+            return Integer.MIN_VALUE;
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridIoMessage.class, this);

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 504e683..b1fe910 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -171,6 +171,16 @@ public class GridIoMessageFactory implements MessageFactory {
         Message msg = null;
 
         switch (type) {
+            case -44:
+                msg = new TcpCommunicationSpi.HandshakeMessage2();
+
+                break;
+
+            case -43:
+                msg = new IgniteIoTestMessage();
+
+                break;
+
             case -42:
                 msg = new HadoopDirectShuffleMessage();
 
@@ -816,7 +826,7 @@ public class GridIoMessageFactory implements MessageFactory {
 
                 break;
 
-            // [-3..119] [124..127] [-36]- this
+            // [-3..119] [124..127] [-36..-44]- this
             // [120..123] - DR
             // [-4..-22, -30..-35] - SQL
             default:

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
new file mode 100644
index 0000000..77aaa09
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public class IgniteIoTestMessage implements Message {
+    /** */
+    private static byte FLAG_PROC_FROM_NIO = 1;
+
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private long id;
+
+    /** */
+    private byte flags;
+
+    /** */
+    private boolean req;
+
+    /** */
+    private byte payload[];
+
+    /**
+     *
+     */
+    IgniteIoTestMessage() {
+        // No-op.
+    }
+
+    /**
+     * @param id Message ID.
+     * @param req Request flag.
+     * @param payload Payload.
+     */
+    IgniteIoTestMessage(long id, boolean req, byte[] payload) {
+        this.id = id;
+        this.req = req;
+        this.payload = payload;
+    }
+
+    /**
+     * @return {@code True} if message should be processed from NIO thread
+     * (otherwise message is submitted to system pool).
+     */
+    boolean processFromNioThread() {
+        return isFlag(FLAG_PROC_FROM_NIO);
+    }
+
+    /**
+     * @param procFromNioThread {@code True} if message should be processed from NIO thread.
+     */
+    void processFromNioThread(boolean procFromNioThread) {
+        setFlag(procFromNioThread, FLAG_PROC_FROM_NIO);
+    }
+
+    /**
+     * @param flags Flags.
+     */
+    public void flags(byte flags) {
+        this.flags = flags;
+    }
+
+    /**
+     * @return Flags.
+     */
+    public byte flags() {
+        return flags;
+    }
+
+    /**
+     * Sets flag mask.
+     *
+     * @param flag Set or clear.
+     * @param mask Mask.
+     */
+    private void setFlag(boolean flag, int mask) {
+        flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+    }
+
+    /**
+     * Reads flag mask.
+     *
+     * @param mask Mask to read.
+     * @return Flag value.
+     */
+    private boolean isFlag(int mask) {
+        return (flags & mask) != 0;
+    }
+
+    /**
+     * @return {@code true} if this is request.
+     */
+    public boolean request() {
+        return req;
+    }
+
+    /**
+     * @return ID.
+     */
+    public long id() {
+        return id;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) {
+            case 0:
+                if (!writer.writeByte("flags", flags))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeLong("id", id))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeByteArray("payload", payload))
+                    return false;
+
+                writer.incrementState();
+
+            case 3:
+                if (!writer.writeBoolean("req", req))
+                    return false;
+
+                writer.incrementState();
+
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) {
+            case 0:
+                flags = reader.readByte("flags");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                id = reader.readLong("id");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                payload = reader.readByteArray("payload");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 3:
+                req = reader.readBoolean("req");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+        }
+
+        return reader.afterMessageRead(IgniteIoTestMessage.class);
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte directType() {
+        return -43;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte fieldsCount() {
+        return 4;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(IgniteIoTestMessage.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 2e24e67..a8d9f1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -288,6 +288,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     /** Asynchronous operations limit semaphore. */
     private Semaphore asyncOpsSem;
 
+    /** */
+    protected volatile boolean asyncToggled;
+
     /** {@inheritDoc} */
     @Override public String name() {
         return cacheCfg.getName();
@@ -364,6 +367,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
     }
 
     /**
+     * Toggles async flag if someone calls {@code withAsync()}
+     * on proxy and since that we have to properly handle all cache
+     * operations (sync and async) to put them in proper sequence.
+     *
+     * TODO: https://issues.apache.org/jira/browse/IGNITE-4393
+     */
+    void toggleAsync() {
+        if (!asyncToggled)
+            asyncToggled = true;
+    }
+
+    /**
      * Prints memory stats.
      */
     public void printMemoryStats() {
@@ -1134,7 +1149,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
                 execSvc = Executors.newFixedThreadPool(jobs.size() - 1);
 
                 for (int i = 1; i < jobs.size(); i++)
-                    execSvc.submit(jobs.get(i));
+                    execSvc.execute(jobs.get(i));
             }
 
             try {
@@ -2534,6 +2549,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Put future.
      */
     public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) {
+        A.notNull(key, "key", val, "val");
+
         final boolean statsEnabled = ctx.config().isStatisticsEnabled();
 
         final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -2554,8 +2571,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      */
     public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
         @Nullable final CacheEntryPredicate filter) {
-        A.notNull(key, "key", val, "val");
-
         if (keyCheck)
             validateCacheKey(key);
 
@@ -4592,6 +4607,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * @return Failed future if waiting was interrupted.
      */
     @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() {
+        if (!asyncToggled)
+            return null;
+
         try {
             if (asyncOpsSem != null)
                 asyncOpsSem.acquire();
@@ -4610,7 +4628,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
      * Releases asynchronous operations permit, if limited.
      */
     protected void asyncOpRelease() {
-        if (asyncOpsSem != null)
+        if (asyncOpsSem != null && asyncToggled)
             asyncOpsSem.release();
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 71f99d3..0646d5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -129,6 +129,13 @@ public abstract class GridCacheMessage implements Message {
     }
 
     /**
+     * @return Partition ID this message is targeted to or {@code -1} if it cannot be determined.
+     */
+    public int partition() {
+        return -1;
+    }
+
+    /**
      * If class loading error occurred during unmarshalling and {@link #ignoreClassErrors()} is
      * set to {@code true}, then the error will be passed into this method.
      *

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 90e428c..3178203 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -110,6 +110,41 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA
  * Cache utility methods.
  */
 public class GridCacheUtils {
+    /** Cheat cache ID for debugging and benchmarking purposes. */
+    public static final int cheatCacheId;
+
+    /*
+     *
+     */
+    static {
+        String cheatCache = System.getProperty("CHEAT_CACHE");
+
+        if (cheatCache != null) {
+            cheatCacheId = cheatCache.hashCode();
+
+            if (cheatCacheId == 0)
+                throw new RuntimeException();
+
+            System.out.println(">>> Cheat cache ID [id=" + cheatCacheId + ", name=" + cheatCache + ']');
+        }
+        else
+            cheatCacheId = 0;
+    }
+
+    /**
+     * Quickly checks if passed in cache ID is a "cheat cache ID" set by -DCHEAT_CACHE=user_cache_name
+     * and resolved in static block above.
+     *
+     * FOR DEBUGGING AND TESTING PURPOSES!
+     *
+     * @param id Cache ID to check.
+     * @return {@code True} if this is cheat cache ID.
+     */
+    @Deprecated
+    public static boolean cheatCache(int id) {
+        return cheatCacheId != 0 && id == cheatCacheId;
+    }
+
     /**  Hadoop syste cache name. */
     public static final String SYS_CACHE_HADOOP_MR = "ignite-hadoop-mr-sys-cache";
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f87fa1d..b9e6e82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -334,6 +334,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
     }
 
     /** {@inheritDoc} */
+    @Override public IgniteCache<K, V> withAsync() {
+        if (delegate instanceof GridCacheAdapter)
+            ((GridCacheAdapter)delegate).toggleAsync();
+
+        return super.withAsync();
+    }
+
+    /** {@inheritDoc} */
     @Override public IgniteCache<K, V> withSkipStore() {
         return skipStore();
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index 9639a9a..a671296 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -310,6 +310,11 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
         return keys;
     }
 
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+    }
+
     /**
      * @return Max lock wait time.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 109d665..c5cf332 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -86,6 +86,12 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
     }
 
     /** {@inheritDoc} */
+    @Override public int partition() {
+        // TODO https://issues.apache.org/jira/browse/IGNITE-4371
+        return Integer.MIN_VALUE;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
         writer.setBuffer(buf);
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
index df6acdd..5d70ec1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@ -89,6 +89,11 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage {
         partIds.add(key.partition());
     }
 
+    /** {@inheritDoc} */
+    @Override public int partition() {
+        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+    }
+
     /** {@inheritDoc}
      * @param ctx*/
     @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 35e6267..519d0fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -218,7 +218,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
     @Override public void onKernalStart() throws IgniteCheckedException {
         super.onKernalStart();
 
-        preldr.onKernalStart();
+        if (preldr != null)
+            preldr.onKernalStart();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
index 1e92b54..63e3309 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.GridDirectCollection;
@@ -31,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockResponse;
 import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLeanSet;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.lang.IgniteUuid;
@@ -57,7 +57,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
     /** Invalid partitions. */
     @GridToStringInclude
     @GridDirectCollection(int.class)
-    private Collection<Integer> invalidParts = new GridLeanSet<>();
+    private Collection<Integer> invalidParts;
 
     /** Preload entries. */
     @GridDirectCollection(GridCacheEntryInfo.class)
@@ -127,6 +127,9 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
      * @param part Invalid partition.
      */
     public void addInvalidPartition(int part) {
+        if (invalidParts == null)
+            invalidParts = new HashSet<>();
+
         invalidParts.add(part);
     }
 
@@ -134,7 +137,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
      * @return Invalid partitions.
      */
     public Collection<Integer> invalidPartitions() {
-        return invalidParts;
+        return invalidParts == null ? Collections.<Integer>emptySet() : invalidParts;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 940c74e..0e60ff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -613,8 +613,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
     /** {@inheritDoc} */
     @SuppressWarnings("unchecked")
     @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
-        A.notNull(key, "key", val, "val");
-
         return updateAsync0(
             key,
             val,
@@ -814,6 +812,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
      */
     @SuppressWarnings("unchecked")
     protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) {
+        if (!asyncToggled)
+            return op.apply();
+
         IgniteInternalFuture<T> fail = asyncOpAcquire();
 
         if (fail != null)

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index a03d948..0af7cf5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -235,6 +235,11 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
     }
 
     /** {@inheritDoc} */
+    @Override public int partition() {
+        return partId;
+    }
+
+    /** {@inheritDoc} */
     @Override public int partitionId(int idx) {
         assert idx == 0 : idx;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index f2fbb0e..1854e52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -487,6 +487,11 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
     }
 
     /** {@inheritDoc} */
+    @Override public int partition() {
+        return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+    }
+
+    /** {@inheritDoc} */
     @Override public long conflictExpireTime(int idx) {
         if (conflictExpireTimes != null) {
             assert idx >= 0 && idx < conflictExpireTimes.size();