You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:52:30 UTC

[46/50] [abbrv] ignite git commit: ignite-split2 - separate pool for index requests + parallel execution of local query in case of distributed join

ignite-split2 - separate pool for index requests + parallel execution of local query in case of distributed join


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c5dd4bc9
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c5dd4bc9
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c5dd4bc9

Branch: refs/heads/ignite-1232
Commit: c5dd4bc99f487b184e246fd483ce21822226a7ae
Parents: 9483473
Author: S.Vladykin <sv...@gridgain.com>
Authored: Mon Jan 11 16:44:16 2016 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Mon Jan 11 16:44:16 2016 +0300

----------------------------------------------------------------------
 .../managers/communication/GridIoManager.java   | 16 +++++++
 .../managers/communication/GridIoPolicy.java    |  3 ++
 .../closure/GridClosureProcessor.java           |  2 +-
 .../processors/query/h2/IgniteH2Indexing.java   | 50 ++++++++++++++++++--
 .../query/h2/opt/GridH2TreeIndex.java           |  9 +++-
 .../h2/twostep/GridReduceQueryExecutor.java     | 15 +++---
 6 files changed, 83 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
index ec2d797..32c96ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoManager.java
@@ -87,6 +87,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
 import static org.apache.ignite.internal.GridTopic.TOPIC_COMM_USER;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.AFFINITY_POOL;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IDX_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.IGFS_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MANAGEMENT_POOL;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.MARSH_CACHE_POOL;
@@ -147,6 +148,9 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
     /** IGFS pool. */
     private ExecutorService igfsPool;
 
+    /** Index pool. */
+    private ExecutorService idxPool;
+
     /** Discovery listener. */
     private GridLocalEventListener discoLsnr;
 
@@ -254,6 +258,13 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
             0,
             new LinkedBlockingQueue<Runnable>());
 
