You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:52:30 UTC
[46/50] [abbrv] ignite git commit: ignite-split2 - separate pool for
index requests + parallel execution of local query in case of distributed
join
ignite-split2 - separate pool for index requests + parallel execution of local query in case of distributed join
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c5dd4bc9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c5dd4bc9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c5dd4bc9
Branch: refs/heads/ignite-1232
Commit: c5dd4bc99f487b184e246fd483ce21822226a7ae
Parents: 9483473
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon Jan 11 16:44:16 2016 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon Jan 11 16:44:16 2016 +0300
----------------------------------------------------------------------
.../managers/communication/GridIoManager.java | 16 +++++++
.../managers/communication/GridIoPolicy.java | 3 ++
.../closure/GridClosureProcessor.java | 2 +-
.../processors/query/h2/IgniteH2Indexing.java | 50 ++++++++++++++++++--
.../query/h2/opt/GridH2TreeIndex.java | 9 +++-
.../h2/twostep/GridReduceQueryExecutor.java | 15 +++---
6 files changed, 83 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index ec2d797..32c96ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -87,6 +87,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
@@ -147,6 +148,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
/** IGFS pool. */
private ExecutorService igfsPool;
+ /** Index pool. */
+ private ExecutorService idxPool;
+
/** Discovery listener. */
private GridLocalEventListener discoLsnr;
@@ -254,6 +258,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
0,
new LinkedBlockingQueue<Runnable>());
+ if (IgniteComponentType.INDEXING.inClassPath()) {
+ int cpus = Runtime.getRuntime().availableProcessors();
+
+ idxPool = new IgniteThreadPoolExecutor("idx", ctx.gridName(),
+ cpus, cpus * 2, 3000L, new LinkedBlockingQueue<Runnable>(1000));
+ }
+
getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() {
@Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
try {
@@ -701,6 +712,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
return igfsPool;
+ case IDX_POOL:
+ assert idxPool != null : "Indexing pool is not configured.";
+
+ return idxPool;
+
default: {
assert plc >= 0 : "Negative policy: " + plc;
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index a417857..70a7354 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -46,6 +46,9 @@ public class GridIoPolicy {
/** IGFS pool. */
public static final byte IGFS_POOL = 7;
+ /** Pool for handling distributed index range requests. */
+ public static final byte IDX_POOL = 8;
+
/**
* Defines the range of reserved pools that are not available for plugins.
* @param key The key.
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index c53cb8b..c4a915e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -744,7 +744,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
* @return Future.
* @throws IgniteCheckedException Thrown in case of any errors.
*/
- private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, byte plc) throws IgniteCheckedException {
+ public IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, byte plc) throws IgniteCheckedException {
if (c == null)
return new GridFinishedFuture();
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 58026ca..f6c312e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -101,6 +101,7 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
@@ -113,6 +114,7 @@ import org.apache.ignite.lang.IgniteBiClosure;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -314,6 +316,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private final GridBoundedConcurrentLinkedHashMap<TwoStepCachedQueryKey, TwoStepCachedQuery> twoStepCache =
new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
+ /** */
+ private final IgniteInClosure<? super IgniteInternalFuture<?>> logger = new IgniteInClosure<IgniteInternalFuture<?>>() {
+ @Override public void apply(IgniteInternalFuture<?> fut) {
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, e.getMessage(), e);
+ }
+ }
+ };
+
/**
* @return Kernal context.
*/
@@ -1575,6 +1589,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* @param msg Message.
* @param specialize Optional closure to specialize message for each node.
* @param locNodeHandler Handler for local node.
+ * @param plc Policy identifying the executor service which will process message.
+ * @param runLocParallel Run local handler in parallel thread.
* @return {@code true} If all messages sent successfully.
*/
public boolean send(
@@ -1582,7 +1598,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
Collection<ClusterNode> nodes,
Message msg,
@Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
- @Nullable IgniteInClosure2X<ClusterNode, Message> locNodeHandler
+ @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHandler,
+ byte plc,
+ boolean runLocParallel
) {
boolean ok = true;
@@ -1606,7 +1624,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
((GridCacheQueryMarshallable)msg).marshall(marshaller);
}
- ctx.io().send(node, topic, msg, GridReduceQueryExecutor.QUERY_POOL);
+ ctx.io().send(node, topic, msg, plc);
}
catch (IgniteCheckedException e) {
ok = false;
@@ -1616,8 +1634,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
}
- if (locNode != null) // Local node goes the last to allow parallel execution.
- locNodeHandler.apply(locNode, specialize == null ? msg : specialize.apply(locNode, msg));
+ // Local node goes the last to allow parallel execution.
+ if (locNode != null) {
+ if (specialize != null)
+ msg = specialize.apply(locNode, msg);
+
+ if (runLocParallel) {
+ final ClusterNode finalLocNode = locNode;
+ final Message finalMsg = msg;
+
+ try {
+ // We prefer runLocal to runLocalSafe, because the latter can produce deadlock here.
+ ctx.closure().runLocal(new GridPlainRunnable() {
+ @Override public void run() {
+ locNodeHandler.apply(finalLocNode, finalMsg);
+ }
+ }, plc).listen(logger);
+ }
+ catch (IgniteCheckedException e) {
+ ok = false;
+
+ U.error(log, "Failed to execute query locally.", e);
+ }
+ }
+ else
+ locNodeHandler.apply(locNode, msg);
+ }
return ok;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index f624292..fd2f174 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -41,6 +41,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
@@ -237,7 +238,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
* @param msg Message.
*/
private void send(Collection<ClusterNode> nodes, Message msg) {
- if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHandler))
+ if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHandler,
+ GridIoPolicy.IDX_POOL, false))
throw new GridH2RetryException("Failed to send message to nodes: " + nodes + ".");
}
@@ -1410,6 +1412,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
private GridH2IndexRangeResponse awaitForResponse() {
assert remainingRanges > 0;
+ final long start = U.currentTimeMillis();
+
for (int attempt = 0;; attempt++) {
if (qctx.isCleared())
throw new GridH2RetryException("Query is cancelled.");
@@ -1452,6 +1456,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
if (req == null || req.bounds() == null) // We have already received the first response.
throw new GridH2RetryException("Failure on remote node.");
+ if (U.currentTimeMillis() - start > 30_000)
+ throw new GridH2RetryException("Timeout.");
+
try {
U.sleep(20 * attempt);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 409e26f..3151677 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -627,8 +627,9 @@ public class GridReduceQueryExecutor {
.partitions(convert(partsMap))
.queries(mapQrys)
.flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0),
- oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null))
- {
+ oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
+ distributedJoins)
+ ) {
awaitAllReplies(r, nodes);
Object state = r.state.get();
@@ -710,7 +711,7 @@ public class GridReduceQueryExecutor {
}
if (retry) {
- send(nodes, new GridQueryCancelRequest(qryReqId), null);
+ send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
if (Thread.currentThread().isInterrupted())
throw new IgniteInterruptedCheckedException("Query was interrupted.");
@@ -725,7 +726,7 @@ public class GridReduceQueryExecutor {
super.close();
if (distributedJoins || !allIndexesFetched(r.idxs))
- send(finalNodes, new GridQueryCancelRequest(qryReqId), null);
+ send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
}
};
}
@@ -1121,17 +1122,19 @@ public class GridReduceQueryExecutor {
* @param nodes Nodes.
* @param msg Message.
* @param specialize Optional closure to specialize message for each node.
+ * @param runLocParallel Run local handler in parallel thread.
* @return {@code true} If all messages sent successfully.
*/
private boolean send(
Collection<ClusterNode> nodes,
Message msg,
- @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize
+ @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
+ boolean runLocParallel
) {
if (log.isDebugEnabled())
log.debug("Sending: [msg=" + msg + ", nodes=" + nodes + ", specialize=" + specialize + "]");
- return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, locNodeHandler);
+ return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, locNodeHandler, QUERY_POOL, runLocParallel);
}
/**