You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/05 17:54:42 UTC

[17/26] incubator-ignite git commit: master - query restart tests fix

master - query restart tests fix


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/90adeae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/90adeae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/90adeae9

Branch: refs/heads/ignite-1093
Commit: 90adeae9dd57f0aaaabe5f244d5167853a0b48dc
Parents: 38810b6
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Aug 4 20:30:00 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Aug 4 20:30:00 2015 +0300

----------------------------------------------------------------------
 .../h2/twostep/GridReduceQueryExecutor.java     | 34 ++++++++++++++++++--
 1 file changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90adeae9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index cde3288..ac269db 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -150,8 +150,7 @@ public class GridReduceQueryExecutor {
                 for (QueryRun r : runs.values()) {
                     for (GridMergeTable tbl : r.tbls) {
                         if (tbl.getScanIndex(null).hasSource(nodeId)) {
-                            // Will attempt to retry. If reduce query was started it will fail on next page fetching.
-                            retry(r, h2.readyTopologyVersion(), nodeId);
+                            handleNodeLeft(r, nodeId);
 
                             break;
                         }
@@ -162,6 +161,15 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param r Query run.
+     * @param nodeId Left node ID.
+     */
+    private void handleNodeLeft(QueryRun r, UUID nodeId) {
+        // Will attempt to retry. If reduce query was started it will fail on next page fetching.
+        retry(r, h2.readyTopologyVersion(), nodeId);
+    }
+
+    /**
      * @param nodeId Node ID.
      * @param msg Message.
      */
@@ -515,7 +523,7 @@ public class GridReduceQueryExecutor {
 
                 if (send(nodes,
                     new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
-                    U.await(r.latch);
+                    awaitAllReplies(r, nodes);
 
                     Object state = r.state.get();
 
@@ -595,6 +603,26 @@ public class GridReduceQueryExecutor {
     }
 
     /**
+     * @param r Query run.
+     * @param nodes Nodes to check periodically if they alive.
+     * @throws IgniteInterruptedCheckedException If interrupted.
+     */
+    private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes)
+        throws IgniteInterruptedCheckedException {
+        while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+            for (ClusterNode node : nodes) {
+                if (!ctx.discovery().alive(node)) {
+                    handleNodeLeft(r, node.id());
+
+                    assert r.latch.getCount() == 0;
+
+                    return;
+                }
+            }
+        }
+    }
+
+    /**
      * Calculates data nodes for replicated caches on unstable topology.
      *
      * @param cctx Cache context for main space.