+        if (IgniteComponentType.INDEXING.inClassPath()) {
+            int cpus = Runtime.getRuntime().availableProcessors();
+
+            idxPool = new IgniteThreadPoolExecutor("idx", ctx.gridName(),
+                cpus, cpus * 2, 3000L, new LinkedBlockingQueue<Runnable>(1000));
+        }
+
         getSpi().setListener(commLsnr = new CommunicationListener<Serializable>() {
             @Override public void onMessage(UUID nodeId, Serializable msg, IgniteRunnable msgC) {
                 try {
@@ -701,6 +712,11 @@ public class GridIoManager extends GridManagerAdapter<CommunicationSpi<Serializa
 
                 return igfsPool;
 
+            case IDX_POOL:
+                assert idxPool != null : "Indexing pool is not configured.";
+
+                return idxPool;
+
             default: {
                 assert plc >= 0 : "Negative policy: " + plc;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
index a417857..70a7354 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoPolicy.java
@@ -46,6 +46,9 @@ public class GridIoPolicy {
     /** IGFS pool. */
     public static final byte IGFS_POOL = 7;
 
+    /** Pool for handling distributed index range requests. */
+    public static final byte IDX_POOL = 8;
+
     /**
      * Defines the range of reserved pools that are not available for plugins.
      * @param key The key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index c53cb8b..c4a915e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -744,7 +744,7 @@ public class GridClosureProcessor extends GridProcessorAdapter {
      * @return Future.
      * @throws IgniteCheckedException Thrown in case of any errors.
      */
-    private IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, byte plc) throws IgniteCheckedException {
+    public IgniteInternalFuture<?> runLocal(@Nullable final Runnable c, byte plc) throws IgniteCheckedException {
         if (c == null)
             return new GridFinishedFuture();
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 58026ca..f6c312e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -101,6 +101,7 @@ import org.apache.ignite.internal.util.GridBoundedConcurrentLinkedHashMap;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
 import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.lang.GridPlainRunnable;
 import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeMemory;
@@ -113,6 +114,7 @@ import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.plugin.extensions.communication.Message;
@@ -314,6 +316,18 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private final GridBoundedConcurrentLinkedHashMap<TwoStepCachedQueryKey, TwoStepCachedQuery> twoStepCache =
         new GridBoundedConcurrentLinkedHashMap<>(TWO_STEP_QRY_CACHE_SIZE);
 
+    /** Future listener that calls {@code get()} and logs any {@code IgniteCheckedException} thrown. */
+    private final IgniteInClosure<? super IgniteInternalFuture<?>> logger = new IgniteInClosure<IgniteInternalFuture<?>>() {
+        @Override public void apply(IgniteInternalFuture<?> fut) {
+            try {
+                fut.get();
+            }
+            catch (IgniteCheckedException e) {
+                U.error(log, e.getMessage(), e);
+            }
+        }
+    };
+
     /**
      * @return Kernal context.
      */
@@ -1575,6 +1589,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * @param msg Message.
      * @param specialize Optional closure to specialize message for each node.
      * @param locNodeHandler Handler for local node.
+     * @param plc Policy identifying the executor service which will process message.
+     * @param runLocParallel If {@code true}, run the local handler via {@code runLocal} with {@code plc} instead of inline.
      * @return {@code true} If all messages sent successfully.
      */
     public boolean send(
@@ -1582,7 +1598,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         Collection<ClusterNode> nodes,
         Message msg,
         @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
-        @Nullable IgniteInClosure2X<ClusterNode, Message> locNodeHandler
+        @Nullable final IgniteInClosure2X<ClusterNode, Message> locNodeHandler,
+        byte plc,
+        boolean runLocParallel
     ) {
         boolean ok = true;
 
@@ -1606,7 +1624,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         ((GridCacheQueryMarshallable)msg).marshall(marshaller);
                 }
 
-                ctx.io().send(node, topic, msg, GridReduceQueryExecutor.QUERY_POOL);
+                ctx.io().send(node, topic, msg, plc);
             }
             catch (IgniteCheckedException e) {
                 ok = false;
@@ -1616,8 +1634,32 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             }
         }
 
-        if (locNode != null) // Local node goes the last to allow parallel execution.
-            locNodeHandler.apply(locNode, specialize == null ? msg : specialize.apply(locNode, msg));
+        // Local node goes last to allow parallel execution.
+        if (locNode != null) {
+            if (specialize != null)
+                msg = specialize.apply(locNode, msg);
+
+            if (runLocParallel) {
+                final ClusterNode finalLocNode = locNode;
+                final Message finalMsg = msg;
+
+                try {
+                    // We prefer runLocal to runLocalSafe, because the latter can produce deadlock here.
+                    ctx.closure().runLocal(new GridPlainRunnable() {
+                        @Override public void run() {
+                            locNodeHandler.apply(finalLocNode, finalMsg);
+                        }
+                    }, plc).listen(logger);
+                }
+                catch (IgniteCheckedException e) {
+                    ok = false;
+
+                    U.error(log, "Failed to execute query locally.", e);
+                }
+            }
+            else
+                locNodeHandler.apply(locNode, msg);
+        }
 
         return ok;
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index f624292..fd2f174 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -41,6 +41,7 @@ import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2IndexRangeRequest;
@@ -237,7 +238,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
      * @param msg Message.
      */
     private void send(Collection<ClusterNode> nodes, Message msg) {
-        if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHandler))
+        if (!getTable().rowDescriptor().indexing().send(msgTopic, nodes, msg, null, locNodeHandler,
+            GridIoPolicy.IDX_POOL, false))
             throw new GridH2RetryException("Failed to send message to nodes: " + nodes + ".");
     }
 
@@ -1410,6 +1412,8 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
         private GridH2IndexRangeResponse awaitForResponse() {
             assert remainingRanges > 0;
 
+            final long start = U.currentTimeMillis();
+
             for (int attempt = 0;; attempt++) {
                 if (qctx.isCleared())
                     throw new GridH2RetryException("Query is cancelled.");
@@ -1452,6 +1456,9 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
                             if (req == null || req.bounds() == null) // We have already received the first response.
                                 throw new GridH2RetryException("Failure on remote node.");
 
+                            if (U.currentTimeMillis() - start > 30_000)
+                                throw new GridH2RetryException("Timeout.");
+
                             try {
                                 U.sleep(20 * attempt);
                             }

http://git-wip-us.apache.org/repos/asf/ignite/blob/c5dd4bc9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 409e26f..3151677 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -627,8 +627,9 @@ public class GridReduceQueryExecutor {
                             .partitions(convert(partsMap))
                             .queries(mapQrys)
                             .flags(distributedJoins ? GridH2QueryRequest.FLAG_DISTRIBUTED_JOINS : 0),
-                    oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null))
-                {
+                    oldStyle && partsMap != null ? new ExplicitPartitionsSpecializer(partsMap) : null,
+                    distributedJoins)
+                ) {
                     awaitAllReplies(r, nodes);
 
                     Object state = r.state.get();
@@ -710,7 +711,7 @@ public class GridReduceQueryExecutor {
                 }
 
                 if (retry) {
-                    send(nodes, new GridQueryCancelRequest(qryReqId), null);
+                    send(nodes, new GridQueryCancelRequest(qryReqId), null, false);
 
                     if (Thread.currentThread().isInterrupted())
                         throw new IgniteInterruptedCheckedException("Query was interrupted.");
@@ -725,7 +726,7 @@ public class GridReduceQueryExecutor {
                         super.close();
 
                         if (distributedJoins || !allIndexesFetched(r.idxs))
-                            send(finalNodes, new GridQueryCancelRequest(qryReqId), null);
+                            send(finalNodes, new GridQueryCancelRequest(qryReqId), null, false);
                     }
                 };
             }
@@ -1121,17 +1122,19 @@ public class GridReduceQueryExecutor {
      * @param nodes Nodes.
      * @param msg Message.
      * @param specialize Optional closure to specialize message for each node.
+     * @param runLocParallel Run local handler in parallel thread.
      * @return {@code true} If all messages sent successfully.
      */
     private boolean send(
         Collection<ClusterNode> nodes,
         Message msg,
-        @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize
+        @Nullable IgniteBiClosure<ClusterNode, Message, Message> specialize,
+        boolean runLocParallel
     ) {
         if (log.isDebugEnabled())
             log.debug("Sending: [msg=" + msg + ", nodes=" + nodes + ", specialize=" + specialize + "]");
 
-        return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, locNodeHandler);
+        return h2.send(GridTopic.TOPIC_QUERY, nodes, msg, specialize, locNodeHandler, QUERY_POOL, runLocParallel);
     }
 
     /**