You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/22 15:16:17 UTC
[37/50] [abbrv] ignite git commit: IGNITE-3220 I/O bottleneck on
server/client cluster configuration Communications optimizations: -
possibility to open separate in/out connections - possibility to have
multiple connections between nodes - implemented NI
IGNITE-3220 I/O bottleneck on server/client cluster configuration
Communications optimizations:
- possibility to open separate in/out connections
- possibility to have multiple connections between nodes
- implemented NIO sessions balancing between NIO threads
- reduced amount of work and blocking calls in NIO threads
Other:
- implemented StripedExecutor for cache messages handling
- added 'io test' messages for IO performance testing
(cherry picked from commit 10ade28)
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/05dd08b9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/05dd08b9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/05dd08b9
Branch: refs/heads/master
Commit: 05dd08b993e2d7f88176c051463b178431714f85
Parents: 57eb47f
Author: sboikov <sb...@gridgain.com>
Authored: Fri Dec 9 12:28:47 2016 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Dec 16 14:01:53 2016 +0300
----------------------------------------------------------------------
.../ignite/examples/ExampleNodeStartup.java | 2 +-
.../examples/datagrid/CachePutGetExample.java | 2 +-
.../scalar/examples/ScalarJvmCloudExample.scala | 2 +-
.../rest/ClientMemcachedProtocolSelfTest.java | 4 +-
.../rest/protocols/tcp/MockNioSession.java | 25 +-
.../apache/ignite/IgniteSystemProperties.java | 3 +
.../cache/store/CacheLoadOnlyStoreAdapter.java | 6 +-
.../configuration/IgniteConfiguration.java | 50 +-
.../internal/GridEventConsumeHandler.java | 2 +-
.../ignite/internal/GridJobContextImpl.java | 4 +-
.../ignite/internal/GridKernalContext.java | 9 +
.../ignite/internal/GridKernalContextImpl.java | 16 +-
.../internal/GridPerformanceSuggestions.java | 2 +-
.../org/apache/ignite/internal/GridTopic.java | 5 +-
.../ignite/internal/IgniteInternalFuture.java | 11 +
.../apache/ignite/internal/IgniteKernal.java | 85 +-
.../org/apache/ignite/internal/IgnitionEx.java | 32 +-
.../GridClientConnectionManagerAdapter.java | 1 +
.../client/router/impl/GridTcpRouterImpl.java | 1 +
.../managers/communication/GridIoManager.java | 207 ++-
.../managers/communication/GridIoMessage.java | 13 +
.../communication/GridIoMessageFactory.java | 12 +-
.../communication/IgniteIoTestMessage.java | 235 +++
.../processors/cache/GridCacheAdapter.java | 26 +-
.../processors/cache/GridCacheMessage.java | 7 +
.../processors/cache/GridCacheUtils.java | 35 +
.../processors/cache/IgniteCacheProxy.java | 8 +
.../distributed/GridDistributedLockRequest.java | 5 +
.../GridDistributedTxFinishResponse.java | 6 +
.../GridDistributedUnlockRequest.java | 5 +
.../distributed/dht/GridDhtCacheAdapter.java | 3 +-
.../distributed/dht/GridDhtLockResponse.java | 9 +-
.../dht/atomic/GridDhtAtomicCache.java | 5 +-
.../GridDhtAtomicSingleUpdateRequest.java | 5 +
.../dht/atomic/GridDhtAtomicUpdateRequest.java | 5 +
.../atomic/GridNearAtomicFullUpdateRequest.java | 5 +
.../GridNearAtomicSingleUpdateRequest.java | 5 +
.../distributed/near/GridNearGetRequest.java | 5 +
.../local/atomic/GridLocalAtomicCache.java | 3 +
.../query/GridCacheDistributedQueryManager.java | 2 +-
.../cache/query/GridCacheQueryRequest.java | 6 +-
.../transactions/IgniteTxLocalAdapter.java | 8 +-
.../datastreamer/DataStreamProcessor.java | 22 +-
.../internal/processors/igfs/IgfsContext.java | 4 +-
.../processors/igfs/IgfsDataManager.java | 6 +-
.../internal/processors/igfs/IgfsImpl.java | 2 +-
.../internal/processors/odbc/OdbcProcessor.java | 1 +
.../platform/compute/PlatformCompute.java | 6 +
.../tcp/GridTcpMemcachedNioListener.java | 15 +-
.../protocols/tcp/GridTcpRestNioListener.java | 2 +-
.../rest/protocols/tcp/GridTcpRestProtocol.java | 1 +
.../service/GridServiceProcessor.java | 6 +-
.../ignite/internal/util/IgniteUtils.java | 62 +-
.../ignite/internal/util/StripedExecutor.java | 667 +++++++++
.../util/future/GridFinishedFuture.java | 24 +
.../internal/util/future/GridFutureAdapter.java | 15 +-
.../util/future/GridFutureChainListener.java | 30 +-
.../internal/util/ipc/IpcToNioAdapter.java | 2 +-
.../nio/GridAbstractCommunicationClient.java | 12 +-
.../util/nio/GridCommunicationClient.java | 9 +-
.../nio/GridConnectionBytesVerifyFilter.java | 15 +-
.../util/nio/GridNioAsyncNotifyFilter.java | 10 +-
.../internal/util/nio/GridNioCodecFilter.java | 17 +-
.../ignite/internal/util/nio/GridNioFilter.java | 16 +-
.../internal/util/nio/GridNioFilterAdapter.java | 10 +-
.../internal/util/nio/GridNioFilterChain.java | 14 +-
.../ignite/internal/util/nio/GridNioFuture.java | 4 +-
.../util/nio/GridNioRecoveryDescriptor.java | 124 +-
.../ignite/internal/util/nio/GridNioServer.java | 1404 +++++++++++++++---
.../internal/util/nio/GridNioSession.java | 25 +-
.../internal/util/nio/GridNioSessionImpl.java | 65 +-
.../ignite/internal/util/nio/GridNioWorker.java | 48 +
.../util/nio/GridSelectorNioSessionImpl.java | 221 ++-
.../util/nio/GridShmemCommunicationClient.java | 7 +-
.../util/nio/GridTcpNioCommunicationClient.java | 55 +-
.../internal/util/nio/SessionWriteRequest.java | 85 ++
.../internal/util/nio/ssl/GridNioSslFilter.java | 10 +-
.../util/nio/ssl/GridNioSslHandler.java | 4 +-
.../util/tostring/GridToStringBuilder.java | 2 +-
.../communication/tcp/TcpCommunicationSpi.java | 1340 ++++++++++++-----
.../tcp/TcpCommunicationSpiMBean.java | 40 +
.../ignite/spi/discovery/tcp/ServerImpl.java | 14 +-
.../ignite/stream/socket/SocketStreamer.java | 1 +
.../ignite/thread/IgniteThreadFactory.java | 8 +-
.../IgniteSlowClientDetectionSelfTest.java | 1 +
...unicationBalanceMultipleConnectionsTest.java | 28 +
.../IgniteCommunicationBalanceTest.java | 339 +++++
.../communication/IgniteIoTestMessagesTest.java | 95 ++
.../IgniteVariousConnectionNumberTest.java | 166 +++
.../cache/CrossCacheTxRandomOperationsTest.java | 30 +-
...idAbstractCacheInterceptorRebalanceTest.java | 4 +-
...CacheOffHeapMultiThreadedUpdateSelfTest.java | 6 +-
...eAtomicMessageRecovery10ConnectionsTest.java | 28 +
...cMessageRecoveryNoPairedConnectionsTest.java | 47 +
...acheConnectionRecovery10ConnectionsTest.java | 35 +
.../distributed/IgniteCacheCreatePutTest.java | 2 +-
.../IgniteCacheMessageRecoveryAbstractTest.java | 24 +-
.../IgniteCacheMessageWriteTimeoutTest.java | 17 +-
.../dht/IgniteCacheMultiTxLockSelfTest.java | 6 +-
...erNoStripedPoolMultiNodeFullApiSelfTest.java | 35 +
...edNoStripedPoolMultiNodeFullApiSelfTest.java | 35 +
.../TxDeadlockDetectionNoHangsTest.java | 2 +-
.../TxOptimisticDeadlockDetectionTest.java | 29 +-
.../GridServiceProcessorProxySelfTest.java | 2 +-
.../util/future/GridFutureAdapterSelfTest.java | 122 +-
.../nio/impl/GridNioFilterChainSelfTest.java | 32 +-
.../loadtests/nio/GridNioBenchmarkClient.java | 4 +-
.../p2p/GridP2PRecursionTaskSelfTest.java | 2 +-
.../spi/GridTcpSpiForwardingSelfTest.java | 18 +-
.../GridTcpCommunicationSpiAbstractTest.java | 28 +-
...mmunicationSpiConcurrentConnectSelfTest.java | 82 +-
.../GridTcpCommunicationSpiConfigSelfTest.java | 5 +-
...cpCommunicationSpiMultithreadedSelfTest.java | 23 +-
...dTcpCommunicationSpiRecoveryAckSelfTest.java | 9 +-
...tionSpiRecoveryFailureDetectionSelfTest.java | 1 +
...ationSpiRecoveryNoPairedConnectionsTest.java | 28 +
...GridTcpCommunicationSpiRecoverySelfTest.java | 67 +-
...CommunicationRecoveryAckClosureSelfTest.java | 9 +-
.../junits/GridTestKernalContext.java | 4 +-
.../IgniteCacheFullApiSelfTestSuite.java | 6 +
.../ignite/testsuites/IgniteCacheTestSuite.java | 17 +-
.../IgniteSpiCommunicationSelfTestSuite.java | 2 +
.../hadoop/jobtracker/HadoopJobTracker.java | 4 +-
.../HadoopExternalCommunication.java | 5 +-
.../communication/HadoopIpcToNioAdapter.java | 2 +-
.../communication/HadoopMarshallerFilter.java | 6 +-
.../ignite/stream/kafka/KafkaStreamer.java | 2 +-
.../ignite/tools/classgen/ClassesGenerator.java | 8 +-
.../ignite/yardstick/IgniteBenchmarkUtils.java | 6 +-
.../yardstick/cache/CacheEntryEventProbe.java | 2 +-
.../yardstick/cache/IgniteIoTestBenchmark.java | 73 +
.../io/IgniteIoTestAbstractBenchmark.java | 61 +
.../io/IgniteIoTestSendAllBenchmark.java | 32 +
.../io/IgniteIoTestSendRandomBenchmark.java | 35 +
134 files changed, 5935 insertions(+), 998 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
index ad12297..dd8a72b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
+++ b/examples/src/main/java/org/apache/ignite/examples/ExampleNodeStartup.java
@@ -33,4 +33,4 @@ public class ExampleNodeStartup {
public static void main(String[] args) throws IgniteException {
Ignition.start("examples/config/example-ignite.xml");
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java
index 82a76b8..b9bae5b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/datagrid/CachePutGetExample.java
@@ -105,4 +105,4 @@ public class CachePutGetExample {
for (Map.Entry<Integer, String> e : vals.entrySet())
System.out.println("Got entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala
index 1014726..814bb2e 100644
--- a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala
+++ b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarJvmCloudExample.scala
@@ -50,7 +50,7 @@ object ScalarJvmCloudExample {
val pool = Executors.newFixedThreadPool(NODES.size)
// Concurrently startup all nodes.
- NODES.foreach(name => pool.submit(new Runnable {
+ NODES.foreach(name => pool.execute(new Runnable {
@impl def run() {
// All defaults.
val cfg = new IgniteConfiguration
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java
index 0f56c73..c03c06e 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/ClientMemcachedProtocolSelfTest.java
@@ -111,6 +111,8 @@ public class ClientMemcachedProtocolSelfTest extends AbstractRestProcessorSelfTe
Map<String, Object> map = client.getBulk("getKey1", "getKey2");
+ info("Map: " + map);
+
Assert.assertEquals(2, map.size());
Assert.assertEquals("getVal1", map.get("getKey1"));
@@ -443,4 +445,4 @@ public class ClientMemcachedProtocolSelfTest extends AbstractRestProcessorSelfTe
return res;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
----------------------------------------------------------------------
diff --git a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
index c82c73e..9bc4e7f 100644
--- a/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
+++ b/modules/clients/src/test/java/org/apache/ignite/internal/processors/rest/protocols/tcp/MockNioSession.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.rest.protocols.tcp;
import java.net.InetSocketAddress;
+import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.util.lang.GridMetadataAwareAdapter;
import org.apache.ignite.internal.util.nio.GridNioFinishedFuture;
import org.apache.ignite.internal.util.nio.GridNioFuture;
@@ -111,6 +112,11 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
}
/** {@inheritDoc} */
+ @Override public void sendNoFuture(Object msg) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
@Override public GridNioFuture<Object> resumeReads() {
return null;
}
@@ -131,12 +137,27 @@ public class MockNioSession extends GridMetadataAwareAdapter implements GridNioS
}
/** {@inheritDoc} */
- @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
// No-op.
}
/** {@inheritDoc} */
- @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
+ @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
return null;
}
+
+ /** {@inheritDoc} */
+ @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void systemMessage(Object msg) {
+ // No-op.
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
index d0c0d5e..9650a31 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java
@@ -477,6 +477,9 @@ public final class IgniteSystemProperties {
@Deprecated
public static final String IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES = "IGNITE_BINARY_DONT_WRAP_TREE_STRUCTURES";
+ /** */
+ public static final String IGNITE_IO_BALANCE_PERIOD = "IGNITE_IO_BALANCE_PERIOD";
+
/**
* When set to {@code true} fields are written by BinaryMarshaller in sorted order. Otherwise
* the natural order is used.
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
index 7494e37..d3f381e 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/store/CacheLoadOnlyStoreAdapter.java
@@ -153,14 +153,14 @@ public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K
buf.add(iter.next());
if (buf.size() == batchSize) {
- exec.submit(new Worker(c, buf, args));
+ exec.execute(new Worker(c, buf, args));
buf = new ArrayList<>(batchSize);
}
}
if (!buf.isEmpty())
- exec.submit(new Worker(c, buf, args));
+ exec.execute(new Worker(c, buf, args));
}
catch (RejectedExecutionException ignored) {
// Because of custom RejectedExecutionHandler.
@@ -330,4 +330,4 @@ public abstract class CacheLoadOnlyStoreAdapter<K, V, I> implements CacheStore<K
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 75145a3..dcd8a80 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -146,7 +146,7 @@ public class IgniteConfiguration {
public static final int AVAILABLE_PROC_CNT = Runtime.getRuntime().availableProcessors();
/** Default core size of public thread pool. */
- public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT) * 2;
+ public static final int DFLT_PUBLIC_THREAD_CNT = Math.max(8, AVAILABLE_PROC_CNT);
/** Default keep alive time for public thread pool. */
@Deprecated
@@ -236,6 +236,12 @@ public class IgniteConfiguration {
/** Async Callback pool size. */
private int callbackPoolSize = DFLT_PUBLIC_THREAD_CNT;
+ /**
+ * Use striped pool for internal requests processing when possible
+ * (e.g. cache requests per-partition striping).
+ */
+ private int stripedPoolSize = DFLT_PUBLIC_THREAD_CNT;
+
/** System pool size. */
private int sysPoolSize = DFLT_SYSTEM_CORE_THREAD_CNT;
@@ -553,6 +559,7 @@ public class IgniteConfiguration {
sndRetryDelay = cfg.getNetworkSendRetryDelay();
sslCtxFactory = cfg.getSslContextFactory();
storeSesLsnrs = cfg.getCacheStoreSessionListenerFactories();
+ stripedPoolSize = cfg.getStripedPoolSize();
svcCfgs = cfg.getServiceConfiguration();
sysPoolSize = cfg.getSystemThreadPoolSize();
timeSrvPortBase = cfg.getTimeServerPortBase();
@@ -712,6 +719,47 @@ public class IgniteConfiguration {
}
/**
+ * Returns striped pool size that should be used for cache requests
+ * processing.
+ * <p>
+ * If set to non-positive value then requests get processed in system pool.
+ * <p>
+ * Striped pool is better for typical cache operations.
+ *
+ * @return Positive value if striped pool should be initialized
+ * with configured number of threads (stripes) and used for requests processing
+ * or non-positive value to process requests in system pool.
+ *
+ * @see #getPublicThreadPoolSize()
+ * @see #getSystemThreadPoolSize()
+ */
+ public int getStripedPoolSize() {
+ return stripedPoolSize;
+ }
+
+ /**
+ * Sets striped pool size that should be used for cache requests
+ * processing.
+ * <p>
+ * If set to non-positive value then requests get processed in system pool.
+ * <p>
+ * Striped pool is better for typical cache operations.
+ *
+ * @param stripedPoolSize Positive value if striped pool should be initialized
+ * with passed in number of threads (stripes) and used for requests processing
+ * or non-positive value to process requests in system pool.
+ * @return {@code this} for chaining.
+ *
+ * @see #getPublicThreadPoolSize()
+ * @see #getSystemThreadPoolSize()
+ */
+ public IgniteConfiguration setStripedPoolSize(int stripedPoolSize) {
+ this.stripedPoolSize = stripedPoolSize;
+
+ return this;
+ }
+
+ /**
* Should return a thread pool size to be used in grid.
* This executor service will be in charge of processing {@link ComputeJob GridJobs}
* and user messages sent to node.
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
index 715f8a5..68d34ce 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridEventConsumeHandler.java
@@ -181,7 +181,7 @@ class GridEventConsumeHandler implements GridContinuousHandler {
notificationQueue.add(new T3<>(nodeId, routineId, evt));
if (!notificationInProgress) {
- ctx.getSystemExecutorService().submit(new Runnable() {
+ ctx.getSystemExecutorService().execute(new Runnable() {
@Override public void run() {
if (!ctx.continuous().lockStopping())
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
index 804d228..dbfa0b1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
@@ -217,7 +217,7 @@ public class GridJobContextImpl implements ComputeJobContext, Externalizable {
assert execSvc != null;
- execSvc.submit(new Runnable() {
+ execSvc.execute(new Runnable() {
@Override public void run() {
callcc0();
}
@@ -300,4 +300,4 @@ public class GridJobContextImpl implements ComputeJobContext, Externalizable {
@Override public String toString() {
return S.toString(GridJobContextImpl.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index ae29223..927944f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
+import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.plugin.PluginNotFoundException;
import org.apache.ignite.plugin.PluginProvider;
@@ -511,6 +512,14 @@ public interface GridKernalContext extends Iterable<GridComponent> {
public ExecutorService getSystemExecutorService();
/**
+ * Executor service that is in charge of processing internal system messages
+ * in stripes (dedicated threads).
+ *
+ * @return Thread pool implementation to be used in grid for internal system messages.
+ */
+ public StripedExecutor getStripedExecutorService();
+
+ /**
* Executor service that is in charge of processing internal and Visor
* {@link org.apache.ignite.compute.ComputeJob GridJobs}.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 94c6448..a2ad1b2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -83,6 +83,7 @@ import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.IgniteExceptionRegistry;
+import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -300,6 +301,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ protected StripedExecutor stripedExecSvc;
+
+ /** */
+ @GridToStringExclude
private ExecutorService p2pExecSvc;
/** */
@@ -381,6 +386,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
* @param marshCachePool Marshaller cache pool.
* @param execSvc Public executor service.
* @param sysExecSvc System executor service.
+ * @param stripedExecSvc Striped executor.
* @param p2pExecSvc P2P executor service.
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
@@ -400,6 +406,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
ExecutorService marshCachePool,
ExecutorService execSvc,
ExecutorService sysExecSvc,
+ StripedExecutor stripedExecSvc,
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
@@ -407,7 +414,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
IgniteStripedThreadPoolExecutor callbackExecSvc,
- List<PluginProvider> plugins) throws IgniteCheckedException {
+ List<PluginProvider> plugins
+ ) throws IgniteCheckedException {
assert grid != null;
assert cfg != null;
assert gw != null;
@@ -419,6 +427,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.marshCachePool = marshCachePool;
this.execSvc = execSvc;
this.sysExecSvc = sysExecSvc;
+ this.stripedExecSvc = stripedExecSvc;
this.p2pExecSvc = p2pExecSvc;
this.mgmtExecSvc = mgmtExecSvc;
this.igfsExecSvc = igfsExecSvc;
@@ -948,6 +957,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public StripedExecutor getStripedExecutorService() {
+ return stripedExecSvc;
+ }
+
+ /** {@inheritDoc} */
@Override public ExecutorService getManagementExecutorService() {
return mgmtExecSvc;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
index b040a97..5e8e520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridPerformanceSuggestions.java
@@ -89,4 +89,4 @@ public class GridPerformanceSuggestions {
@Override public String toString() {
return S.toString(GridPerformanceSuggestions.class, this);
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index b5608db..24ddcd2 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -97,7 +97,10 @@ public enum GridTopic {
TOPIC_QUERY,
/** */
- TOPIC_TX;
+ TOPIC_TX,
+
+ /** */
+ TOPIC_IO_TEST;
/** Enum values. */
private static final GridTopic[] VALS = values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
index b80a755..789556d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteInternalFuture.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.ignite.IgniteCheckedException;
@@ -133,6 +134,16 @@ public interface IgniteInternalFuture<R> {
public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb);
/**
+ * Make a chained future to convert result of this future (when complete) into a new format.
+ * It is guaranteed that done callback will be called only ONCE.
+ *
+ * @param doneCb Done callback that is applied to this future when it finishes to produce chained future result.
+ * @param exec Executor to run callback.
+ * @return Chained future that finishes after this future completes and done callback is called.
+ */
+ public <T> IgniteInternalFuture<T> chain(IgniteClosure<? super IgniteInternalFuture<R>, T> doneCb, Executor exec);
+
+ /**
* @return Error value if future has already been completed with error.
*/
public Throwable error();
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 57aab00..7935e06 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -61,10 +61,10 @@ import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.IgniteQueue;
-import org.apache.ignite.IgniteLock;
import org.apache.ignite.IgniteScheduler;
import org.apache.ignite.IgniteSemaphore;
import org.apache.ignite.IgniteServices;
@@ -115,7 +115,6 @@ import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor;
import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor;
import org.apache.ignite.internal.processors.hadoop.Hadoop;
-import org.apache.ignite.internal.processors.hadoop.HadoopClassLoader;
import org.apache.ignite.internal.processors.hadoop.HadoopProcessorAdapter;
import org.apache.ignite.internal.processors.job.GridJobProcessor;
import org.apache.ignite.internal.processors.jobmetrics.GridJobMetricsProcessor;
@@ -139,6 +138,7 @@ import org.apache.ignite.internal.processors.service.GridServiceProcessor;
import org.apache.ignite.internal.processors.session.GridTaskSessionProcessor;
import org.apache.ignite.internal.processors.task.GridTaskProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
+import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
@@ -175,6 +175,7 @@ import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.thread.IgniteStripedThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CONFIG_URL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DAEMON;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_NO_ASCII;
@@ -182,7 +183,6 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALL
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_STARVATION_CHECK_INTERVAL;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SUCCESS_FILE;
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_BINARY_MARSHALLER_USE_STRING_SERIALIZATION_VER_2;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.snapshot;
import static org.apache.ignite.internal.GridKernalState.DISCONNECTED;
@@ -199,7 +199,6 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_BUILD_VER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CLIENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_CONSISTENCY_CHECK_SKIPPED;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DAEMON;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DEPLOYMENT_MODE;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_GRID_NAME;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IPS;
@@ -208,11 +207,12 @@ import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JMX_PORT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JVM_ARGS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_JVM_PID;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LANG_RUNTIME;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_LATE_AFFINITY_ASSIGNMENT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_COMPACT_FOOTER;
-import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_BINARY_STRING_SER_VER_2;
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MARSHALLER_USE_DFLT_SUID;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_NODE_CONSISTENT_ID;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PEER_CLASSLOADING;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PHY_RAM;
@@ -663,6 +663,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
* @param utilityCachePool Utility cache pool.
* @param execSvc Executor service.
* @param sysExecSvc System executor service.
+ * @param stripedExecSvc Striped executor.
* @param p2pExecSvc P2P executor service.
* @param mgmtExecSvc Management executor service.
* @param igfsExecSvc IGFS executor service.
@@ -673,11 +674,13 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@SuppressWarnings({"CatchGenericClass", "unchecked"})
- public void start(final IgniteConfiguration cfg,
+ public void start(
+ final IgniteConfiguration cfg,
ExecutorService utilityCachePool,
ExecutorService marshCachePool,
final ExecutorService execSvc,
final ExecutorService sysExecSvc,
+ final StripedExecutor stripedExecSvc,
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
ExecutorService igfsExecSvc,
@@ -685,7 +688,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
IgniteStripedThreadPoolExecutor callbackExecSvc,
- GridAbsClosure errHnd)
+ GridAbsClosure errHnd
+ )
throws IgniteCheckedException
{
gw.compareAndSet(null, new GridKernalGatewayImpl(cfg.getGridName()));
@@ -785,6 +789,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
marshCachePool,
execSvc,
sysExecSvc,
+ stripedExecSvc,
p2pExecSvc,
mgmtExecSvc,
igfsExecSvc,
@@ -792,7 +797,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
affExecSvc,
idxExecSvc,
callbackExecSvc,
- plugins);
+ plugins
+ );
cfg.getMarshaller().setContext(ctx.marshallerContext());
@@ -986,24 +992,51 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
starveTask = ctx.timeout().schedule(new Runnable() {
/** Last completed task count. */
- private long lastCompletedCnt;
+ private long lastCompletedCntPub;
+
+ /** Last completed task count. */
+ private long lastCompletedCntSys;
@Override public void run() {
- if (!(execSvc instanceof ThreadPoolExecutor))
- return;
+ if (execSvc instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
+
+ lastCompletedCntPub = checkPoolStarvation(exec, lastCompletedCntPub, "public");
+ }
- ThreadPoolExecutor exec = (ThreadPoolExecutor)execSvc;
+ if (sysExecSvc instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor exec = (ThreadPoolExecutor)sysExecSvc;
+ lastCompletedCntSys = checkPoolStarvation(exec, lastCompletedCntSys, "system");
+ }
+
+ if (stripedExecSvc != null)
+ stripedExecSvc.checkStarvation();
+ }
+
+ /**
+ * @param exec Thread pool executor to check.
+ * @param lastCompletedCnt Last completed tasks count.
+ * @param pool Pool name for message.
+ * @return Current completed tasks count.
+ */
+ private long checkPoolStarvation(
+ ThreadPoolExecutor exec,
+ long lastCompletedCnt,
+ String pool
+ ) {
long completedCnt = exec.getCompletedTaskCount();
// If all threads are active and no task has completed since last time and there is
// at least one waiting request, then it is possible starvation.
if (exec.getPoolSize() == exec.getActiveCount() && completedCnt == lastCompletedCnt &&
!exec.getQueue().isEmpty())
- LT.warn(log, "Possible thread pool starvation detected (no task completed in last " +
- interval + "ms, is executorService pool size large enough?)");
+ LT.warn(
+ log,
+ "Possible thread pool starvation detected (no task completed in last " +
+ interval + "ms, is " + pool + " thread pool size large enough?)");
- lastCompletedCnt = completedCnt;
+ return completedCnt;
}
}, interval, interval);
}
@@ -1128,6 +1161,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}, longOpDumpTimeout, longOpDumpTimeout);
}
+ ctx.performance().add("Disable assertions (remove '-ea' from JVM options)", !U.assertionsEnabled());
+
ctx.performance().logSuggestions(log, gridName);
U.quietAndInfo(log, "To start Console Management & Monitoring run ignitevisorcmd.{sh|bat}");
@@ -3509,6 +3544,26 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
}
}
+ /**
+ * @param node Node.
+ * @param payload Message payload.
+ * @param procFromNioThread If {@code true} message is processed from NIO thread.
+ * @return Response future.
+ */
+ public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
+ return ctx.io().sendIoTest(node, payload, procFromNioThread);
+ }
+
+ /**
+ * @param nodes Nodes.
+ * @param payload Message payload.
+ * @param procFromNioThread If {@code true} message is processed from NIO thread.
+ * @return Response future.
+ */
+ public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
+ return ctx.io().sendIoTest(nodes, payload, procFromNioThread);
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteKernal.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index b3a9eec..f32a753 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -66,6 +66,7 @@ import org.apache.ignite.internal.processors.igfs.IgfsUtils;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.F;
@@ -1459,6 +1460,9 @@ public class IgnitionEx {
/** System executor service. */
private ThreadPoolExecutor sysExecSvc;
+ /** */
+ private StripedExecutor stripedExecSvc;
+
/** Management executor service. */
private ThreadPoolExecutor mgmtExecSvc;
@@ -1652,8 +1656,6 @@ public class IgnitionEx {
execSvc.allowCoreThreadTimeOut(true);
- // Note that since we use 'LinkedBlockingQueue', number of
- // maximum threads has no effect.
validateThreadPoolSize(cfg.getSystemThreadPoolSize(), "system");
sysExecSvc = new IgniteThreadPoolExecutor(
@@ -1666,6 +1668,9 @@ public class IgnitionEx {
sysExecSvc.allowCoreThreadTimeOut(true);
+ if (cfg.getStripedPoolSize() > 0)
+ stripedExecSvc = new StripedExecutor(cfg.getStripedPoolSize(), cfg.getGridName(), "sys", log);
+
// Note that since we use 'LinkedBlockingQueue', number of
// maximum threads has no effect.
// Note, that we do not pre-start threads here as management pool may
@@ -1791,13 +1796,26 @@ public class IgnitionEx {
// Init here to make grid available to lifecycle listeners.
grid = grid0;
- grid0.start(myCfg, utilityCacheExecSvc, marshCacheExecSvc, execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc,
- igfsExecSvc, restExecSvc, affExecSvc, idxExecSvc, callbackExecSvc,
+ grid0.start(
+ myCfg,
+ utilityCacheExecSvc,
+ marshCacheExecSvc,
+ execSvc,
+ sysExecSvc,
+ stripedExecSvc,
+ p2pExecSvc,
+ mgmtExecSvc,
+ igfsExecSvc,
+ restExecSvc,
+ affExecSvc,
+ idxExecSvc,
+ callbackExecSvc,
new CA() {
@Override public void apply() {
startLatch.countDown();
}
- });
+ }
+ );
state = STARTED;
@@ -2415,6 +2433,10 @@ public class IgnitionEx {
sysExecSvc = null;
+ U.shutdownNow(getClass(), stripedExecSvc, log);
+
+ stripedExecSvc = null;
+
U.shutdownNow(getClass(), mgmtExecSvc, log);
mgmtExecSvc = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
index 6ea7c22..12baee0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/impl/connection/GridClientConnectionManagerAdapter.java
@@ -200,6 +200,7 @@ public abstract class GridClientConnectionManagerAdapter implements GridClientCo
.socketSendBufferSize(0)
.idleTimeout(Long.MAX_VALUE)
.gridName(routerClient ? "routerClient" : "gridClient")
+ .serverName("tcp-client")
.daemon(cfg.isDaemon())
.build();
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
index 06a4929..3566830 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/client/router/impl/GridTcpRouterImpl.java
@@ -258,6 +258,7 @@ public class GridTcpRouterImpl implements GridTcpRouter, GridTcpRouterMBean, Lif
.logger(log)
.selectorCount(Runtime.getRuntime().availableProcessors())
.gridName(gridName)
+ .serverName("router")
.tcpNoDelay(tcpNoDelay)
.directBuffer(false)
.byteOrder(ByteOrder.nativeOrder())
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 3df29cf..7ef7bc0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -26,15 +26,17 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterNode;
@@ -44,6 +46,7 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteComponentType;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.direct.DirectMessageReader;
import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.GridManagerAdapter;
@@ -55,6 +58,8 @@ import org.apache.ignite.internal.processors.pool.PoolProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashSet;
import org.apache.ignite.internal.util.GridSpinReadWriteLock;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -83,6 +88,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
+import static org.apache.ignite.internal.GridTopic.TOPIC_IO_TEST;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
@@ -176,6 +182,12 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** Stopping flag. */
private boolean stopping;
+ /** */
+ private final AtomicReference<ConcurrentHashMap<Long, IoTestFuture>> ioTestMap = new AtomicReference<>();
+
+ /** */
+ private final AtomicLong ioTestId = new AtomicLong();
+
/**
* @param ctx Grid kernal context.
*/
@@ -297,6 +309,114 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
if (log.isDebugEnabled())
log.debug(startInfo());
+
+ addMessageListener(GridTopic.TOPIC_IO_TEST, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (node == null)
+ return;
+
+ IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg;
+
+ if (msg0.request()) {
+ IgniteIoTestMessage res = new IgniteIoTestMessage(msg0.id(), false, null);
+
+ res.flags(msg0.flags());
+
+ try {
+ send(node, GridTopic.TOPIC_IO_TEST, res, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send IO test response [msg=" + msg0 + "]", e);
+ }
+ }
+ else {
+ IoTestFuture fut = ioTestMap().get(msg0.id());
+
+ if (fut == null)
+ U.warn(log, "Failed to find IO test future [msg=" + msg0 + ']');
+ else
+ fut.onResponse();
+ }
+ }
+ });
+ }
+
+ /**
+ * @param nodes Nodes.
+ * @param payload Payload.
+ * @param procFromNioThread If {@code true} message is processed from NIO thread.
+ * @return Response future.
+ */
+ public IgniteInternalFuture sendIoTest(List<ClusterNode> nodes, byte[] payload, boolean procFromNioThread) {
+ long id = ioTestId.getAndIncrement();
+
+ IoTestFuture fut = new IoTestFuture(id, nodes.size());
+
+ IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+
+ msg.processFromNioThread(procFromNioThread);
+
+ ioTestMap().put(id, fut);
+
+ for (int i = 0; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
+
+ try {
+ send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ ioTestMap().remove(msg.id());
+
+ return new GridFinishedFuture(e);
+ }
+ }
+
+ return fut;
+ }
+
+ /**
+ * @param node Node.
+ * @param payload Payload.
+ * @param procFromNioThread If {@code true} message is processed from NIO thread.
+ * @return Response future.
+ */
+ public IgniteInternalFuture sendIoTest(ClusterNode node, byte[] payload, boolean procFromNioThread) {
+ long id = ioTestId.getAndIncrement();
+
+ IoTestFuture fut = new IoTestFuture(id, 1);
+
+ IgniteIoTestMessage msg = new IgniteIoTestMessage(id, true, payload);
+
+ msg.processFromNioThread(procFromNioThread);
+
+ ioTestMap().put(id, fut);
+
+ try {
+ send(node, GridTopic.TOPIC_IO_TEST, msg, GridIoPolicy.SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ ioTestMap().remove(msg.id());
+
+ return new GridFinishedFuture(e);
+ }
+
+ return fut;
+ }
+
+ /**
+ * @return IO test futures map.
+ */
+ private ConcurrentHashMap<Long, IoTestFuture> ioTestMap() {
+ ConcurrentHashMap<Long, IoTestFuture> map = ioTestMap.get();
+
+ if (map == null) {
+ if (!ioTestMap.compareAndSet(null, map = new ConcurrentHashMap<>()))
+ map = ioTestMap.get();
+ }
+
+ return map;
}
/** {@inheritDoc} */
@@ -514,16 +634,6 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return;
}
- // Check discovery.
- ClusterNode node = ctx.discovery().node(nodeId);
-
- if (node == null) {
- if (log.isDebugEnabled())
- log.debug("Ignoring message from dead node [senderId=" + nodeId + ", msg=" + msg + ']');
-
- return; // We can't receive messages from non-discovered ones.
- }
-
if (msg.topic() == null) {
int topicOrd = msg.topicOrdinal();
@@ -678,8 +788,31 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
msgC.run();
}
}
+
+ @Override public String toString() {
+ return "Message closure [msg=" + msg + ']';
+ }
};
+ if (msg.topicOrdinal() == TOPIC_IO_TEST.ordinal()) {
+ IgniteIoTestMessage msg0 = (IgniteIoTestMessage)msg.message();
+
+ if (msg0.processFromNioThread()) {
+ c.run();
+
+ return;
+ }
+ }
+
+ if (ctx.config().getStripedPoolSize() > 0 &&
+ plc == GridIoPolicy.SYSTEM_POOL &&
+ msg.partition() != Integer.MIN_VALUE
+ ) {
+ ctx.getStripedExecutorService().execute(msg.partition(), c);
+
+ return;
+ }
+
try {
pools.poolForPolicy(plc).execute(c);
}
@@ -2460,4 +2593,56 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return S.toString(DelayedMessage.class, this, super.toString());
}
}
+
+ /**
+ *
+ */
+ private class IoTestFuture extends GridFutureAdapter<Object> {
+ /** */
+ private final long id;
+
+ /** */
+ private int cntr;
+
+ /**
+ * @param id ID.
+ * @param cntr Counter.
+ */
+ IoTestFuture(long id, int cntr) {
+ assert cntr > 0 : cntr;
+
+ this.id = id;
+ this.cntr = cntr;
+ }
+
+ /**
+ *
+ */
+ void onResponse() {
+ boolean complete;
+
+ synchronized (this) {
+ complete = --cntr == 0;
+ }
+
+ if (complete)
+ onDone();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
+ if (super.onDone(res, err)) {
+ ioTestMap().remove(id);
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IoTestFuture.class, this);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index b28ced2..b1a26e9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.managers.communication;
import java.io.Externalizable;
import java.nio.ByteBuffer;
import org.apache.ignite.internal.GridDirectTransient;
+import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -321,6 +322,18 @@ public class GridIoMessage implements Message {
return 7;
}
+ /**
+ * Get single partition for this message (if applicable).
+ *
+ * @return Partition ID.
+ */
+ public int partition() {
+ if (msg instanceof GridCacheMessage)
+ return ((GridCacheMessage)msg).partition();
+ else
+ return Integer.MIN_VALUE;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridIoMessage.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 504e683..b1fe910 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -171,6 +171,16 @@ public class GridIoMessageFactory implements MessageFactory {
Message msg = null;
switch (type) {
+ case -44:
+ msg = new TcpCommunicationSpi.HandshakeMessage2();
+
+ break;
+
+ case -43:
+ msg = new IgniteIoTestMessage();
+
+ break;
+
case -42:
msg = new HadoopDirectShuffleMessage();
@@ -816,7 +826,7 @@ public class GridIoMessageFactory implements MessageFactory {
break;
- // [-3..119] [124..127] [-36]- this
+ // [-3..119] [124..127] [-36..-44]- this
// [120..123] - DR
// [-4..-22, -30..-35] - SQL
default:
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
new file mode 100644
index 0000000..77aaa09
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/IgniteIoTestMessage.java
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.managers.communication;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+import java.nio.ByteBuffer;
+
+/**
+ *
+ */
+public class IgniteIoTestMessage implements Message {
+ /** */
+ private static byte FLAG_PROC_FROM_NIO = 1;
+
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private long id;
+
+ /** */
+ private byte flags;
+
+ /** */
+ private boolean req;
+
+ /** */
+ private byte payload[];
+
+ /**
+ *
+ */
+ IgniteIoTestMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param id Message ID.
+ * @param req Request flag.
+ * @param payload Payload.
+ */
+ IgniteIoTestMessage(long id, boolean req, byte[] payload) {
+ this.id = id;
+ this.req = req;
+ this.payload = payload;
+ }
+
+ /**
+ * @return {@code True} if message should be processed from NIO thread
+ * (otherwise message is submitted to system pool).
+ */
+ boolean processFromNioThread() {
+ return isFlag(FLAG_PROC_FROM_NIO);
+ }
+
+ /**
+ * @param procFromNioThread {@code True} if message should be processed from NIO thread.
+ */
+ void processFromNioThread(boolean procFromNioThread) {
+ setFlag(procFromNioThread, FLAG_PROC_FROM_NIO);
+ }
+
+ /**
+ * @param flags Flags.
+ */
+ public void flags(byte flags) {
+ this.flags = flags;
+ }
+
+ /**
+ * @return Flags.
+ */
+ public byte flags() {
+ return flags;
+ }
+
+ /**
+ * Sets flag mask.
+ *
+ * @param flag Set or clear.
+ * @param mask Mask.
+ */
+ private void setFlag(boolean flag, int mask) {
+ flags = flag ? (byte)(flags | mask) : (byte)(flags & ~mask);
+ }
+
+ /**
+ * Reads flag mask.
+ *
+ * @param mask Mask to read.
+ * @return Flag value.
+ */
+ private boolean isFlag(int mask) {
+ return (flags & mask) != 0;
+ }
+
+ /**
+ * @return {@code true} if this is request.
+ */
+ public boolean request() {
+ return req;
+ }
+
+ /**
+ * @return ID.
+ */
+ public long id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByte("flags", flags))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeLong("id", id))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeByteArray("payload", payload))
+ return false;
+
+ writer.incrementState();
+
+ case 3:
+ if (!writer.writeBoolean("req", req))
+ return false;
+
+ writer.incrementState();
+
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ flags = reader.readByte("flags");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ id = reader.readLong("id");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ payload = reader.readByteArray("payload");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 3:
+ req = reader.readBoolean("req");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ }
+
+ return reader.afterMessageRead(IgniteIoTestMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte directType() {
+ return -43;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 4;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteIoTestMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 2e24e67..a8d9f1d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -288,6 +288,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** Asynchronous operations limit semaphore. */
private Semaphore asyncOpsSem;
+ /** */
+ protected volatile boolean asyncToggled;
+
/** {@inheritDoc} */
@Override public String name() {
return cacheCfg.getName();
@@ -364,6 +367,18 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
}
/**
+ * Toggles async flag if someone calls {@code withAsync()}
+ * on proxy and since that we have to properly handle all cache
+ * operations (sync and async) to put them in proper sequence.
+ *
+ * TODO: https://issues.apache.org/jira/browse/IGNITE-4393
+ */
+ void toggleAsync() {
+ if (!asyncToggled)
+ asyncToggled = true;
+ }
+
+ /**
* Prints memory stats.
*/
public void printMemoryStats() {
@@ -1134,7 +1149,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
execSvc = Executors.newFixedThreadPool(jobs.size() - 1);
for (int i = 1; i < jobs.size(); i++)
- execSvc.submit(jobs.get(i));
+ execSvc.execute(jobs.get(i));
}
try {
@@ -2534,6 +2549,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Put future.
*/
public IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) {
+ A.notNull(key, "key", val, "val");
+
final boolean statsEnabled = ctx.config().isStatisticsEnabled();
final long start = statsEnabled ? System.nanoTime() : 0L;
@@ -2554,8 +2571,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
*/
public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val,
@Nullable final CacheEntryPredicate filter) {
- A.notNull(key, "key", val, "val");
-
if (keyCheck)
validateCacheKey(key);
@@ -4592,6 +4607,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* @return Failed future if waiting was interrupted.
*/
@Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire() {
+ if (!asyncToggled)
+ return null;
+
try {
if (asyncOpsSem != null)
asyncOpsSem.acquire();
@@ -4610,7 +4628,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
* Releases asynchronous operations permit, if limited.
*/
protected void asyncOpRelease() {
- if (asyncOpsSem != null)
+ if (asyncOpsSem != null && asyncToggled)
asyncOpsSem.release();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
index 71f99d3..0646d5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMessage.java
@@ -129,6 +129,13 @@ public abstract class GridCacheMessage implements Message {
}
/**
+ * @return Partition ID this message is targeted to or {@code -1} if it cannot be determined.
+ */
+ public int partition() {
+ return -1;
+ }
+
+ /**
* If class loading error occurred during unmarshalling and {@link #ignoreClassErrors()} is
* set to {@code true}, then the error will be passed into this method.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
index 90e428c..3178203 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheUtils.java
@@ -110,6 +110,41 @@ import static org.apache.ignite.internal.processors.cache.GridCacheOperation.REA
* Cache utility methods.
*/
public class GridCacheUtils {
+ /** Cheat cache ID for debugging and benchmarking purposes. */
+ public static final int cheatCacheId;
+
+ /*
+ *
+ */
+ static {
+ String cheatCache = System.getProperty("CHEAT_CACHE");
+
+ if (cheatCache != null) {
+ cheatCacheId = cheatCache.hashCode();
+
+ if (cheatCacheId == 0)
+ throw new RuntimeException();
+
+ System.out.println(">>> Cheat cache ID [id=" + cheatCacheId + ", name=" + cheatCache + ']');
+ }
+ else
+ cheatCacheId = 0;
+ }
+
+ /**
+ * Quickly checks if passed in cache ID is a "cheat cache ID" set by -DCHEAT_CACHE=user_cache_name
+ * and resolved in static block above.
+ *
+ * FOR DEBUGGING AND TESTING PURPOSES!
+ *
+ * @param id Cache ID to check.
+ * @return {@code True} if this is cheat cache ID.
+ */
+ @Deprecated
+ public static boolean cheatCache(int id) {
+ return cheatCacheId != 0 && id == cheatCacheId;
+ }
+
/** Hadoop syste cache name. */
public static final String SYS_CACHE_HADOOP_MR = "ignite-hadoop-mr-sys-cache";
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
index f87fa1d..b9e6e82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheProxy.java
@@ -334,6 +334,14 @@ public class IgniteCacheProxy<K, V> extends AsyncSupportAdapter<IgniteCache<K, V
}
/** {@inheritDoc} */
+ @Override public IgniteCache<K, V> withAsync() {
+ if (delegate instanceof GridCacheAdapter)
+ ((GridCacheAdapter)delegate).toggleAsync();
+
+ return super.withAsync();
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteCache<K, V> withSkipStore() {
return skipStore();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
index 9639a9a..a671296 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedLockRequest.java
@@ -310,6 +310,11 @@ public class GridDistributedLockRequest extends GridDistributedBaseMessage {
return keys;
}
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ }
+
/**
* @return Max lock wait time.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
index 109d665..c5cf332 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxFinishResponse.java
@@ -86,6 +86,12 @@ public class GridDistributedTxFinishResponse extends GridCacheMessage {
}
/** {@inheritDoc} */
+ @Override public int partition() {
+ // TODO https://issues.apache.org/jira/browse/IGNITE-4371
+ return Integer.MIN_VALUE;
+ }
+
+ /** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
index df6acdd..5d70ec1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedUnlockRequest.java
@@ -89,6 +89,11 @@ public class GridDistributedUnlockRequest extends GridDistributedBaseMessage {
partIds.add(key.partition());
}
+ /** {@inheritDoc} */
+ @Override public int partition() {
+ return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ }
+
/** {@inheritDoc}
* @param ctx*/
@Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 35e6267..519d0fc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -218,7 +218,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap
@Override public void onKernalStart() throws IgniteCheckedException {
super.onKernalStart();
- preldr.onKernalStart();
+ if (preldr != null)
+ preldr.onKernalStart();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
index 1e92b54..63e3309 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockResponse.java
@@ -22,6 +22,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.GridDirectCollection;
@@ -31,7 +32,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -57,7 +57,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
/** Invalid partitions. */
@GridToStringInclude
@GridDirectCollection(int.class)
- private Collection<Integer> invalidParts = new GridLeanSet<>();
+ private Collection<Integer> invalidParts;
/** Preload entries. */
@GridDirectCollection(GridCacheEntryInfo.class)
@@ -127,6 +127,9 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
* @param part Invalid partition.
*/
public void addInvalidPartition(int part) {
+ if (invalidParts == null)
+ invalidParts = new HashSet<>();
+
invalidParts.add(part);
}
@@ -134,7 +137,7 @@ public class GridDhtLockResponse extends GridDistributedLockResponse {
* @return Invalid partitions.
*/
public Collection<Integer> invalidPartitions() {
- return invalidParts;
+ return invalidParts == null ? Collections.<Integer>emptySet() : invalidParts;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 940c74e..0e60ff4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -613,8 +613,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) {
- A.notNull(key, "key", val, "val");
-
return updateAsync0(
key,
val,
@@ -814,6 +812,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> {
*/
@SuppressWarnings("unchecked")
protected <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) {
+ if (!asyncToggled)
+ return op.apply();
+
IgniteInternalFuture<T> fail = asyncOpAcquire();
if (fail != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
index a03d948..0af7cf5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicSingleUpdateRequest.java
@@ -235,6 +235,11 @@ public class GridDhtAtomicSingleUpdateRequest extends GridDhtAtomicAbstractUpdat
}
/** {@inheritDoc} */
+ @Override public int partition() {
+ return partId;
+ }
+
+ /** {@inheritDoc} */
@Override public int partitionId(int idx) {
assert idx == 0 : idx;
http://git-wip-us.apache.org/repos/asf/ignite/blob/05dd08b9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
index f2fbb0e..1854e52 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java
@@ -487,6 +487,11 @@ public class GridDhtAtomicUpdateRequest extends GridDhtAtomicAbstractUpdateReque
}
/** {@inheritDoc} */
+ @Override public int partition() {
+ return partIds != null && !partIds.isEmpty() ? partIds.get(0) : -1;
+ }
+
+ /** {@inheritDoc} */
@Override public long conflictExpireTime(int idx) {
if (conflictExpireTimes != null) {
assert idx >= 0 && idx < conflictExpireTimes.size();