You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2017/02/06 14:57:03 UTC
[1/3] ignite git commit: IGNITE-4105: Added separate thread pool for
queries. This closes #1469.
Repository: ignite
Updated Branches:
refs/heads/ignite-4475-async 055295edd -> dd4d43908
IGNITE-4105: Added separate thread pool for queries. This closes #1469.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/aa46bc7c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/aa46bc7c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/aa46bc7c
Branch: refs/heads/ignite-4475-async
Commit: aa46bc7c82e7f172bcceed4afa4b010bf4071cda
Parents: 739c606
Author: Sergey Kalashnikov <sk...@gridgain.com>
Authored: Fri Feb 3 12:09:27 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Fri Feb 3 12:09:27 2017 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 31 ++++++
.../ignite/internal/GridKernalContext.java | 7 ++
.../ignite/internal/GridKernalContextImpl.java | 13 +++
.../apache/ignite/internal/IgniteKernal.java | 17 ++-
.../org/apache/ignite/internal/IgnitionEx.java | 20 ++++
.../managers/communication/GridIoManager.java | 4 +-
.../managers/communication/GridIoPolicy.java | 3 +
.../internal/processors/pool/PoolProcessor.java | 5 +
.../junits/GridTestKernalContext.java | 1 +
.../query/h2/twostep/GridMapQueryExecutor.java | 8 +-
.../h2/twostep/GridReduceQueryExecutor.java | 7 +-
.../query/IgniteSqlQueryDedicatedPoolTest.java | 110 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
13 files changed, 215 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
index 512ceee..f35742b 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/IgniteConfiguration.java
@@ -169,6 +169,9 @@ public class IgniteConfiguration {
@Deprecated
public static final int DFLT_SYSTEM_MAX_THREAD_CNT = DFLT_PUBLIC_THREAD_CNT;
+ /** Default size of query thread pool. */
+ public static final int DFLT_QUERY_THREAD_POOL_SIZE = DFLT_PUBLIC_THREAD_CNT;
+
/** Default keep alive time for system thread pool. */
@Deprecated
public static final long DFLT_SYSTEM_KEEP_ALIVE_TIME = 0;
@@ -266,6 +269,9 @@ public class IgniteConfiguration {
/** P2P pool size. */
private int p2pPoolSize = DFLT_P2P_THREAD_CNT;
+ /** Query pool size. */
+ private int qryPoolSize = DFLT_QUERY_THREAD_POOL_SIZE;
+
/** Ignite installation folder. */
private String igniteHome;
@@ -549,6 +555,7 @@ public class IgniteConfiguration {
platformCfg = cfg.getPlatformConfiguration();
pluginCfgs = cfg.getPluginConfigurations();
pubPoolSize = cfg.getPublicThreadPoolSize();
+ qryPoolSize = cfg.getQueryThreadPoolSize();
rebalanceThreadPoolSize = cfg.getRebalanceThreadPoolSize();
segChkFreq = cfg.getSegmentCheckFrequency();
segPlc = cfg.getSegmentationPolicy();
@@ -869,6 +876,17 @@ public class IgniteConfiguration {
}
/**
+ * Size of thread pool that is in charge of processing query messages.
+ * <p>
+ * If not provided, executor service will have size {@link #DFLT_QUERY_THREAD_POOL_SIZE}.
+ *
+ * @return Thread pool size to be used in grid for query messages.
+ */
+ public int getQueryThreadPoolSize() {
+ return qryPoolSize;
+ }
+
+ /**
* Sets thread pool size to use within grid.
*
* @param poolSize Thread pool size to use within grid.
@@ -975,6 +993,19 @@ public class IgniteConfiguration {
}
/**
+ * Sets query thread pool size to use within grid.
+ *
+ * @param poolSize Thread pool size to use within grid.
+ * @see IgniteConfiguration#getQueryThreadPoolSize()
+ * @return {@code this} for chaining.
+ */
+ public IgniteConfiguration setQueryThreadPoolSize(int poolSize) {
+ qryPoolSize = poolSize;
+
+ return this;
+ }
+
+ /**
* Sets keep alive time of thread pool size that will be used to process utility cache messages.
*
* @param keepAliveTime Keep alive time of executor service to use for utility cache messages.
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index 177062d..8e3dbe1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -573,6 +573,13 @@ public interface GridKernalContext extends Iterable<GridComponent> {
@Nullable public ExecutorService getIndexingExecutorService();
/**
+ * Executor service that is in charge of processing query messages.
+ *
+ * @return Thread pool implementation to be used in grid for query messages.
+ */
+ public ExecutorService getQueryExecutorService();
+
+ /**
* Gets exception registry.
*
* @return Exception registry.
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index a45be59..d8075d3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -343,6 +343,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
+ protected ExecutorService qryExecSvc;
+
+ /** */
+ @GridToStringExclude
private Map<String, Object> attrs = new HashMap<>();
/** */
@@ -400,6 +404,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
* @param restExecSvc REST executor service.
* @param affExecSvc Affinity executor service.
* @param idxExecSvc Indexing executor service.
+ * @param callbackExecSvc Callback executor service.
+ * @param qryExecSvc Query executor service.
* @param plugins Plugin providers.
* @throws IgniteCheckedException In case of error.
*/
@@ -421,6 +427,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
IgniteStripedThreadPoolExecutor callbackExecSvc,
+ ExecutorService qryExecSvc,
List<PluginProvider> plugins
) {
assert grid != null;
@@ -442,6 +449,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
this.affExecSvc = affExecSvc;
this.idxExecSvc = idxExecSvc;
this.callbackExecSvc = callbackExecSvc;
+ this.qryExecSvc = qryExecSvc;
marshCtx = new MarshallerContextImpl(plugins);
@@ -1005,6 +1013,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
+ @Override public ExecutorService getQueryExecutorService() {
+ return qryExecSvc;
+ }
+
+ /** {@inheritDoc} */
@Override public IgniteExceptionRegistry exceptionRegistry() {
return IgniteExceptionRegistry.get();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index 63041a3..1f7f0d8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -295,6 +295,10 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
@GridToStringExclude
private ObjectName restExecSvcMBean;
+ /** */
+ @GridToStringExclude
+ private ObjectName qryExecSvcMBean;
+
/** Kernal start timestamp. */
private long startTime = U.currentTimeMillis();
@@ -674,6 +678,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
* @param restExecSvc Reset executor service.
* @param affExecSvc Affinity executor service.
* @param idxExecSvc Indexing executor service.
+ * @param callbackExecSvc Callback executor service.
+ * @param qryExecSvc Query executor service.
* @param errHnd Error handler to use for notification about startup problems.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
@@ -692,6 +698,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ExecutorService affExecSvc,
@Nullable ExecutorService idxExecSvc,
IgniteStripedThreadPoolExecutor callbackExecSvc,
+ ExecutorService qryExecSvc,
GridAbsClosure errHnd
)
throws IgniteCheckedException
@@ -801,6 +808,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
affExecSvc,
idxExecSvc,
callbackExecSvc,
+ qryExecSvc,
plugins
);
@@ -961,7 +969,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
// Register MBeans.
registerKernalMBean();
registerLocalNodeMBean();
- registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc);
+ registerExecutorMBeans(execSvc, sysExecSvc, p2pExecSvc, mgmtExecSvc, restExecSvc, qryExecSvc);
// Lifecycle bean notifications.
notifyLifecycleBeans(AFTER_NODE_START);
@@ -1545,11 +1553,13 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
ExecutorService sysExecSvc,
ExecutorService p2pExecSvc,
ExecutorService mgmtExecSvc,
- ExecutorService restExecSvc) throws IgniteCheckedException {
+ ExecutorService restExecSvc,
+ ExecutorService qryExecSvc) throws IgniteCheckedException {
pubExecSvcMBean = registerExecutorMBean(execSvc, "GridExecutionExecutor");
sysExecSvcMBean = registerExecutorMBean(sysExecSvc, "GridSystemExecutor");
mgmtExecSvcMBean = registerExecutorMBean(mgmtExecSvc, "GridManagementExecutor");
p2PExecSvcMBean = registerExecutorMBean(p2pExecSvc, "GridClassLoadingExecutor");
+ qryExecSvcMBean = registerExecutorMBean(qryExecSvc, "GridQueryExecutor");
ConnectorConfiguration clientCfg = cfg.getConnectorConfiguration();
@@ -2043,7 +2053,8 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
unregisterMBean(p2PExecSvcMBean) &
unregisterMBean(kernalMBean) &
unregisterMBean(locNodeMBean) &
- unregisterMBean(restExecSvcMBean)
+ unregisterMBean(restExecSvcMBean) &
+ unregisterMBean(qryExecSvcMBean)
))
errOnStop = false;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index c55f954..42ff739 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -1489,6 +1489,9 @@ public class IgnitionEx {
/** Continuous query executor service. */
private IgniteStripedThreadPoolExecutor callbackExecSvc;
+ /** Query executor service. */
+ private ThreadPoolExecutor qryExecSvc;
+
/** Grid state. */
private volatile IgniteState state = STOPPED;
@@ -1783,6 +1786,18 @@ public class IgnitionEx {
);
}
+ validateThreadPoolSize(cfg.getQueryThreadPoolSize(), "query");
+
+ qryExecSvc = new IgniteThreadPoolExecutor(
+ "query",
+ cfg.getGridName(),
+ cfg.getQueryThreadPoolSize(),
+ cfg.getQueryThreadPoolSize(),
+ DFLT_THREAD_KEEP_ALIVE_TIME,
+ new LinkedBlockingQueue<Runnable>());
+
+ qryExecSvc.allowCoreThreadTimeOut(true);
+
// Register Ignite MBean for current grid instance.
registerFactoryMbean(myCfg.getMBeanServer());
@@ -1808,6 +1823,7 @@ public class IgnitionEx {
affExecSvc,
idxExecSvc,
callbackExecSvc,
+ qryExecSvc,
new CA() {
@Override public void apply() {
startLatch.countDown();
@@ -2402,6 +2418,10 @@ public class IgnitionEx {
sysExecSvc = null;
+ U.shutdownNow(getClass(), qryExecSvc, log);
+
+ qryExecSvc = null;
+
U.shutdownNow(getClass(), stripedExecSvc, log);
stripedExecSvc = null;
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index 16ea972..0228684 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -98,6 +98,7 @@ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.P2P
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.UTILITY_CACHE_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.QUERY_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.isReservedGridIoPolicy;
import static org.apache.ignite.internal.util.nio.GridNioBackPressureControl.threadProcessingMessage;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q_OPTIMIZED_RMV;
@@ -686,6 +687,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
case IDX_POOL:
case IGFS_POOL:
case DATA_STREAMER_POOL:
+ case QUERY_POOL:
{
if (msg.isOrdered())
processOrderedMessage(nodeId, msg, plc, msgC);
@@ -1206,7 +1208,7 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
private void invokeListener(Byte plc, GridMessageListener lsnr, UUID nodeId, Object msg) {
Byte oldPlc = CUR_PLC.get();
- boolean change = F.eq(oldPlc, plc);
+ boolean change = !F.eq(oldPlc, plc);
if (change)
CUR_PLC.set(plc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index cb673d0..a3fb370 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -49,6 +49,9 @@ public class GridIoPolicy {
/** Data streamer execution pool. */
public static final byte DATA_STREAMER_POOL = 9;
+ /** Query execution pool. */
+ public static final byte QUERY_POOL = 10;
+
/**
* Defines the range of reserved pools that are not available for plugins.
* @param key The key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index f42815b..f84b741 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -128,6 +128,11 @@ public class PoolProcessor extends GridProcessorAdapter {
return ctx.getDataStreamerExecutorService();
+ case GridIoPolicy.QUERY_POOL:
+ assert ctx.getQueryExecutorService() != null : "Query pool is not configured.";
+
+ return ctx.getQueryExecutorService();
+
default: {
if (plc < 0)
throw new IgniteCheckedException("Policy cannot be negative: " + plc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
index db45e27..40f0e43 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/GridTestKernalContext.java
@@ -64,6 +64,7 @@ public class GridTestKernalContext extends GridKernalContextImpl {
null,
null,
null,
+ null,
U.allPluginProviders()
);
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index ac1a6a6..2802da5 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -44,6 +44,7 @@ import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
@@ -86,7 +87,6 @@ import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVer
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.REPLICATED;
-import static org.apache.ignite.internal.processors.query.h2.twostep.GridReduceQueryExecutor.QUERY_POOL;
import static org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory.toMessages;
import static org.jsr166.ConcurrentLinkedHashMap.QueuePolicy.PER_SEGMENT_Q;
@@ -655,7 +655,7 @@ public class GridMapQueryExecutor {
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
}
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
}
catch (Exception e) {
e.addSuppressed(err);
@@ -729,7 +729,7 @@ public class GridMapQueryExecutor {
if (loc)
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
}
catch (IgniteCheckedException e) {
log.error("Failed to send message.", e);
@@ -756,7 +756,7 @@ public class GridMapQueryExecutor {
if (loc)
h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, GridIoPolicy.QUERY_POOL);
}
catch (Exception e) {
U.warn(log, "Failed to send retry message: " + e.getMessage());
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 1f00ed2..7c036ea 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -104,9 +104,6 @@ import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType
* Reduce query executor.
*/
public class GridReduceQueryExecutor {
- /** Thread pool to process query messages. */
- public static final byte QUERY_POOL = GridIoPolicy.SYSTEM_POOL;
-
/** */
private static final IgniteProductVersion DISTRIBUTED_JOIN_SINCE = IgniteProductVersion.fromString("1.7.0");
@@ -327,7 +324,7 @@ public class GridReduceQueryExecutor {
if (node.isLocal())
h2.mapQueryExecutor().onMessage(ctx.localNodeId(), msg0);
else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, QUERY_POOL);
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, msg0, GridIoPolicy.QUERY_POOL);
}
catch (IgniteCheckedException e) {
throw new CacheException("Failed to fetch data from node: " + node.id(), e);
@@ -1178,7 +1175,7 @@ public class GridReduceQueryExecutor {
msg,
specialize,
locNodeHnd,
- QUERY_POOL,
+ GridIoPolicy.QUERY_POOL,
runLocParallel);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java
new file mode 100644
index 0000000..bba3642
--- /dev/null
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/query/IgniteSqlQueryDedicatedPoolTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.query;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cache.query.SqlFieldsQuery;
+import org.apache.ignite.cache.query.annotations.QuerySqlFunction;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import java.util.List;
+
+/**
+ * Ensures that SQL queries are executed in the dedicated query thread pool ({@link GridIoPolicy#QUERY_POOL}).
+ */
+public class IgniteSqlQueryDedicatedPoolTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryVmIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Name of the cache used by the test. */
+ private static final String CACHE_NAME = "query_pool_test";
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ startGrid("server");
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ TcpDiscoverySpi spi = (TcpDiscoverySpi)cfg.getDiscoverySpi();
+
+ spi.setIpFinder(IP_FINDER);
+
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+ ccfg.setIndexedTypes(Integer.class, Integer.class);
+ ccfg.setSqlFunctionClasses(IgniteSqlQueryDedicatedPoolTest.class);
+ ccfg.setName(CACHE_NAME);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ if ("client".equals(gridName))
+ cfg.setClientMode(true);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Checks that a query issued from a client node runs under {@link GridIoPolicy#QUERY_POOL}.
+ */
+ public void testSqlQueryUsesDedicatedThreadPool() throws Exception {
+ try (Ignite client = startGrid("client")) {
+ IgniteCache<Integer, Integer> cache = client.cache(CACHE_NAME);
+
+ List<List<?>> result;
+
+ try (QueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery("select currentPolicy()"))) {
+ result = cursor.getAll();
+ }
+
+ assertEquals(1, result.size());
+
+ Byte plc = (Byte)result.get(0).get(0);
+ // JUnit assertions (not the 'assert' keyword) so the checks still run when the JVM is started without -ea.
+ assertNotNull(plc);
+ assertEquals(GridIoPolicy.QUERY_POOL, plc.byteValue());
+ }
+ }
+
+ /**
+ * Custom SQL function that returns {@link GridIoManager#currentPolicy()} as seen from inside the query executor.
+ */
+ @SuppressWarnings("unused")
+ @QuerySqlFunction(alias = "currentPolicy")
+ public static Byte currentPolicy() {
+ return GridIoManager.currentPolicy();
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/aa46bc7c/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index b5e4078..49fb269 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@ -95,6 +95,7 @@ import org.apache.ignite.internal.processors.cache.query.GridCacheSwapScanQueryS
import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDestroySelfTest;
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlQueryDedicatedPoolTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexRebuildTest;
@@ -240,6 +241,7 @@ public class IgniteCacheQuerySelfTestSuite extends TestSuite {
suite.addTestSuite(CacheOffheapBatchIndexingSingleTypeTest.class);
suite.addTestSuite(CacheSqlQueryValueCopySelfTest.class);
suite.addTestSuite(IgniteCacheQueryCacheDestroySelfTest.class);
+ suite.addTestSuite(IgniteSqlQueryDedicatedPoolTest.class);
return suite;
}
[3/3] ignite git commit: Merge branch 'ignite-2.0' into
ignite-4475-async
Posted by vo...@apache.org.
Merge branch 'ignite-2.0' into ignite-4475-async
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/dd4d4390
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/dd4d4390
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/dd4d4390
Branch: refs/heads/ignite-4475-async
Commit: dd4d439085409d94dd0d8e86a4ba4abad46f9517
Parents: 055295e af98efd
Author: devozerov <vo...@gridgain.com>
Authored: Mon Feb 6 17:56:44 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Feb 6 17:56:44 2017 +0300
----------------------------------------------------------------------
.../configuration/IgniteConfiguration.java | 31 ++++++
.../ignite/internal/GridKernalContext.java | 7 ++
.../ignite/internal/GridKernalContextImpl.java | 13 +++
.../apache/ignite/internal/IgniteKernal.java | 17 ++-
.../org/apache/ignite/internal/IgnitionEx.java | 20 ++++
.../managers/communication/GridIoManager.java | 4 +-
.../managers/communication/GridIoPolicy.java | 3 +
.../internal/processors/pool/PoolProcessor.java | 5 +
.../junits/GridTestKernalContext.java | 1 +
.../query/h2/twostep/GridMapQueryExecutor.java | 8 +-
.../h2/twostep/GridReduceQueryExecutor.java | 7 +-
.../query/IgniteSqlQueryDedicatedPoolTest.java | 110 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
13 files changed, 215 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
[2/3] ignite git commit: Merge branch 'master' into ignite-2.0
Posted by vo...@apache.org.
Merge branch 'master' into ignite-2.0
# Conflicts:
# modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/af98efd5
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/af98efd5
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/af98efd5
Branch: refs/heads/ignite-4475-async
Commit: af98efd5487dc2f6177b245d3d4c369f8ec4fe81
Parents: aa46bc7 0726d32
Author: devozerov <vo...@gridgain.com>
Authored: Mon Feb 6 17:55:58 2017 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Mon Feb 6 17:55:58 2017 +0300
----------------------------------------------------------------------
assembly/LICENSE_FABRIC | 35 +++--
assembly/LICENSE_HADOOP | 17 ++-
examples/schema-import/bin/h2-server.bat | 2 +-
examples/schema-import/bin/h2-server.sh | 4 +-
.../schema-import/bin/schema-import.properties | 2 +-
.../src/main/resources/META-INF/licenses.txt.vm | 7 +-
.../apache/ignite/IgniteSystemProperties.java | 8 ++
.../ignite/lang/IgniteProductVersion.java | 2 +-
.../processors/query/h2/IgniteH2Indexing.java | 8 +-
.../h2/twostep/GridReduceQueryExecutor.java | 2 +-
.../IgniteSqlEntryCacheModeAgnosticTest.java | 140 +++++++++++++++++++
.../IgniteCacheQuerySelfTestSuite.java | 2 +
.../Cache/CacheAbstractTransactionalTest.cs | 105 +++++++-------
.../Apache.Ignite.Core/Impl/Cache/CacheImpl.cs | 9 +-
.../views/templates/agent-download.jade | 2 +-
modules/web-console/web-agent/README.txt | 2 +-
.../cache/IgnitePutRemoveBenchmark.java | 42 ++++++
.../cache/IgnitePutRemoveTxBenchmark.java | 30 ++++
18 files changed, 323 insertions(+), 96 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/af98efd5/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/af98efd5/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
----------------------------------------------------------------------
diff --cc modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
index 49fb269,d85e111..ed3d1e8
--- a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite.java
@@@ -95,7 -95,7 +95,8 @@@ import org.apache.ignite.internal.proce
import org.apache.ignite.internal.processors.cache.query.IgniteCacheQueryCacheDestroySelfTest;
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQuerySelfTest;
import org.apache.ignite.internal.processors.cache.query.IndexingSpiQueryTxSelfTest;
+import org.apache.ignite.internal.processors.query.IgniteSqlQueryDedicatedPoolTest;
+ import org.apache.ignite.internal.processors.query.IgniteSqlEntryCacheModeAgnosticTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSchemaIndexingTest;
import org.apache.ignite.internal.processors.query.IgniteSqlSplitterSelfTest;
import org.apache.ignite.internal.processors.query.h2.GridH2IndexRebuildTest;
@@@ -241,7 -241,7 +242,8 @@@ public class IgniteCacheQuerySelfTestSu
suite.addTestSuite(CacheOffheapBatchIndexingSingleTypeTest.class);
suite.addTestSuite(CacheSqlQueryValueCopySelfTest.class);
suite.addTestSuite(IgniteCacheQueryCacheDestroySelfTest.class);
+ suite.addTestSuite(IgniteSqlQueryDedicatedPoolTest.class);
+ suite.addTestSuite(IgniteSqlEntryCacheModeAgnosticTest.class);
return suite;
}