You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/06/18 08:54:18 UTC

[11/50] incubator-ignite git commit: ignite-484-1 - collocated partitions + replicated caches

ignite-484-1 - collocated partitions + replicated caches


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d389ada8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d389ada8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d389ada8

Branch: refs/heads/ignite-sprint-6
Commit: d389ada8b9546994d3dec5da00349f831f81fcb0
Parents: 357b4c0
Author: S.Vladykin <sv...@gridgain.com>
Authored: Wed Jun 3 22:55:16 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Wed Jun 3 22:55:16 2015 +0300

----------------------------------------------------------------------
 .../processors/query/GridQueryIndexing.java     |   3 +-
 .../h2/twostep/messages/GridQueryRequest.java   |  13 +-
 .../processors/query/h2/IgniteH2Indexing.java   |  12 +-
 .../query/h2/twostep/GridMapQueryExecutor.java  |  15 +-
 .../h2/twostep/GridReduceQueryExecutor.java     | 248 +++++++++++++------
 .../IgniteCacheQueryNodeRestartSelfTest.java    |  10 +-
 6 files changed, 206 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
index de35201..98c2af7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryIndexing.java
@@ -243,6 +243,5 @@ public interface GridQueryIndexing {
      * @param parts Partitions.
      * @return Backup filter.
      */
-    public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer,
-        List<int[]> parts);
+    public IndexingQueryFilter backupFilter(List<String> caches, AffinityTopologyVersion topVer, int[] parts);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
index 99ef094..6465bbc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/messages/GridQueryRequest.java
@@ -57,8 +57,7 @@ public class GridQueryRequest implements Message {
     private List<String> extraSpaces;
 
     /** */
-    @GridDirectCollection(int[].class)
-    private List<int[]> parts;
+    private int[] parts;
 
     /**
      * Default constructor.
@@ -83,7 +82,7 @@ public class GridQueryRequest implements Message {
         Collection<GridCacheSqlQuery> qrys,
         AffinityTopologyVersion topVer,
         List<String> extraSpaces,
-        List<int[]> parts) {
+        int[] parts) {
         this.reqId = reqId;
         this.pageSize = pageSize;
         this.space = space;
@@ -110,14 +109,14 @@ public class GridQueryRequest implements Message {
     /**
      * @return All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
      */
-    public List<int[]> partitions() {
+    public int[] partitions() {
         return parts;
     }
 
     /**
      * @param parts All the needed partitions for {@link #space()} and {@link #extraSpaces()}.
      */
-    public void partitions(List<int[]> parts) {
+    public void partitions(int[] parts) {
         this.parts = parts;
     }
 
@@ -217,7 +216,7 @@ public class GridQueryRequest implements Message {
                 writer.incrementState();
 
             case 6:
-                if (!writer.writeCollection("partitions", parts, MessageCollectionItemType.INT_ARR))
+                if (!writer.writeIntArray("partitions", parts))
                     return false;
 
                 writer.incrementState();
@@ -283,7 +282,7 @@ public class GridQueryRequest implements Message {
                 reader.incrementState();
 
             case 6:
-                parts = reader.readCollection("partitions", MessageCollectionItemType.INT_ARR);
+                parts = reader.readIntArray("partitions");
 
                 if (!reader.isLastRead())
                     return false;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index ffedfb3..da497a2 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -1357,7 +1357,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     @Override public IndexingQueryFilter backupFilter(
         @Nullable final List<String> caches,
         @Nullable final AffinityTopologyVersion topVer,
-        @Nullable final List<int[]> parts
+        @Nullable final int[] parts
     ) {
         final AffinityTopologyVersion topVer0 = topVer != null ? topVer : AffinityTopologyVersion.NONE;
 
@@ -1371,16 +1371,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                 final GridCacheAffinityManager aff = cache.context().affinity();
 
                 if (parts != null) {
-                    int idx = caches.indexOf(spaceName);
-
-                    final int[] parts0 = parts.get(idx);
-
-                    if (parts0.length < 64) { // Fast scan for small arrays.
+                    if (parts.length < 64) { // Fast scan for small arrays.
                         return new IgniteBiPredicate<K,V>() {
                             @Override public boolean apply(K k, V v) {
                                 int p = aff.partition(k);
 
-                                for (int p0 : parts0) {
+                                for (int p0 : parts) {
                                     if (p0 == p)
                                         return true;
 
@@ -1397,7 +1393,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
                         @Override public boolean apply(K k, V v) {
                             int p = aff.partition(k);
 
-                            return Arrays.binarySearch(parts0, p) >= 0;
+                            return Arrays.binarySearch(parts, p) >= 0;
                         }
                     };
                 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 9d9060a..ede0e2e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -222,21 +222,24 @@ public class GridMapQueryExecutor {
      * @return {@code true} If all the needed partitions successfully reserved.
      * @throws IgniteCheckedException If failed.
      */
-    private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, List<int[]> parts,
+    private boolean reservePartitions(Collection<String> cacheNames, AffinityTopologyVersion topVer, int[] parts,
         List<GridDhtLocalPartition> reserved) throws IgniteCheckedException {
-        assert parts == null || parts.size() == cacheNames.size();
-
-        int i = 0;
+        Collection<Integer> partIds = parts == null ? null : wrap(parts);
 
         for (String cacheName : cacheNames) {
             GridCacheContext<?,?> cctx = cacheContext(cacheName, topVer);
 
-            Collection<Integer> partIds = parts != null ? wrap(parts.get(i++)) :
-                cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
+            int partsCnt = cctx.affinity().partitions();
+
+            if (parts == null)
+                partIds = cctx.affinity().primaryPartitions(ctx.localNodeId(), topVer);
 
             for (int partId : partIds) {
                 GridDhtLocalPartition part = cctx.topology().localPartition(partId, topVer, false);
 
+                if (partId >= partsCnt)
+                    break; // We can have more partitions because `parts` array is shared for all caches.
+
                 if (part != null) {
                     // Await for owning state.
                     part.owningFuture().get();

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index c445844..87ac2f4 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -56,6 +56,8 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.*;
 
+import static org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion.*;
+
 /**
  * Reduce query executor.
  */
@@ -279,35 +281,85 @@ public class GridReduceQueryExecutor {
     }
 
     /**
-     * @param set Set.
-     * @return Array.
+     * @param readyTop Latest ready topology.
+     * @return {@code true} If preloading is active.
      */
-    private static int[] unbox(Set<Integer> set) {
-        if (set == null)
-            return null;
+    private boolean isPreloadingActive(AffinityTopologyVersion readyTop) {
+        AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
 
-        int[] arr = new int[set.size()];
+        int res = readyTop.compareTo(freshTop);
 
-        int i = 0;
+        assert res <= 0 : readyTop + " " + freshTop;
 
-        for (int x : set)
-            arr[i++] = x;
+        return res < 0;
+    }
 
-        return arr;
+    /**
+     * @param name Cache name.
+     * @return Cache context.
+     */
+    private GridCacheContext<?,?> cacheContext(String name) {
+        return ctx.cache().internalCache(name).context();
     }
 
     /**
-     * @param readyTop Latest ready topology.
-     * @return {@code true} If preloading is active.
+     * @param topVer Topology version.
+     * @param cctx Cache context for main space.
+     * @param extraSpaces Extra spaces.
+     * @return Data nodes.
      */
-    private boolean isPreloadingActive(AffinityTopologyVersion readyTop) {
-        AffinityTopologyVersion freshTop = ctx.discovery().topologyVersionEx();
+    private Collection<ClusterNode> dataNodes(
+        AffinityTopologyVersion topVer,
+        final GridCacheContext<?,?> cctx,
+        List<String> extraSpaces
+    ) {
+        String space = cctx.name();
 
-        int res = readyTop.compareTo(freshTop);
+        Set<ClusterNode> nodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(space, topVer));
 
-        assert res <= 0 : readyTop + " " + freshTop;
+        if (F.isEmpty(nodes))
+            throw new CacheException("No data nodes found for cache: " + space);
 
-        return res < 0;
+        if (!F.isEmpty(extraSpaces)) {
+            for (String extraSpace : extraSpaces) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+
+                if (extraCctx.isLocal())
+                    continue;
+
+                if (cctx.isReplicated() && !extraCctx.isReplicated())
+                    throw new CacheException("Queries running on replicated cache should not contain JOINs " +
+                        "with partitioned tables.");
+
+                Collection<ClusterNode> extraNodes = ctx.discovery().cacheAffinityNodes(extraSpace, topVer);
+
+                if (F.isEmpty(extraNodes))
+                    throw new CacheException("No data nodes found for cache: " + extraSpace);
+
+                if (cctx.isReplicated() && extraCctx.isReplicated()) {
+                    nodes.retainAll(extraNodes);
+
+                    if (nodes.isEmpty() && !isPreloadingActive(topVer))
+                        throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+                            "' have distinct set of data nodes.");
+                }
+                else if (!cctx.isReplicated() && extraCctx.isReplicated()) {
+                    if (!extraNodes.containsAll(nodes) && !isPreloadingActive(topVer))
+                        throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+                            "' have distinct set of data nodes.");
+                }
+                else if (!cctx.isReplicated() && !extraCctx.isReplicated()) {
+                    if ((extraNodes.size() != nodes.size() || !nodes.containsAll(extraNodes)) &&
+                        !isPreloadingActive(topVer))
+                        throw new CacheException("Caches '" + cctx.name() + "' and '" + extraSpace +
+                            "' have distinct set of data nodes.");
+                }
+                else
+                    throw new IllegalStateException();
+            }
+        }
+
+        return nodes;
     }
 
     /**
@@ -331,15 +383,12 @@ public class GridReduceQueryExecutor {
 
             AffinityTopologyVersion topVer = h2.readyTopologyVersion();
 
-            Collection<ClusterNode> nodes = ctx.discovery().cacheAffinityNodes(space, topVer);
-
-            if (F.isEmpty(nodes))
-                throw new CacheException("No data nodes found for cache: " + space);
-
             List<String> extraSpaces = extraSpaces(space, qry.spaces());
 
-            // Explicit partition mapping for unstable topology: {nodeId -> {cacheName -> {parts}}}
-            Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap = null;
+            Collection<ClusterNode> nodes = dataNodes(topVer, cctx, extraSpaces);
+
+            // Explicit partition mapping for unstable topology.
+            Map<ClusterNode, IntArray> gridPartsMap = null;
 
             if (cctx.isReplicated() || qry.explain()) {
                 assert qry.explain() || !nodes.contains(ctx.cluster().get().localNode()) : "We must be on a client node.";
@@ -348,14 +397,10 @@ public class GridReduceQueryExecutor {
                 nodes = Collections.singleton(F.rand(nodes));
             }
             else if (isPreloadingActive(topVer)) {
-                gridPartsMap = new HashMap<>(nodes.size(), 1f);
-
-                collectPartitionOwners(gridPartsMap, cctx);
+                gridPartsMap = partitionLocations(cctx, extraSpaces);
 
-                if (extraSpaces != null) {
-                    for (String extraSpace : extraSpaces)
-                        collectPartitionOwners(gridPartsMap, ctx.cache().internalCache(extraSpace).context());
-                }
+                if (gridPartsMap == null)
+                    continue; // Retry.
 
                 nodes = gridPartsMap.keySet();
             }
@@ -462,46 +507,114 @@ public class GridReduceQueryExecutor {
         }
     }
 
-    /**
-     * Collects actual partition owners for the cache context int the given map.
-     *
-     * @param gridPartsMap Target map.
-     * @param cctx Cache context.
-     */
-    private void collectPartitionOwners(
-        Map<ClusterNode,Map<String,Set<Integer>>> gridPartsMap,
-        GridCacheContext<?,?> cctx
-    ) {
-        int partsCnt = cctx.affinity().partitions();
+    @SuppressWarnings("unchecked")
+    private Map<ClusterNode, IntArray> partitionLocations(final GridCacheContext<?,?> cctx, List<String> extraSpaces) {
+        assert !cctx.isReplicated() && !cctx.isLocal() : cctx.name() + " must be partitioned";
+
+        int maxParts = cctx.affinity().partitions();
+
+        if (extraSpaces != null) { // Find max number of partitions for partitioned caches.
+            for (String extraSpace : extraSpaces) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
+
+                if (extraCctx.isReplicated() || extraCctx.isLocal())
+                    continue;
+
+                int parts = extraCctx.affinity().partitions();
+
+                if (parts > maxParts)
+                    maxParts = parts;
+            }
+        }
+
+        Set<ClusterNode>[] partLocs = new Set[maxParts];
 
-        for (int p = 0; p < partsCnt; p++) {
-            // We don't care about exact topology version here, we just need to get all the needed partition
-            // owners in actual state.
+        // Fill partition locations for main cache.
+        for (int p = 0, parts =  cctx.affinity().partitions(); p < parts; p++) {
             List<ClusterNode> owners = cctx.topology().owners(p);
 
             if (F.isEmpty(owners))
-                continue; // All primary and backup nodes are dead now for this partition. We sorrow.
+                throw new CacheException("No data nodes found for cache '" + cctx.name() + "' for partition " + p);
+
+            partLocs[p] = new HashSet<>(owners);
+        }
+
+        if (extraSpaces != null) {
+            // Find owner intersections for each participating partitioned cache partition.
+            // We need this for logical collocation between different partitioned caches with the same affinity.
+            for (String extraSpace : extraSpaces) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
 
-            ClusterNode owner = F.rand(owners);
+                if (extraCctx.isReplicated() || extraCctx.isLocal())
+                    continue;
+
+                for (int p = 0, parts =  extraCctx.affinity().partitions(); p < parts; p++) {
+                    List<ClusterNode> owners = extraCctx.topology().owners(p);
 
-            Map<String, Set<Integer>> nodePartsMap = gridPartsMap.get(owner);
+                    if (F.isEmpty(owners))
+                        throw new CacheException("No data nodes found for cache '" + extraSpace +
+                            "' for partition " + p);
 
-            if (nodePartsMap == null) {
-                nodePartsMap = new HashMap<>();
+                    if (partLocs[p] == null)
+                        partLocs[p] = new HashSet<>(owners);
+                    else {
+                        partLocs[p].retainAll(owners); // Intersection of owners.
 
-                gridPartsMap.put(owner, nodePartsMap);
+                        if (partLocs[p].isEmpty())
+                            return null; // Intersection is empty -> retry.
+                    }
+                }
             }
 
-            Set<Integer> parts = nodePartsMap.get(cctx.name());
+            // Filter nodes where not all the replicated caches loaded.
+            for (String extraSpace : extraSpaces) {
+                GridCacheContext<?,?> extraCctx = cacheContext(extraSpace);
 
-            if (parts == null) {
-                parts = new TreeSet<>(); // We need them sorted.
+                if (!extraCctx.isReplicated())
+                    continue;
 
-                nodePartsMap.put(cctx.name(), parts);
+                Set<ClusterNode> dataNodes = new HashSet<>(ctx.discovery().cacheAffinityNodes(extraSpace, NONE));
+
+                if (dataNodes.isEmpty())
+                    throw new CacheException("No data nodes found for cache '" + extraSpace + "'");
+
+                for (int p = 0, extraParts = extraCctx.affinity().partitions(); p < extraParts; p++) {
+                    dataNodes.retainAll(extraCctx.topology().owners(p));
+
+                    if (dataNodes.isEmpty())
+                        throw new CacheException("No data nodes found for cache '" + extraSpace +
+                            "' for partition " + p);
+                }
+
+                for (Set<ClusterNode> partLoc : partLocs) {
+                    partLoc.retainAll(dataNodes);
+
+                    if (partLoc.isEmpty())
+                        return null; // Intersection is empty -> retry.
+                }
             }
+        }
+
+        // Collect the final partitions mapping.
+        Map<ClusterNode, IntArray> res = new HashMap<>();
+
+        // Here partitions in all IntArray's will be sorted in ascending order, this is important.
+        for (int p = 0; p < partLocs.length; p++) {
+            Set<ClusterNode> pl = partLocs[p];
+
+            assert !F.isEmpty(pl) : pl;
+
+            ClusterNode n = pl.size() == 1 ? F.first(pl) : F.rand(pl);
+
+            IntArray parts = res.get(n);
+
+            if (parts == null)
+                res.put(n, parts = new IntArray());
 
             parts.add(p);
         }
+
+        return res;
     }
 
     /**
@@ -580,7 +693,7 @@ public class GridReduceQueryExecutor {
     private void send(
         Collection<ClusterNode> nodes,
         Message msg,
-        Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap
+        Map<ClusterNode,IntArray> gridPartsMap
     ) {
         boolean locNodeFound = false;
 
@@ -595,7 +708,7 @@ public class GridReduceQueryExecutor {
                 ctx.io().send(node, GridTopic.TOPIC_QUERY, copy(msg, node, gridPartsMap), GridIoPolicy.PUBLIC_POOL);
             }
             catch (IgniteCheckedException e) {
-                U.error(log, "Failed to send message to node: " + node, e);
+                U.warn(log, e.getMessage());
             }
         }
 
@@ -609,28 +722,21 @@ public class GridReduceQueryExecutor {
      * @param gridPartsMap Partitions map.
      * @return Copy of message with partitions set.
      */
-    private Message copy(Message msg, ClusterNode node, Map<ClusterNode, Map<String, Set<Integer>>> gridPartsMap) {
+    private Message copy(Message msg, ClusterNode node, Map<ClusterNode,IntArray> gridPartsMap) {
         if (gridPartsMap == null)
             return msg;
 
-        Map<String,Set<Integer>> nodeParts = gridPartsMap.get(node);
-
-        assert nodeParts != null;
+        GridQueryRequest res = new GridQueryRequest((GridQueryRequest)msg);
 
-        GridQueryRequest req = (GridQueryRequest)msg;
+        IntArray parts = gridPartsMap.get(node);
 
-        List<int[]> parts = new ArrayList<>(nodeParts.size());
+        assert parts != null : node;
 
-        parts.add(unbox(nodeParts.get(req.space())));
-
-        if (req.extraSpaces() != null) {
-            for (String extraSpace : req.extraSpaces())
-                parts.add(unbox(nodeParts.get(extraSpace)));
-        }
+        int[] partsArr = new int[parts.size()];
 
-        GridQueryRequest res = new GridQueryRequest(req);
+        parts.toArray(partsArr);
 
-        res.partitions(parts);
+        res.partitions(partsArr);
 
         return res;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d389ada8/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
index 128e148..edba352 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartSelfTest.java
@@ -176,11 +176,19 @@ public class IgniteCacheQueryNodeRestartSelfTest extends GridCacheAbstractSelfTe
 
         Thread.sleep(duration);
 
+        info("Stopping..");
+
         done.set(true);
 
-        fut1.get();
         fut2.get();
 
+        info("Restarts stopped.");
+
+        fut1.get();
+
+        info("Queries stopped.");
+
+
         info("Awaiting rebalance events [restartCnt=" + restartCnt.get() + ']');
 
         boolean success = lsnr.awaitEvents(GRID_CNT * 2 * restartCnt.get(), 15000);