You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by av...@apache.org on 2015/08/05 17:54:42 UTC
[17/26] incubator-ignite git commit: master - query restart tests fix
master - query restart tests fix
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/90adeae9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/90adeae9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/90adeae9
Branch: refs/heads/ignite-1093
Commit: 90adeae9dd57f0aaaabe5f244d5167853a0b48dc
Parents: 38810b6
Author: S.Vladykin <sv...@gridgain.com>
Authored: Tue Aug 4 20:30:00 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Tue Aug 4 20:30:00 2015 +0300
----------------------------------------------------------------------
.../h2/twostep/GridReduceQueryExecutor.java | 34 ++++++++++++++++++--
1 file changed, 31 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/90adeae9/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index cde3288..ac269db 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -150,8 +150,7 @@ public class GridReduceQueryExecutor {
for (QueryRun r : runs.values()) {
for (GridMergeTable tbl : r.tbls) {
if (tbl.getScanIndex(null).hasSource(nodeId)) {
- // Will attempt to retry. If reduce query was started it will fail on next page fetching.
- retry(r, h2.readyTopologyVersion(), nodeId);
+ handleNodeLeft(r, nodeId);
break;
}
@@ -162,6 +161,15 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param r Query run.
+ * @param nodeId Left node ID.
+ */
+ private void handleNodeLeft(QueryRun r, UUID nodeId) {
+ // Will attempt to retry. If reduce query was started it will fail on next page fetching.
+ retry(r, h2.readyTopologyVersion(), nodeId);
+ }
+
+ /**
* @param nodeId Node ID.
* @param msg Message.
*/
@@ -515,7 +523,7 @@ public class GridReduceQueryExecutor {
if (send(nodes,
new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null), partsMap)) {
- U.await(r.latch);
+ awaitAllReplies(r, nodes);
Object state = r.state.get();
@@ -595,6 +603,26 @@ public class GridReduceQueryExecutor {
}
/**
+ * @param r Query run.
+ * @param nodes Nodes to check periodically if they are alive.
+ * @throws IgniteInterruptedCheckedException If interrupted.
+ */
+ private void awaitAllReplies(QueryRun r, Collection<ClusterNode> nodes)
+ throws IgniteInterruptedCheckedException {
+ while (!U.await(r.latch, 500, TimeUnit.MILLISECONDS)) {
+ for (ClusterNode node : nodes) {
+ if (!ctx.discovery().alive(node)) {
+ handleNodeLeft(r, node.id());
+
+ assert r.latch.getCount() == 0;
+
+ return;
+ }
+ }
+ }
+ }
+
+ /**
* Calculates data nodes for replicated caches on unstable topology.
*
* @param cctx Cache context for main space.