You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/04/21 11:40:54 UTC
[1/3] ignite git commit: IGNITE-4699: Added custom executors for
compute tasks. This closes #1718.
Repository: ignite
Updated Branches:
refs/heads/ignite-2.0 0da8c70ad -> d5a2ca218
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
new file mode 100644
index 0000000..18c52c0
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorSelfTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskSplitAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.ExecutorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteReducer;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Tests that closures and tasks submitted through {@link IgniteCompute#withExecutor(String)} run on custom executor threads.
+ *
+ * https://issues.apache.org/jira/browse/IGNITE-4699
+ */
+public class IgniteComputeCustomExecutorSelfTest extends GridCommonAbstractTest {
+ /** Number of grids started before each test. */
+ private static final int GRID_CNT = 2;
+
+ /** Name of the first configured custom executor (the one the tests submit to). */
+ private static final String EXEC_NAME0 = "executor_0";
+
+ /** Name of the second configured custom executor (configured, but never submitted to in this class). */
+ private static final String EXEC_NAME1 = "executor_1";
+
+ /** Name of the cache configured on every grid and used by the affinity calls. */
+ private static final String CACHE_NAME = "testcache";
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ cfg.setExecutorConfiguration(createExecConfiguration(EXEC_NAME0), createExecConfiguration(EXEC_NAME1));
+ cfg.setPublicThreadPoolSize(1);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+ ccfg.setName(CACHE_NAME);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /**
+ * @param name Custom executor name.
+ * @return Executor configuration with the given name and a pool size of 1.
+ */
+ private ExecutorConfiguration createExecConfiguration(String name) {
+ ExecutorConfiguration exec = new ExecutorConfiguration();
+
+ exec.setName(name);
+ exec.setSize(1);
+
+ return exec;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ startGrids(GRID_CNT);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If fails.
+ */
+ public void testInvalidCustomExecutor() throws Exception {
+ grid(0).compute().withExecutor("invalid").broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ assertTrue(Thread.currentThread().getName().contains("pub")); // Unknown executor name: expects a "pub" thread (presumably the default public pool).
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If fails.
+ */
+ public void testAllComputeApiByCustomExecutor() throws Exception {
+ IgniteCompute comp = grid(0).compute().withExecutor(EXEC_NAME0); // Every closure below asserts that its thread name contains EXEC_NAME0.
+
+ comp.affinityRun(CACHE_NAME, primaryKey(grid(1).cache(CACHE_NAME)), new IgniteRunnable() {
+ @Override public void run() {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ }
+ });
+
+ comp.affinityCall(CACHE_NAME, 0, new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ return null;
+ }
+ });
+
+ comp.broadcast(new IgniteRunnable() {
+ @Override public void run() {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ }
+ });
+
+ comp.broadcast(new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ return null;
+ }
+ });
+
+ comp.broadcast(new IgniteClosure<Object, Object>() {
+ @Override public Object apply(Object o) {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ return null;
+ }
+ }, 0);
+
+ comp.apply(new IgniteClosure<Object, Object>() {
+ @Override public Object apply(Object o) {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ return null;
+ }
+ }, 0);
+
+ comp.apply(new IgniteClosure<Integer, Object>() {
+ @Override public Object apply(Integer o) {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ return null;
+ }
+ }, Collections.singletonList(0));
+
+ comp.apply(new IgniteClosure<Integer, Object>() {
+ @Override public Object apply(Integer o) {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ return null;
+ }
+ }, Collections.singletonList(0),
+ new IgniteReducer<Object, Object>() {
+ @Override public boolean collect(@Nullable Object o) {
+ return true;
+ }
+
+ @Override public Object reduce() {
+ return null;
+ }
+ });
+
+ List<IgniteCallable<Object>> calls = new ArrayList<>();
+
+ for (int i = 0; i < GRID_CNT * 2; ++i) {
+ calls.add(new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ return null;
+ }
+ });
+ }
+
+ comp.call(calls.get(0));
+
+ comp.call(calls);
+
+ comp.call(calls,
+ new IgniteReducer<Object, Object>() {
+ @Override public boolean collect(@Nullable Object o) {
+ return true;
+ }
+
+ @Override public Object reduce() {
+ return null;
+ }
+ });
+
+ List<IgniteRunnable> runs = new ArrayList<>();
+
+ for (int i = 0; i < GRID_CNT * 2; ++i) {
+ runs.add(new IgniteRunnable() {
+ @Override public void run() {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+ }
+ });
+ }
+
+ comp.run(runs.get(0));
+
+ comp.run(runs);
+
+ comp.execute(TestTask.class, null);
+ }
+
+ /**
+ * Test task that splits into {@code gridSize * 2} jobs, each asserting that its thread name contains the first executor name.
+ */
+ static class TestTask extends ComputeTaskSplitAdapter<Object, Object> {
+ /** {@inheritDoc} */
+ @Override protected Collection<? extends ComputeJob> split(int gridSize, Object arg) throws IgniteException {
+ List<ComputeJob> jobs = new ArrayList<>(gridSize * 2);
+
+ for (int i = 0; i < gridSize * 2; ++i) {
+ jobs.add(new ComputeJobAdapter() {
+ @Override public Object execute() throws IgniteException {
+ assertTrue(Thread.currentThread().getName().contains(EXEC_NAME0));
+
+ return null;
+ }
+ });
+ }
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException {
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index ff67b77..b8d1ce9 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -39,7 +39,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
/**
* @param log Logger to use in context config.
*/
- public GridTestKernalContext(IgniteLogger log) throws IgniteCheckedException {
+ public GridTestKernalContext(IgniteLogger log) {
this(log, new IgniteConfiguration());
}
@@ -47,7 +47,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
* @param log Logger to use in context config.
* @param cfg Configuration to use in Test
*/
- public GridTestKernalContext(IgniteLogger log, IgniteConfiguration cfg) throws IgniteCheckedException {
+ public GridTestKernalContext(IgniteLogger log, IgniteConfiguration cfg) {
super(new GridLoggerProxy(log, null, null, null),
new IgniteKernal(null),
cfg,
@@ -67,6 +67,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
null,
null,
null,
+ null,
U.allPluginProviders()
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
index abd74b3..3f3bc53 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteComputeGridTestSuite.java
@@ -73,6 +73,8 @@ import org.apache.ignite.internal.TaskNodeRestartTest;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointManagerSelfTest;
import org.apache.ignite.internal.managers.checkpoint.GridCheckpointTaskSelfTest;
import org.apache.ignite.internal.managers.communication.GridCommunicationManagerListenersSelfTest;
+import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorConfigurationSelfTest;
+import org.apache.ignite.internal.processors.compute.IgniteComputeCustomExecutorSelfTest;
import org.apache.ignite.internal.processors.compute.PublicThreadpoolStarvationTest;
import org.apache.ignite.internal.util.StripedExecutorTest;
import org.apache.ignite.p2p.GridMultinodeRedeployContinuousModeSelfTest;
@@ -158,6 +160,9 @@ public class IgniteComputeGridTestSuite {
suite.addTestSuite(PublicThreadpoolStarvationTest.class);
suite.addTestSuite(StripedExecutorTest.class);
+ suite.addTestSuite(IgniteComputeCustomExecutorConfigurationSelfTest.class);
+ suite.addTestSuite(IgniteComputeCustomExecutorSelfTest.class);
+
return suite;
}
}
[2/3] ignite git commit: IGNITE-4699: Added custom executors for
compute tasks. This closes #1718.
Posted by vo...@apache.org.
IGNITE-4699: Added custom executors for compute tasks. This closes #1718.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f871b0d7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f871b0d7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f871b0d7
Branch: refs/heads/ignite-2.0
Commit: f871b0d77084f4ebf7993eccc9cf59767835a41d
Parents: 3eb52a8
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Fri Apr 21 14:40:22 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Apr 21 14:40:22 2017 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCompute.java | 14 ++
.../configuration/ExecutorConfiguration.java | 115 +++++++++
.../configuration/IgniteConfiguration.java | 30 +++
.../ignite/internal/ExecutorAwareMessage.java | 31 +++
.../ignite/internal/GridJobExecuteRequest.java | 32 ++-
.../ignite/internal/GridKernalContext.java | 8 +
.../ignite/internal/GridKernalContextImpl.java | 12 +
.../ignite/internal/GridTaskSessionImpl.java | 15 +-
.../ignite/internal/IgniteComputeImpl.java | 71 ++++--
.../apache/ignite/internal/IgniteKernal.java | 3 +
.../org/apache/ignite/internal/IgnitionEx.java | 66 +++++
.../managers/communication/GridIoManager.java | 23 +-
.../managers/communication/GridIoMessage.java | 13 +
.../closure/GridClosureProcessor.java | 154 +++++++-----
.../processors/job/GridJobProcessor.java | 23 +-
.../internal/processors/job/GridJobWorker.java | 15 +-
.../internal/processors/pool/PoolProcessor.java | 25 ++
.../session/GridTaskSessionProcessor.java | 10 +-
.../processors/task/GridTaskProcessor.java | 69 +++++-
.../processors/task/GridTaskWorker.java | 3 +-
...puteCustomExecutorConfigurationSelfTest.java | 85 +++++++
.../IgniteComputeCustomExecutorSelfTest.java | 245 +++++++++++++++++++
.../junits/GridTestKernalContext.java | 5 +-
.../testsuites/IgniteComputeGridTestSuite.java | 5 +
24 files changed, 970 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
index ad675c0..f0e6039 100644
--- a/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
+++ b/modules/core/src/main/java/org/apache/ignite/IgniteCompute.java
@@ -24,6 +24,8 @@ import org.apache.ignite.compute.ComputeTask;
import org.apache.ignite.compute.ComputeTaskFuture;
import org.apache.ignite.compute.ComputeTaskName;
import org.apache.ignite.compute.ComputeTaskSpis;
+import org.apache.ignite.configuration.ExecutorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.lang.IgniteAsyncSupport;
import org.apache.ignite.lang.IgniteAsyncSupported;
import org.apache.ignite.lang.IgniteCallable;
@@ -751,4 +753,16 @@ public interface IgniteCompute extends IgniteAsyncSupport {
/** {@inheritDoc} */
@Deprecated
@Override public IgniteCompute withAsync();
+
+ /**
+ * Gets instance of the compute API associated with custom executor. All tasks and closures submitted to returned
+ * instance will be processed by this executor on both remote and local nodes. If executor with the given name
+ * doesn't exist, task will be processed in default ("public") pool.
+ * <p>
+ * Executor should be defined in {@link IgniteConfiguration#setExecutorConfiguration(ExecutorConfiguration...)}.
+ *
+ * @param name Custom executor name.
+ * @return Instance of compute API associated with custom executor.
+ */
+ public IgniteCompute withExecutor(@NotNull String name);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java
new file mode 100644
index 0000000..8ff7932
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/ExecutorConfiguration.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration;
+
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_PUBLIC_THREAD_CNT;
+
+/**
+ * Custom thread pool configuration for compute tasks. See {@link IgniteCompute#withExecutor(String)} for more information.
+ */
+public class ExecutorConfiguration {
+ /** Thread pool name. */
+ private String name;
+
+ /** Thread pool size. */
+ private int size = DFLT_PUBLIC_THREAD_CNT;
+
+ /**
+ * Default constructor. Leaves the name unset and the size at {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT}.
+ */
+ public ExecutorConfiguration() {
+ // No-op.
+ }
+
+ /**
+ * Creates a configuration with the given name and the default pool size.
+ *
+ * @param name Thread pool name.
+ */
+ public ExecutorConfiguration(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Copying constructor. Copies the name and the size of the given instance.
+ *
+ * @param other Instance to copy. Must not be {@code null} (checked by assertion only).
+ */
+ public ExecutorConfiguration(ExecutorConfiguration other) {
+ assert other != null;
+
+ name = other.name;
+ size = other.size;
+ }
+
+ /**
+ * Get thread pool name.
+ * <p>
+ * See {@link #setName(String)} for more information.
+ *
+ * @return Executor name.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Set thread pool name. Name cannot be {@code null} and should be unique among custom executors (not validated by this setter).
+ *
+ * @param name Executor name.
+ * @return {@code this} for chaining.
+ */
+ public ExecutorConfiguration setName(String name) {
+ this.name = name;
+
+ return this;
+ }
+
+ /**
+ * Get thread pool size.
+ * <p>
+ * See {@link #setSize(int)} for more information.
+ *
+ * @return Thread pool size.
+ */
+ public int getSize() {
+ return size;
+ }
+
+ /**
+ * Set thread pool size. The value is stored as is; this setter performs no range check.
+ * <p>
+ * Defaults to {@link IgniteConfiguration#DFLT_PUBLIC_THREAD_CNT}.
+ *
+ * @param size Thread pool size.
+ * @return {@code this} for chaining.
+ */
+ public ExecutorConfiguration setSize(int size) {
+ this.size = size;
+
+ return this;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(ExecutorConfiguration.class, this);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index fe08ddf..17927b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -28,6 +28,7 @@ import javax.cache.integration.CacheLoader;
import javax.cache.processor.EntryProcessor;
import javax.management.MBeanServer;
import javax.net.ssl.SSLContext;
+import org.apache.ignite.IgniteCompute;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.Ignition;
@@ -437,6 +438,9 @@ public class IgniteConfiguration {
/** */
private BinaryConfiguration binaryCfg;
+ /** Custom executor configurations. */
+ private ExecutorConfiguration[] execCfgs;
+
/** */
private boolean lateAffAssignment = DFLT_LATE_AFF_ASSIGNMENT;
@@ -494,6 +498,7 @@ public class IgniteConfiguration {
dataStreamerPoolSize = cfg.getDataStreamerThreadPoolSize();
deployMode = cfg.getDeploymentMode();
discoStartupDelay = cfg.getDiscoveryStartupDelay();
+ execCfgs = cfg.getExecutorConfiguration();
failureDetectionTimeout = cfg.getFailureDetectionTimeout();
hadoopCfg = cfg.getHadoopConfiguration();
igfsCfg = cfg.getFileSystemConfiguration();
@@ -2658,6 +2663,31 @@ public class IgniteConfiguration {
return this;
}
+ /**
+ * Gets custom executors for user compute tasks.
+ * <p>
+ * See {@link #setExecutorConfiguration(ExecutorConfiguration...)} for more information.
+ *
+ * @return Executor configurations.
+ */
+ public ExecutorConfiguration[] getExecutorConfiguration() {
+ return execCfgs;
+ }
+
+ /**
+ * Sets custom executors for user compute tasks.
+ * <p>
+ * See {@link IgniteCompute#withExecutor(String)} for more information.
+ *
+ * @param execCfgs Executor configurations.
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setExecutorConfiguration(ExecutorConfiguration... execCfgs) {
+ this.execCfgs = execCfgs;
+
+ return this;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(IgniteConfiguration.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java
new file mode 100644
index 0000000..a8a3b1a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/ExecutorAwareMessage.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Message that may name a custom executor; a message with a specified executor must be processed in the appropriate thread pool.
+ */
+public interface ExecutorAwareMessage extends Message {
+ /**
+ * @return Custom executor name, or {@code null} if no custom executor is provided.
+ */
+ @Nullable public String executorName();
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
index a7e8309..fe2d6d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteRequest.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
@@ -41,7 +40,7 @@ import org.jetbrains.annotations.Nullable;
/**
* Job execution request.
*/
-public class GridJobExecuteRequest implements Message {
+public class GridJobExecuteRequest implements ExecutorAwareMessage {
/** */
private static final long serialVersionUID = 0L;
@@ -146,6 +145,9 @@ public class GridJobExecuteRequest implements Message {
/** */
private AffinityTopologyVersion topVer;
+ /** */
+ private String execName;
+
/**
* No-op constructor to support {@link Externalizable} interface.
*/
@@ -182,6 +184,7 @@ public class GridJobExecuteRequest implements Message {
* @param cacheIds Caches' identifiers to reserve partition.
* @param part Partition to lock.
* @param topVer Affinity topology version of job mapping.
+ * @param execName The name of the custom named executor.
*/
public GridJobExecuteRequest(
IgniteUuid sesId,
@@ -211,7 +214,8 @@ public class GridJobExecuteRequest implements Message {
UUID subjId,
@Nullable int[] cacheIds,
int part,
- @Nullable AffinityTopologyVersion topVer) {
+ @Nullable AffinityTopologyVersion topVer,
+ @Nullable String execName) {
this.top = top;
assert sesId != null;
assert jobId != null;
@@ -251,6 +255,7 @@ public class GridJobExecuteRequest implements Message {
this.idsOfCaches = cacheIds;
this.part = part;
this.topVer = topVer;
+ this.execName = execName;
this.cpSpi = cpSpi == null || cpSpi.isEmpty() ? null : cpSpi;
}
@@ -454,6 +459,11 @@ public class GridJobExecuteRequest implements Message {
return part;
}
+ /** {@inheritDoc} */
+ @Override public String executorName() {
+ return execName;
+ }
+
/**
* @return Affinity version which was used to map job
*/
@@ -622,6 +632,12 @@ public class GridJobExecuteRequest implements Message {
writer.incrementState();
+ case 24:
+ if (!writer.writeString("executorName", execName))
+ return false;
+
+ writer.incrementState();
+
}
return true;
@@ -831,6 +847,14 @@ public class GridJobExecuteRequest implements Message {
reader.incrementState();
+ case 24:
+ execName = reader.readString("executorName");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
}
return reader.afterMessageRead(GridJobExecuteRequest.class);
@@ -843,7 +867,7 @@ public class GridJobExecuteRequest implements Message {
/** {@inheritDoc} */
@Override public byte fieldsCount() {
- return 24;
+ return 25;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 8462e5f..010bd21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -563,6 +563,14 @@ public interface GridKernalContext extends Iterable<GridComponent> {
*/
public ExecutorService getQueryExecutorService();
+
+ /**
+ * Executor services that are in charge of processing user compute tasks submitted to custom executors.
+ *
+ * @return Map of custom thread pool executors (presumably keyed by executor name); may be {@code null}.
+ */
+ @Nullable public Map<String, ? extends ExecutorService> customExecutors();
+
/**
* Executor service that is in charge of processing schema change messages.
*
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index 213cf86..bbc9846 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -344,6 +344,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ Map<String, ? extends ExecutorService> customExecSvcs;
+
+ /** */
+ @GridToStringExclude
private Map<String, Object> attrs = new HashMap<>();
/** */
@@ -401,6 +405,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
* @param callbackExecSvc Callback executor service.
* @param qryExecSvc Query executor service.
* @param schemaExecSvc Schema executor service.
+ * @param customExecSvcs Custom named executors.
* @param plugins Plugin providers.
*/
@SuppressWarnings("TypeMayBeWeakened")
@@ -424,6 +429,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
IgniteStripedThreadPoolExecutor callbackExecSvc,
ExecutorService qryExecSvc,
ExecutorService schemaExecSvc,
+ @Nullable Map<String, ? extends ExecutorService> customExecSvcs,
List<PluginProvider> plugins
) {
assert grid != null;
@@ -448,6 +454,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.callbackExecSvc = callbackExecSvc;
this.qryExecSvc = qryExecSvc;
this.schemaExecSvc = schemaExecSvc;
+ this.customExecSvcs = customExecSvcs;
marshCtx = new MarshallerContextImpl(plugins);
@@ -998,6 +1005,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public Map<String, ? extends ExecutorService> customExecutors() {
+ return customExecSvcs; // May be null: the constructor accepts a @Nullable map and stores it as is.
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteExceptionRegistry exceptionRegistry() {
return IgniteExceptionRegistry.get();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
index dd1caa1..458ad36 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTaskSessionImpl.java
@@ -114,6 +114,9 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
/** */
private final IgniteFutureImpl mapFut;
+ /** */
+ private final String execName;
+
/**
* @param taskNodeId Task node ID.
* @param taskName Task name.
@@ -129,6 +132,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
* @param fullSup Session full support enabled flag.
* @param internal Internal task flag.
* @param subjId Subject ID.
+ * @param execName Custom executor name.
*/
public GridTaskSessionImpl(
UUID taskNodeId,
@@ -144,7 +148,8 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
GridKernalContext ctx,
boolean fullSup,
boolean internal,
- UUID subjId) {
+ UUID subjId,
+ @Nullable String execName) {
assert taskNodeId != null;
assert taskName != null;
assert sesId != null;
@@ -173,6 +178,7 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
this.fullSup = fullSup;
this.internal = internal;
this.subjId = subjId;
+ this.execName = execName;
mapFut = new IgniteFutureImpl(new GridFutureAdapter());
}
@@ -873,6 +879,13 @@ public class GridTaskSessionImpl implements GridTaskSessionInternal {
return internal;
}
+ /**
+ * @return Custom executor name.
+ */
+ @Nullable public String executorName() {
+ return execName;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridTaskSessionImpl.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
index 7499a5d..7ddd4ad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComputeImpl.java
@@ -73,6 +73,9 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** */
private UUID subjId;
+ /** Custom executor name. */
+ private String execName;
+
/**
* Required by {@link Externalizable}.
*/
@@ -103,6 +106,25 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
this.subjId = subjId;
}
+ /**
+ * Constructor that additionally binds the instance to a custom executor.
+ *
+ * @param ctx Kernal context.
+ * @param prj Projection.
+ * @param subjId Subject ID.
+ * @param async Async support flag.
+ * @param execName Custom executor name forwarded to closure and task execution calls.
+ */
+ private IgniteComputeImpl(GridKernalContext ctx, ClusterGroupAdapter prj, UUID subjId, boolean async,
+ String execName) {
+ super(async);
+
+ this.ctx = ctx;
+ this.prj = prj;
+ this.subjId = subjId;
+ this.execName = execName;
+ }
+
/** {@inheritDoc} */
@Override protected IgniteCompute createAsyncInstance() {
return new IgniteComputeImpl(ctx, prj, subjId, true);
@@ -152,7 +174,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ affKey + ']');
- return ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, prj.nodes());
+ return ctx.closure().affinityRun(Collections.singletonList(cacheName), partId, job, prj.nodes(), execName);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -205,7 +227,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ affKey + ']');
- return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes());
+ return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes(), execName);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -248,7 +270,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes());
+ return ctx.closure().affinityRun(cacheNames, partId, job, prj.nodes(), execName);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -298,7 +320,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ affKey + ']');
- return ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, prj.nodes());
+ return ctx.closure().affinityCall(Collections.singletonList(cacheName), partId, job, prj.nodes(), execName);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -351,7 +373,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
throw new IgniteCheckedException("Failed map key to partition: [cache=" + cacheName + " key="
+ affKey + ']');
- return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes());
+ return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes(), execName);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -394,7 +416,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes());
+ return ctx.closure().affinityCall(cacheNames, partId, job, prj.nodes(), execName);
}
catch (IgniteCheckedException e) {
throw U.convertException(e);
@@ -437,7 +459,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
- return ctx.task().execute(taskName, arg);
+ return ctx.task().execute(taskName, arg, execName);
}
finally {
unguard();
@@ -477,7 +499,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
- return ctx.task().execute(taskCls, arg);
+ return ctx.task().execute(taskCls, arg, execName);
}
finally {
unguard();
@@ -516,7 +538,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
ctx.task().setThreadContextIfNotNull(TC_SUBGRID, prj.nodes());
ctx.task().setThreadContextIfNotNull(TC_SUBJ_ID, subjId);
- return ctx.task().execute(task, arg);
+ return ctx.task().execute(task, arg, execName);
}
finally {
unguard();
@@ -550,7 +572,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().runAsync(BROADCAST, job, prj.nodes());
+ return ctx.closure().runAsync(BROADCAST, job, prj.nodes(), execName);
}
finally {
unguard();
@@ -584,7 +606,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes());
+ return ctx.closure().callAsync(BROADCAST, Collections.singletonList(job), prj.nodes(), execName);
}
finally {
unguard();
@@ -620,7 +642,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().broadcast(job, arg, prj.nodes());
+ return ctx.closure().broadcast(job, arg, prj.nodes(), execName);
}
finally {
unguard();
@@ -654,7 +676,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().runAsync(BALANCE, job, prj.nodes());
+ return ctx.closure().runAsync(BALANCE, job, prj.nodes(), execName);
}
finally {
unguard();
@@ -689,7 +711,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().runAsync(BALANCE, jobs, prj.nodes());
+ return ctx.closure().runAsync(BALANCE, jobs, prj.nodes(), execName);
}
finally {
unguard();
@@ -725,7 +747,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().callAsync(job, arg, prj.nodes());
+ return ctx.closure().callAsync(job, arg, prj.nodes(), execName);
}
finally {
unguard();
@@ -759,7 +781,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().callAsync(BALANCE, job, prj.nodes());
+ return ctx.closure().callAsync(BALANCE, job, prj.nodes(), execName);
}
finally {
unguard();
@@ -794,7 +816,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes());
+ return ctx.closure().callAsync(BALANCE, (Collection<? extends Callable<R>>)jobs, prj.nodes(), execName);
}
finally {
unguard();
@@ -832,7 +854,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().callAsync(job, args, prj.nodes());
+ return ctx.closure().callAsync(job, args, prj.nodes(), execName);
}
finally {
unguard();
@@ -870,7 +892,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes());
+ return ctx.closure().forkjoinAsync(BALANCE, jobs, rdc, prj.nodes(), execName);
}
finally {
unguard();
@@ -911,7 +933,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
guard();
try {
- return ctx.closure().callAsync(job, args, rdc, prj.nodes());
+ return ctx.closure().callAsync(job, args, rdc, prj.nodes(), execName);
}
finally {
unguard();
@@ -1040,11 +1062,13 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeObject(prj);
+ out.writeObject(execName);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
prj = (ClusterGroupAdapter)in.readObject();
+ execName = (String)in.readObject();
}
/**
@@ -1054,7 +1078,7 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
* @throws ObjectStreamException Thrown in case of unmarshalling error.
*/
protected Object readResolve() throws ObjectStreamException {
- return prj.compute();
+ return execName == null ? prj.compute() : prj.compute().withExecutor(execName);
}
/** {@inheritDoc} */
@@ -1068,4 +1092,9 @@ public class IgniteComputeImpl extends AsyncSupportAdapter<IgniteCompute>
@Override public <R> ComputeTaskFuture<R> future() {
return (ComputeTaskFuture<R>)super.future();
}
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompute withExecutor(@NotNull String name) {
+ return new IgniteComputeImpl(ctx, prj, subjId, isAsync(), name);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 50f39fa..12a7af6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -699,6 +699,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
* @param callbackExecSvc Callback executor service.
* @param qryExecSvc Query executor service.
* @param schemaExecSvc Schema executor service.
+ * @param customExecSvcs Custom named executors.
* @param errHnd Error handler to use for notification about startup problems.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@@ -720,6 +721,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
IgniteStripedThreadPoolExecutor callbackExecSvc,
ExecutorService qryExecSvc,
ExecutorService schemaExecSvc,
+ Map<String, ? extends ExecutorService> customExecSvcs,
GridAbsClosure errHnd
)
throws IgniteCheckedException
@@ -835,6 +837,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
callbackExecSvc,
qryExecSvc,
schemaExecSvc,
+ customExecSvcs,
plugins
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 2eda01c..4b34891 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -33,6 +33,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -59,6 +60,7 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.ConnectorConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.DeploymentMode;
+import org.apache.ignite.configuration.ExecutorConfiguration;
import org.apache.ignite.configuration.FileSystemConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.TransactionConfiguration;
@@ -1530,6 +1532,9 @@ public class IgnitionEx {
/** Query executor service. */
private ThreadPoolExecutor schemaExecSvc;
+ /** Custom executor services, keyed by executor name. */
+ private Map<String, ThreadPoolExecutor> customExecSvcs;
+
/** Grid state. */
private volatile IgniteState state = STOPPED;
@@ -1858,6 +1863,24 @@ public class IgnitionEx {
schemaExecSvc.allowCoreThreadTimeOut(true);
+ if (!F.isEmpty(cfg.getExecutorConfiguration())) {
+ validateCustomExecutorsConfiguration(cfg.getExecutorConfiguration());
+
+ customExecSvcs = new HashMap<>();
+
+ for(ExecutorConfiguration execCfg : cfg.getExecutorConfiguration()) {
+ ThreadPoolExecutor exec = new IgniteThreadPoolExecutor(
+ execCfg.getName(),
+ cfg.getIgniteInstanceName(),
+ execCfg.getSize(),
+ execCfg.getSize(),
+ DFLT_THREAD_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>());
+
+ customExecSvcs.put(execCfg.getName(), exec);
+ }
+ }
+
// Register Ignite MBean for current grid instance.
registerFactoryMbean(myCfg.getMBeanServer());
@@ -1886,6 +1909,7 @@ public class IgnitionEx {
callbackExecSvc,
qryExecSvc,
schemaExecSvc,
+ customExecSvcs,
new CA() {
@Override public void apply() {
startLatch.countDown();
@@ -1962,6 +1986,30 @@ public class IgnitionEx {
}
/**
+ * @param cfgs Array of the executors configurations.
+ * @throws IgniteCheckedException If configuration is wrong.
+ */
+ private static void validateCustomExecutorsConfiguration(ExecutorConfiguration[] cfgs)
+     throws IgniteCheckedException {
+     if (cfgs == null)
+         return;
+
+     Set<String> seenNames = new HashSet<>(cfgs.length);
+
+     for (ExecutorConfiguration execCfg : cfgs) {
+         String name = execCfg.getName();
+
+         if (F.isEmpty(name))
+             throw new IgniteCheckedException("Custom executor name cannot be null or empty.");
+         else if (!seenNames.add(name))
+             throw new IgniteCheckedException("Duplicate custom executor name: " + name);
+         else if (execCfg.getSize() <= 0)
+             throw new IgniteCheckedException("Custom executor size must be positive [name=" + name +
+                 ", size=" + execCfg.getSize() + ']');
+     }
+ }
+
+ /**
* @param cfg Ignite configuration copy to.
* @return New ignite configuration.
* @throws IgniteCheckedException If failed.
@@ -2116,6 +2164,17 @@ public class IgnitionEx {
initializeDefaultCacheConfiguration(myCfg);
+ ExecutorConfiguration[] execCfgs = myCfg.getExecutorConfiguration();
+
+ if (execCfgs != null) {
+ ExecutorConfiguration[] clone = execCfgs.clone();
+
+ for (int i = 0; i < execCfgs.length; i++)
+ clone[i] = new ExecutorConfiguration(execCfgs[i]);
+
+ myCfg.setExecutorConfiguration(clone);
+ }
+
if (!myCfg.isClientMode() && myCfg.getMemoryConfiguration() == null) {
MemoryConfiguration memCfg = new MemoryConfiguration();
@@ -2522,6 +2581,13 @@ public class IgnitionEx {
U.shutdownNow(getClass(), callbackExecSvc, log);
callbackExecSvc = null;
+
+ if (!F.isEmpty(customExecSvcs)) {
+ for (ThreadPoolExecutor exec : customExecSvcs.values())
+ U.shutdownNow(getClass(), exec, log);
+
+ customExecSvcs = null;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 83fc3b5..c4f7519 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -65,6 +66,7 @@ import org.apache.ignite.internal.util.lang.GridTuple3;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
@@ -839,12 +841,27 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
}
try {
+ String execName = msg.executorName();
+
+ if (execName != null) {
+ Executor exec = pools.customExecutor(execName);
+
+ if (exec != null) {
+ exec.execute(c);
+
+ return;
+ }
+ else {
+ LT.warn(log, "Custom executor doesn't exist (message will be processed in default " +
+ "thread pool): " + execName);
+ }
+ }
+
pools.poolForPolicy(plc).execute(c);
}
catch (RejectedExecutionException e) {
- U.error(log, "Failed to process regular message due to execution rejection. Increase the upper bound " +
- "on 'ExecutorService' provided by 'IgniteConfiguration.getPublicThreadPoolSize()'. " +
- "Will attempt to process message in the listener thread instead.", e);
+ U.error(log, "Failed to process regular message due to execution rejection. Will attempt to process " +
+ "message in the listener thread instead.", e);
c.run();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
index 2ad4a0b..16eae26 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessage.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.managers.communication;
import java.io.Externalizable;
import java.nio.ByteBuffer;
+
+import org.apache.ignite.internal.ExecutorAwareMessage;
import org.apache.ignite.internal.GridDirectTransient;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
@@ -26,6 +28,7 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+import org.jetbrains.annotations.Nullable;
/**
* Wrapper for all grid messages.
@@ -334,6 +337,16 @@ public class GridIoMessage implements Message {
return Integer.MIN_VALUE;
}
+ /**
+  * Looks up the custom executor name carried by the wrapped message.
+  *
+  * @return Name reported by the wrapped message if it is an {@link ExecutorAwareMessage},
+  *      {@code null} otherwise.
+  */
+ @Nullable public String executorName() {
+     return msg instanceof ExecutorAwareMessage ? ((ExecutorAwareMessage)msg).executorName() : null;
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridIoMessage.class, this);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index f91ee34..1051807 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -146,11 +146,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param mode Distribution mode.
* @param jobs Closures to execute.
* @param nodes Grid nodes.
+ * @param execName Custom executor name.
* @return Task execution future.
*/
public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, @Nullable Collection<? extends Runnable> jobs,
- @Nullable Collection<ClusterNode> nodes) {
- return runAsync(mode, jobs, nodes, false);
+ @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
+ return runAsync(mode, jobs, nodes, false, execName);
}
/**
@@ -158,12 +159,14 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param jobs Closures to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
+ * @param execName Custom executor name.
* @return Task execution future.
*/
public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
Collection<? extends Runnable> jobs,
@Nullable Collection<ClusterNode> nodes,
- boolean sys)
+ boolean sys,
+ @Nullable String execName)
{
assert mode != null;
assert !F.isEmpty(jobs) : jobs;
@@ -181,7 +184,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T1(mode, jobs), null, sys);
+ return ctx.task().execute(new T1(mode, jobs), null, sys, execName);
}
finally {
busyLock.readUnlock();
@@ -196,7 +199,19 @@ public class GridClosureProcessor extends GridProcessorAdapter {
*/
public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job,
@Nullable Collection<ClusterNode> nodes) {
- return runAsync(mode, job, nodes, false);
+ return runAsync(mode, job, nodes, null);
+ }
+
+ /**
+ * @param mode Distribution mode.
+ * @param job Closure to execute.
+ * @param nodes Grid nodes.
+ * @param execName Custom executor name; the {@code sys} flag of the delegate is passed as {@code false}.
+ * @return Task execution future.
+ */
+ public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode, Runnable job,
+ @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
+ return runAsync(mode, job, nodes, false, execName);
+ }
/**
@@ -204,12 +219,14 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param job Closure to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
+ * @param execName Custom executor name.
* @return Task execution future.
*/
public ComputeTaskInternalFuture<?> runAsync(GridClosureCallMode mode,
Runnable job,
@Nullable Collection<ClusterNode> nodes,
- boolean sys)
+ boolean sys,
+ @Nullable String execName)
{
assert mode != null;
assert job != null;
@@ -222,7 +239,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T2(mode, job), null, sys);
+ return ctx.task().execute(new T2(mode, job), null, sys, execName);
}
finally {
busyLock.readUnlock();
@@ -341,6 +358,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param jobs Closures to execute.
* @param rdc Reducer.
* @param nodes Grid nodes.
+ * @param execName Custom executor name.
* @param <R1> Type.
* @param <R2> Type.
* @return Reduced result.
@@ -348,7 +366,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
public <R1, R2> ComputeTaskInternalFuture<R2> forkjoinAsync(GridClosureCallMode mode,
Collection<? extends Callable<R1>> jobs,
IgniteReducer<R1, R2> rdc,
- @Nullable Collection<ClusterNode> nodes)
+ @Nullable Collection<ClusterNode> nodes,
+ @Nullable String execName)
{
assert mode != null;
assert rdc != null;
@@ -362,7 +381,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T3<>(mode, jobs, rdc), null);
+ return ctx.task().execute(new T3<>(mode, jobs, rdc), null, execName);
}
finally {
busyLock.readUnlock();
@@ -380,7 +399,23 @@ public class GridClosureProcessor extends GridProcessorAdapter {
GridClosureCallMode mode,
@Nullable Collection<? extends Callable<R>> jobs,
@Nullable Collection<ClusterNode> nodes) {
- return callAsync(mode, jobs, nodes, false);
+ return callAsync(mode, jobs, nodes, null);
+ }
+
+ /**
+ * @param mode Distribution mode.
+ * @param jobs Closures to execute.
+ * @param nodes Grid nodes.
+ * @param execName Custom executor name; the {@code sys} flag of the delegate is passed as {@code false}.
+ * @param <R> Type.
+ * @return Grid future for collection of closure results.
+ */
+ public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(
+ GridClosureCallMode mode,
+ @Nullable Collection<? extends Callable<R>> jobs,
+ @Nullable Collection<ClusterNode> nodes,
+ @Nullable String execName) {
+ return callAsync(mode, jobs, nodes, false, execName);
+ }
/**
@@ -388,13 +423,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param jobs Closures to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
+ * @param execName Custom executor name.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
public <R> ComputeTaskInternalFuture<Collection<R>> callAsync(GridClosureCallMode mode,
Collection<? extends Callable<R>> jobs,
@Nullable Collection<ClusterNode> nodes,
- boolean sys)
+ boolean sys,
+ @Nullable String execName)
{
assert mode != null;
assert !F.isEmpty(jobs);
@@ -407,7 +444,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T6<>(mode, jobs), null, sys);
+ return ctx.task().execute(new T6<>(mode, jobs), null, sys, execName);
}
finally {
busyLock.readUnlock();
@@ -415,7 +452,6 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/**
- *
* @param mode Distribution mode.
* @param job Closure to execute.
* @param nodes Grid nodes.
@@ -424,7 +460,21 @@ public class GridClosureProcessor extends GridProcessorAdapter {
*/
public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
@Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes) {
- return callAsync(mode, job, nodes, false);
+ return callAsync(mode, job, nodes, null);
+ }
+
+ /**
+ * @param mode Distribution mode.
+ * @param job Closure to execute.
+ * @param nodes Grid nodes.
+ * @param execName Custom executor name; the {@code sys} flag of the delegate is passed as {@code false}.
+ * @param <R> Type.
+ * @return Grid future for the closure result.
+ */
+ public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
+ @Nullable Callable<R> job, @Nullable Collection<ClusterNode> nodes,
+ @Nullable String execName) {
+ return callAsync(mode, job, nodes, false, execName);
+ }
/**
@@ -432,13 +482,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param partId Partition.
* @param job Closure to execute.
* @param nodes Grid nodes.
+ * @param execName Custom executor name.
* @return Grid future for collection of closure results.
* @throws IgniteCheckedException If failed.
*/
public <R> ComputeTaskInternalFuture<R> affinityCall(@NotNull Collection<String> cacheNames,
int partId,
Callable<R> job,
- @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
+ @Nullable Collection<ClusterNode> nodes,
+ @Nullable String execName) throws IgniteCheckedException {
assert partId >= 0 : partId;
busyLock.readLock();
@@ -457,7 +509,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T5(node, job, cacheNames, partId, mapTopVer), null, false);
+ return ctx.task().execute(new T5(node, job, cacheNames, partId, mapTopVer), null,
+ false, execName);
}
finally {
busyLock.readUnlock();
@@ -469,13 +522,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param partId Partition.
* @param job Job.
* @param nodes Grid nodes.
+ * @param execName Custom executor name.
* @return Job future.
* @throws IgniteCheckedException If failed.
*/
public ComputeTaskInternalFuture<?> affinityRun(@NotNull Collection<String> cacheNames,
int partId,
Runnable job,
- @Nullable Collection<ClusterNode> nodes) throws IgniteCheckedException {
+ @Nullable Collection<ClusterNode> nodes,
+ @Nullable String execName) throws IgniteCheckedException {
assert partId >= 0 : partId;
busyLock.readLock();
@@ -494,7 +549,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T4(node, job, cacheNames, partId, mapTopVer), null, false);
+ return ctx.task().execute(new T4(node, job, cacheNames, partId, mapTopVer), null,
+ false, execName);
}
finally {
busyLock.readUnlock();
@@ -588,13 +644,15 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param job Closure to execute.
* @param nodes Grid nodes.
* @param sys If {@code true}, then system pool will be used.
+ * @param execName Custom executor name.
* @param <R> Type.
* @return Grid future for collection of closure results.
*/
public <R> ComputeTaskInternalFuture<R> callAsync(GridClosureCallMode mode,
Callable<R> job,
@Nullable Collection<ClusterNode> nodes,
- boolean sys)
+ boolean sys,
+ @Nullable String execName)
{
assert mode != null;
assert job != null;
@@ -607,7 +665,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T7<>(mode, job), null, sys);
+ return ctx.task().execute(new T7<>(mode, job), null, sys, execName);
}
finally {
busyLock.readUnlock();
@@ -618,10 +676,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param job Job closure.
* @param arg Optional job argument.
* @param nodes Grid nodes.
+ * @param execName Custom executor name.
* @return Grid future for execution result.
*/
public <T, R> ComputeTaskInternalFuture<R> callAsync(IgniteClosure<T, R> job, @Nullable T arg,
- @Nullable Collection<ClusterNode> nodes) {
+ @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
busyLock.readLock();
try {
@@ -630,7 +689,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T8(job, arg), null, false);
+ return ctx.task().execute(new T8(job, arg), null, false, execName);
}
finally {
busyLock.readUnlock();
@@ -641,33 +700,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param job Job closure.
* @param arg Optional job argument.
* @param nodes Grid nodes.
+ * @param execName Custom executor name.
* @return Grid future for execution result.
*/
public <T, R> IgniteInternalFuture<Collection<R>> broadcast(IgniteClosure<T, R> job, @Nullable T arg,
- @Nullable Collection<ClusterNode> nodes) {
- busyLock.readLock();
-
- try {
- if (F.isEmpty(nodes))
- return new GridFinishedFuture<>(U.emptyTopologyException());
-
- ctx.task().setThreadContext(TC_SUBGRID, nodes);
-
- return ctx.task().execute(new T11<>(job), arg, false);
- }
- finally {
- busyLock.readUnlock();
- }
- }
-
- /**
- * @param job Job closure.
- * @param arg Optional job argument.
- * @param nodes Grid nodes.
- * @return Grid future for execution result.
- */
- public <T, R> IgniteInternalFuture<Collection<R>> broadcastNoFailover(IgniteClosure<T, R> job, @Nullable T arg,
- @Nullable Collection<ClusterNode> nodes) {
+ @Nullable Collection<ClusterNode> nodes, @Nullable String execName) {
busyLock.readLock();
try {
@@ -675,9 +712,8 @@ public class GridClosureProcessor extends GridProcessorAdapter {
return new GridFinishedFuture<>(U.emptyTopologyException());
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- ctx.task().setThreadContext(TC_NO_FAILOVER, true);
- return ctx.task().execute(new T11<>(job), arg, false);
+ return ctx.task().execute(new T11<>(job), arg, false, execName);
}
finally {
busyLock.readUnlock();
@@ -688,11 +724,13 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param job Job closure.
* @param args Job arguments.
* @param nodes Grid nodes.
+ * @param execName Custom executor name.
* @return Grid future for execution result.
*/
public <T, R> ComputeTaskInternalFuture<Collection<R>> callAsync(IgniteClosure<T, R> job,
@Nullable Collection<? extends T> args,
- @Nullable Collection<ClusterNode> nodes)
+ @Nullable Collection<ClusterNode> nodes,
+ @Nullable String execName)
{
busyLock.readLock();
@@ -702,7 +740,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T9<>(job, args), null, false);
+ return ctx.task().execute(new T9<>(job, args), null, false, execName);
}
finally {
busyLock.readUnlock();
@@ -714,10 +752,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @param args Job arguments.
* @param rdc Reducer.
* @param nodes Grid nodes.
+ * @param execName Custom executor name.
* @return Grid future for execution result.
*/
public <T, R1, R2> ComputeTaskInternalFuture<R2> callAsync(IgniteClosure<T, R1> job,
- Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes) {
+ Collection<? extends T> args, IgniteReducer<R1, R2> rdc, @Nullable Collection<ClusterNode> nodes,
+ @Nullable String execName) {
busyLock.readLock();
try {
@@ -726,7 +766,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
ctx.task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.task().execute(new T10<>(job, args, rdc), null, false);
+ return ctx.task().execute(new T10<>(job, args, rdc), null, false, execName);
}
finally {
busyLock.readUnlock();
@@ -1122,7 +1162,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection)}.
+ * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Collection, Collection, String)}.
*/
private class T1 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection {
/** */
@@ -1156,7 +1196,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection)}.
+ * {@link GridClosureProcessor#runAsync(GridClosureCallMode, Runnable, Collection, String)}.
*/
private class T2 extends TaskNoReduceAdapter<Void> implements GridNoImplicitInjection {
/** */
@@ -1187,7 +1227,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection, org.apache.ignite.lang.IgniteReducer, Collection)}
+ * {@link GridClosureProcessor#forkjoinAsync(GridClosureCallMode, Collection, org.apache.ignite.lang.IgniteReducer, Collection, String)}
*/
private class T3<R1, R2> extends GridPeerDeployAwareTaskAdapter<Void, R2> implements GridNoImplicitInjection {
/** */
@@ -1378,7 +1418,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection)}
+ * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Collection, Collection, String)}
*/
private class T6<R> extends GridPeerDeployAwareTaskAdapter<Void, Collection<R>> implements GridNoImplicitInjection {
/** */
@@ -1421,7 +1461,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
* Task that is free of dragged in enclosing context for the method
- * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection)}
+ * {@link GridClosureProcessor#callAsync(GridClosureCallMode, Callable, Collection, String)}
*/
private class T7<R> extends GridPeerDeployAwareTaskAdapter<Void, R> implements GridNoImplicitInjection {
/** */
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 91ec8a9..369ca22 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -27,6 +27,7 @@ import java.util.Map;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -73,6 +74,7 @@ import org.apache.ignite.internal.util.GridSpinReadWriteLock;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -1058,7 +1060,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
sesAttrs,
req.isSessionFullSupport(),
req.isInternal(),
- req.getSubjectId());
+ req.getSubjectId(),
+ req.executorName());
taskSes.setCheckpointSpi(req.getCheckpointSpi());
taskSes.setClassLoader(dep.classLoader());
@@ -1098,7 +1101,8 @@ public class GridJobProcessor extends GridProcessorAdapter {
evtLsnr,
holdLsnr,
partsReservation,
- req.getTopVer());
+ req.getTopVer(),
+ req.executorName());
jobCtx.job(job);
@@ -1274,7 +1278,20 @@ public class GridJobProcessor extends GridProcessorAdapter {
*/
private boolean executeAsync(GridJobWorker jobWorker) {
try {
- ctx.getExecutorService().execute(jobWorker);
+ if (jobWorker.executorName() != null) {
+ Executor customExec = ctx.pools().customExecutor(jobWorker.executorName());
+
+ if (customExec != null)
+ customExec.execute(jobWorker);
+ else {
+ LT.warn(log, "Custom executor doesn't exist (local job will be processed in default " +
+ "thread pool): " + jobWorker.executorName());
+
+ ctx.getExecutorService().execute(jobWorker);
+ }
+ }
+ else
+ ctx.getExecutorService().execute(jobWorker);
if (metricsUpdateFreq > -1L)
startedJobsCnt.increment();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 5c9b9e2..c9129c1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -168,6 +168,9 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
/** Request topology version. */
private final AffinityTopologyVersion reqTopVer;
+ /** Custom executor name. */
+ private final String execName;
+
/**
* @param ctx Kernal context.
* @param dep Grid deployment.
@@ -182,6 +185,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
* @param holdLsnr Hold listener.
* @param partsReservation Reserved partitions (must be released at the job finish).
* @param reqTopVer Affinity topology version of the job request.
+ * @param execName Custom executor name.
*/
GridJobWorker(
GridKernalContext ctx,
@@ -196,7 +200,8 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
GridJobEventListener evtLsnr,
GridJobHoldListener holdLsnr,
GridReservable partsReservation,
- AffinityTopologyVersion reqTopVer) {
+ AffinityTopologyVersion reqTopVer,
+ String execName) {
super(ctx.igniteInstanceName(), "grid-job-worker", ctx.log(GridJobWorker.class));
assert ctx != null;
@@ -219,6 +224,7 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
this.holdLsnr = holdLsnr;
this.partsReservation = partsReservation;
this.reqTopVer = reqTopVer;
+ this.execName = execName;
if (job != null)
this.job = job;
@@ -727,6 +733,13 @@ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
}
/**
+ * @return Custom executor name.
+ */
+ public String executorName() {
+ return execName;
+ }
+
+ /**
* @param evtType Event type.
* @param msg Message.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 37bbb54..221c7bb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.processors.pool;
import java.util.Arrays;
+import java.util.Map;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.GridKernalContext;
@@ -26,6 +28,7 @@ import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
import org.apache.ignite.plugin.extensions.communication.IoPool;
+import org.jetbrains.annotations.Nullable;
/**
* Processor which abstracts out thread pool management.
@@ -34,6 +37,9 @@ public class PoolProcessor extends GridProcessorAdapter {
/** Map of {@link IoPool}-s injected by Ignite plugins. */
private final IoPool[] extPools = new IoPool[128];
+ /** Custom named pools. */
+ private final Map<String, ? extends ExecutorService> customExecs;
+
/**
* Constructor.
*
@@ -72,6 +78,8 @@ public class PoolProcessor extends GridProcessorAdapter {
}
}
}
+
+ customExecs = ctx.customExecutors();
}
/** {@inheritDoc} */
@@ -165,4 +173,21 @@ public class PoolProcessor extends GridProcessorAdapter {
}
}
}
+
+ /**
+ * Gets custom executor service by executor name.
+ *
+ * @param name Executor name.
+ * @return Executor service, or {@code null} if no custom executor with the given name is configured.
+ */
+ @Nullable public Executor customExecutor(String name) {
+ assert name != null;
+
+ Executor exec = null;
+
+ if (customExecs != null)
+ exec = customExecs.get(name);
+
+ return exec;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
index 9af038a..f9937a6 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/session/GridTaskSessionProcessor.java
@@ -76,6 +76,7 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
* @param fullSup {@code True} to enable distributed session attributes and checkpoints.
* @param internal {@code True} in case of internal task.
* @param subjId Subject ID.
+ * @param execName Custom executor name.
* @return New session if one did not exist, or existing one.
*/
public GridTaskSessionImpl createTaskSession(
@@ -91,7 +92,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
Map<Object, Object> attrs,
boolean fullSup,
boolean internal,
- UUID subjId) {
+ UUID subjId,
+ @Nullable String execName) {
if (!fullSup) {
return new GridTaskSessionImpl(
taskNodeId,
@@ -107,7 +109,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
ctx,
false,
internal,
- subjId);
+ subjId,
+ execName);
}
while (true) {
@@ -130,7 +133,8 @@ public class GridTaskSessionProcessor extends GridProcessorAdapter {
ctx,
true,
internal,
- subjId));
+ subjId,
+ execName));
if (old != null)
ses = old;
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index d34f297..22d5716 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@ -359,6 +359,19 @@ public class GridTaskProcessor extends GridProcessorAdapter {
* @param <R> Task return value type.
*/
public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) {
+ return execute(taskCls, arg, null);
+ }
+
+ /**
+ * @param taskCls Task class.
+ * @param arg Optional execution argument.
+ * @param execName Name of the custom executor.
+ * @return Task future.
+ * @param <T> Task argument type.
+ * @param <R> Task return value type.
+ */
+ public <T, R> ComputeTaskInternalFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg,
+ @Nullable String execName) {
assert taskCls != null;
lock.readLock();
@@ -367,7 +380,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
if (stopping)
throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskCls);
- return startTask(null, taskCls, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, false);
+ return startTask(null, taskCls, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg,
+ false, execName);
}
finally {
lock.readUnlock();
@@ -382,7 +396,19 @@ public class GridTaskProcessor extends GridProcessorAdapter {
* @param <R> Task return value type.
*/
public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg) {
- return execute(task, arg, false);
+ return execute(task, arg, false, null);
+ }
+
+ /**
+ * @param task Actual task.
+ * @param arg Optional task argument.
+ * @param execName Name of the custom executor.
+ * @return Task future.
+ * @param <T> Task argument type.
+ * @param <R> Task return value type.
+ */
+ public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, String execName) {
+ return execute(task, arg, false, execName);
}
/**
@@ -394,13 +420,28 @@ public class GridTaskProcessor extends GridProcessorAdapter {
* @param <R> Task return value type.
*/
public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys) {
+ return execute(task, arg, sys, null);
+ }
+
+ /**
+ * @param task Actual task.
+ * @param arg Optional task argument.
+ * @param sys If {@code true}, then system pool will be used.
+ * @param execName Name of the custom executor.
+ * @return Task future.
+ * @param <T> Task argument type.
+ * @param <R> Task return value type.
+ */
+ public <T, R> ComputeTaskInternalFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys,
+ @Nullable String execName) {
lock.readLock();
try {
if (stopping)
throw new IllegalStateException("Failed to execute task due to grid shutdown: " + task);
- return startTask(null, null, task, IgniteUuid.fromUuid(ctx.localNodeId()), arg, sys);
+ return startTask(null, null, task, IgniteUuid.fromUuid(ctx.localNodeId()), arg,
+ sys, execName);
}
finally {
lock.readUnlock();
@@ -436,6 +477,18 @@ public class GridTaskProcessor extends GridProcessorAdapter {
* @param <R> Task return value type.
*/
public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg) {
+ return execute(taskName, arg, null);
+ }
+
+ /**
+ * @param taskName Task name.
+ * @param arg Optional execution argument.
+ * @param execName Name of the custom executor.
+ * @return Task future.
+ * @param <T> Task argument type.
+ * @param <R> Task return value type.
+ */
+ public <T, R> ComputeTaskInternalFuture<R> execute(String taskName, @Nullable T arg, @Nullable String execName) {
assert taskName != null;
lock.readLock();
@@ -444,7 +497,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
if (stopping)
throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskName);
- return startTask(taskName, null, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, false);
+ return startTask(taskName, null, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg,
+ false, execName);
}
finally {
lock.readUnlock();
@@ -458,6 +512,7 @@ public class GridTaskProcessor extends GridProcessorAdapter {
* @param sesId Task session ID.
* @param arg Optional task argument.
* @param sys If {@code true}, then system pool will be used.
+ * @param execName Name of the custom executor.
* @return Task future.
*/
@SuppressWarnings("unchecked")
@@ -467,7 +522,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
@Nullable ComputeTask<T, R> task,
IgniteUuid sesId,
@Nullable T arg,
- boolean sys) {
+ boolean sys,
+ @Nullable String execName) {
assert sesId != null;
String taskClsName;
@@ -629,7 +685,8 @@ public class GridTaskProcessor extends GridProcessorAdapter {
Collections.emptyMap(),
fullSup,
internal,
- subjId);
+ subjId,
+ execName);
ComputeTaskInternalFuture<R> fut = new ComputeTaskInternalFuture<>(ses, ctx);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index cb5aabe..62224f0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@ -1372,7 +1372,8 @@ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
subjId,
affCacheIds,
affPartId,
- mapTopVer);
+ mapTopVer,
+ ses.executorName());
if (loc)
ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
http://git-wip-us.apache.org/repos/asf/ignite/blob/f871b0d7/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java
new file mode 100644
index 0000000..2277100
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/compute/IgniteComputeCustomExecutorConfigurationSelfTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.compute;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.configuration.ExecutorConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+/**
+ * Tests custom executor configuration.
+ */
+public class IgniteComputeCustomExecutorConfigurationSelfTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(ipFinder);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testConfigurations() throws Exception {
+ try {
+ checkStartWithInvalidConfiguration(getConfiguration("node0")
+ .setExecutorConfiguration(new ExecutorConfiguration()));
+
+ checkStartWithInvalidConfiguration(getConfiguration("node0")
+ .setExecutorConfiguration(new ExecutorConfiguration("")));
+
+ checkStartWithInvalidConfiguration(getConfiguration("node0")
+ .setExecutorConfiguration(new ExecutorConfiguration("exec").setSize(-1)));
+
+ checkStartWithInvalidConfiguration(getConfiguration("node0")
+ .setExecutorConfiguration(new ExecutorConfiguration("exec").setSize(0)));
+ }
+ finally {
+ Ignition.stopAll(true);
+ }
+ }
+
+ /**
+ * @param cfg Ignite configuration.
+ * @throws Exception If failed.
+ */
+ private void checkStartWithInvalidConfiguration(IgniteConfiguration cfg) throws Exception {
+ try {
+ Ignition.start(cfg);
+
+ fail("Node start must fail.");
+ }
+ catch (IgniteException e) {
+ // No-op.
+ }
+ }
+}
[3/3] ignite git commit: Merge remote-tracking branch
'origin/ignite-2.0' into ignite-2.0
Posted by vo...@apache.org.
Merge remote-tracking branch 'origin/ignite-2.0' into ignite-2.0
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d5a2ca21
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d5a2ca21
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d5a2ca21
Branch: refs/heads/ignite-2.0
Commit: d5a2ca218e696b51e317ca0cb1b32289df881f9c
Parents: f871b0d 0da8c70
Author: devozerov <vo...@gridgain.com>
Authored: Fri Apr 21 14:40:46 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Apr 21 14:40:46 2017 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/IgniteCache.java | 11 -
.../org/apache/ignite/events/CacheEvent.java | 3 -
.../java/org/apache/ignite/events/Event.java | 1 -
.../org/apache/ignite/events/EventType.java | 109 -----
.../apache/ignite/events/SwapSpaceEvent.java | 105 ----
.../managers/indexing/GridIndexingManager.java | 44 --
.../processors/cache/CacheMetricsImpl.java | 21 -
.../processors/cache/GridCacheContext.java | 23 -
.../GridCacheEntryInfoCollectSwapListener.java | 70 ---
.../processors/cache/GridCacheSwapListener.java | 33 --
.../processors/cache/IgniteCacheProxy.java | 6 -
.../cache/query/GridCacheQueryManager.java | 59 ---
.../platform/PlatformContextImpl.java | 11 -
.../platform/cache/PlatformCache.java | 9 -
.../processors/query/GridQueryIndexing.java | 19 -
.../processors/query/GridQueryProcessor.java | 55 ---
.../apache/ignite/spi/indexing/IndexingSpi.java | 19 -
.../spi/indexing/noop/NoopIndexingSpi.java | 10 -
.../resources/META-INF/classnames.properties | 1 -
.../cache/GridCacheAbstractFullApiSelfTest.java | 45 --
.../IgniteCacheConfigVariationsFullApiTest.java | 45 --
.../IgniteCacheStoreValueAbstractTest.java | 4 -
.../IgniteTxExceptionAbstractSelfTest.java | 10 -
...tractDistributedByteArrayValuesSelfTest.java | 43 --
.../atomic/IgniteCacheAtomicProtocolTest.java | 175 ++++++-
...tomicClientOnlyMultiNodeFullApiSelfTest.java | 51 +-
...achePartitionedMultiNodeFullApiSelfTest.java | 59 ---
...idCacheReplicatedUnswapAdvancedSelfTest.java | 151 ------
.../cache/query/IndexingSpiQuerySelfTest.java | 22 -
.../cache/query/IndexingSpiQueryTxSelfTest.java | 10 -
.../loadtests/cache/GridCacheSwapLoadTest.java | 320 -------------
.../multijvm/IgniteCacheProcessProxy.java | 5 -
.../testsuites/IgniteCacheTestSuite3.java | 2 -
.../processors/query/h2/IgniteH2Indexing.java | 67 ---
.../query/h2/opt/GridH2AbstractKeyValueRow.java | 89 +---
.../query/h2/opt/GridH2KeyValueRowOffheap.java | 70 ---
.../query/h2/opt/GridH2RowDescriptor.java | 11 -
.../processors/query/h2/opt/GridH2Table.java | 87 +---
...ryDuplicateIndexObjectsAbstractSelfTest.java | 159 -------
.../cache/GridCacheOffHeapSelfTest.java | 476 -------------------
.../IgniteCacheQueryMultiThreadedSelfTest.java | 25 -
...ateIndexObjectPartitionedAtomicSelfTest.java | 38 --
...xObjectPartitionedTransactionalSelfTest.java | 41 --
.../query/IgniteQueryDedicatedPoolTest.java | 10 -
.../IgniteBinaryCacheQueryTestSuite.java | 5 -
.../stream/kafka/connect/IgniteSourceTask.java | 4 -
.../Cache/CacheAbstractTest.cs | 58 ---
.../Cache/CacheTestAsyncWrapper.cs | 6 -
.../dotnet/Apache.Ignite.Core/Cache/ICache.cs | 6 -
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 8 -
.../Apache.Ignite.Core/Impl/Cache/CacheOp.cs | 1 -
51 files changed, 159 insertions(+), 2553 deletions(-)
----------------------------------------------------------------------