You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:52:16 UTC

[32/50] [abbrv] ignite git commit: ignite-split2 - fixes

ignite-split2 - fixes


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1a38a9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1a38a9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1a38a9f

Branch: refs/heads/ignite-1232
Commit: b1a38a9ffb36c1e63bbbb9b4bffdb549fd5f064a
Parents: 3cf7587
Author: S.Vladykin <sv...@gridgain.com>
Authored: Thu Dec 17 04:02:44 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Thu Dec 17 04:02:44 2015 +0300

----------------------------------------------------------------------
 .../dht/GridDhtPartitionsReservation.java       |  3 +-
 .../processors/query/h2/IgniteH2Indexing.java   | 25 +++++++--
 .../query/h2/opt/GridH2QueryContext.java        | 48 ++++++++++++++--
 .../query/h2/opt/GridH2TreeIndex.java           | 27 ++++++---
 .../query/h2/twostep/GridMapQueryExecutor.java  | 58 +++++++++++---------
 .../h2/twostep/GridReduceQueryExecutor.java     |  5 ++
 ...QueryNodeRestartDistributedJoinSelfTest.java | 34 ++++++------
 7 files changed, 137 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
index d12247e..2f51c5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -204,7 +204,8 @@ public class GridDhtPartitionsReservation implements GridReservable {
 
             if (reservations.compareAndSet(r, r - 1)) {
                 // If it was the last reservation and topology version changed -> attempt to evict partitions.
-                if (r == 1 && !topVer.equals(cctx.topology().topologyVersion()))
+                if (r == 1 && !cctx.kernalContext().isStopping() &&
+                    !topVer.equals(cctx.topology().topologyVersion()))
                     tryEvict(parts.get());
 
                 return;

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 7b696ff..b16b48e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -208,9 +208,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
      * Command in H2 prepared statement.
      */
     static {
-        try {
-            System.setProperty("h2.objectCache", "false");
+        // Initialize system properties for H2.
+        System.setProperty("h2.objectCache", "false");
+        System.setProperty("h2.serializeJavaObject", "false");
+        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
 
+        try {
             COMMAND_FIELD = JdbcPreparedStatement.class.getDeclaredField("command");
 
             COMMAND_FIELD.setAccessible(true);
@@ -246,6 +249,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     private GridReduceQueryExecutor rdcQryExec;
 
     /** */
+    private GridSpinBusyLock busyLock;
+
+    /** */
     private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
         @Nullable @Override public ConnectionWrapper get() {
             ConnectionWrapper c = super.get();
@@ -1438,6 +1444,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
     }
 
     /**
+     * @return Busy lock.
+     */
+    public GridSpinBusyLock busyLock() {
+        return busyLock;
+    }
+
+    /**
      * @return Map query executor.
      */
     public GridMapQueryExecutor mapQueryExecutor() {
@@ -1457,8 +1470,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
         if (log.isDebugEnabled())
             log.debug("Starting cache query index...");
 
-        System.setProperty("h2.serializeJavaObject", "false");
-        System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
+        this.busyLock = busyLock;
 
         if (SysProperties.serializeJavaObject) {
             U.warn(log, "Serialization of Java objects in H2 was enabled.");
@@ -1562,7 +1574,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             catch (IgniteCheckedException e) {
                 ok = false;
 
-                U.warn(log, e.getMessage());
+                U.warn(log, "Failed to send message [node=" + node + ", msg=" + msg +
+                    ", errMsg=" + e.getMessage() + "]");
             }
         }
 
@@ -1649,6 +1662,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
             U.error(log, "Failed to shutdown database.", e);
         }
 
+        GridH2QueryContext.clearLocalNodeStop(nodeId);
+
         if (log.isDebugEnabled())
             log.debug("Cache query index stopped.");
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 2c74f6d..d39d7ab 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.processors.query.h2.opt;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentMap;
@@ -31,7 +32,6 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
 import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
 
 /**
@@ -51,6 +51,9 @@ public class GridH2QueryContext {
     @GridToStringInclude
     private Map<Long, Object> snapshots;
 
+    /** */
+    private List<GridReservable> reservations;
+
     /** Range streams for indexes. */
     private Map<Integer, Object> streams;
 
@@ -144,6 +147,16 @@ public class GridH2QueryContext {
     }
 
     /**
+     * @param reservations Reserved partitions or group reservations.
+     * @return {@code this}.
+     */
+    public GridH2QueryContext reservations(List<GridReservable> reservations) {
+        this.reservations = reservations;
+
+        return this;
+    }
+
+    /**
      * @param topVer Topology version.
      * @return {@code this}.
      */
@@ -343,7 +356,7 @@ public class GridH2QueryContext {
 
         qctx.remove();
 
-        if (!onlyThreadLoc && x.key.type != LOCAL)
+        if (!onlyThreadLoc && x.key.type == MAP)
             doClear(x.key);
     }
 
@@ -361,22 +374,47 @@ public class GridH2QueryContext {
      * @param key Context key.
      */
     private static void doClear(Key key) {
+        assert key.type == MAP : key.type;
+
         GridH2QueryContext x = qctxs.remove(key);
 
-        if (x != null && !F.isEmpty(x.snapshots)) {
+        if (x == null)
+            return;
+
+        assert x.key.equals(key);
+
+        if (!F.isEmpty(x.snapshots)) {
             for (Object snapshot : x.snapshots.values()) {
                 if (snapshot instanceof GridReservable)
                     ((GridReservable)snapshot).release();
             }
         }
+
+        List<GridReservable> r = x.reservations;
+
+        if (!F.isEmpty(r)) {
+            for (int i = 0; i < r.size(); i++)
+                r.get(i).release();
+        }
     }
 
     /**
+     * @param locNodeId Local node ID.
      * @param nodeId Dead node ID.
      */
-    public static void clearAfterDeadNode(UUID nodeId) {
+    public static void clearAfterDeadNode(UUID locNodeId, UUID nodeId) {
+        for (Key key : qctxs.keySet()) {
+            if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId))
+                doClear(key);
+        }
+    }
+
+    /**
+     * @param locNodeId Local node ID.
+     */
+    public static void clearLocalNodeStop(UUID locNodeId) {
         for (Key key : qctxs.keySet()) {
-            if (key.nodeId.equals(nodeId))
+            if (key.locNodeId.equals(locNodeId))
                 doClear(key);
         }
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index bffe4d0..5641222 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
 import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
 import org.apache.ignite.internal.util.GridEmptyIterator;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
 import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
 import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
 import org.apache.ignite.internal.util.snaptree.SnapTreeMap;
@@ -208,7 +209,17 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
             msgLsnr = new GridMessageListener() {
                 @Override public void onMessage(UUID nodeId, Object msg) {
-                    onMessage0(nodeId, msg);
+                    GridSpinBusyLock l = desc.indexing().busyLock();
+
+                    if (!l.enterBusy())
+                        return;
+
+                    try {
+                        onMessage0(nodeId, msg);
+                    }
+                    finally {
+                        l.leaveBusy();
+                    }
                 }
             };
 
@@ -1397,7 +1408,10 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
         private GridH2IndexRangeResponse awaitForResponse() {
             assert remainingRanges > 0;
 
-            for (int attempt = 0; attempt < 50; attempt++) {
+            for (int attempt = 0;; attempt++) {
+                if (kernalContext().isStopping())
+                    throw new GridH2RetryException("Stopping node.");
+
                 GridH2IndexRangeResponse res;
 
                 try {
@@ -1434,7 +1448,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
                                 throw new GridH2RetryException("Failure on remote node.");
 
                             try {
-                                U.sleep(10 * attempt);
+                                U.sleep(20 * attempt);
                             }
                             catch (IgniteInterruptedCheckedException e) {
                                 throw new IgniteInterruptedException(e.getMessage());
@@ -1442,22 +1456,19 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
 
                             send(singletonList(node), req);
 
-                            break;
+                            continue;
 
                         case GridH2IndexRangeResponse.STATUS_ERROR:
                             throw new CacheException(res.error());
 
                         default:
-                            assert false;
+                            throw new IllegalStateException();
                     }
                 }
 
                 if (!kernalContext().discovery().alive(node))
                     throw new GridH2RetryException("Node left: " + node);
             }
-
-            // Attempts exceeded.
-            throw new CacheException("Failed to get index range from remote node, request timeout.");
         }
 
         /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 878814a..d8b1180 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -143,11 +143,13 @@ public class GridMapQueryExecutor {
 
         log = ctx.log(GridMapQueryExecutor.class);
 
+        final UUID locNodeId = ctx.localNodeId();
+
         ctx.event().addLocalEventListener(new GridLocalEventListener() {
             @Override public void onEvent(final Event evt) {
                 UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
 
-                GridH2QueryContext.clearAfterDeadNode(nodeId);
+                GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId);
 
                 ConcurrentMap<Long,QueryResults> nodeRess = qryRess.remove(nodeId);
 
@@ -220,14 +222,14 @@ public class GridMapQueryExecutor {
 
         long qryReqId = msg.queryRequestId();
 
+        GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
+
         QueryResults results = nodeRess.remove(qryReqId);
 
         if (results == null)
             return;
 
         results.cancel();
-
-        GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
     }
 
     /**
@@ -490,7 +492,8 @@ public class GridMapQueryExecutor {
                 .partitionsMap(partsMap)
                 .distributedJoins(distributedJoins)
                 .pageSize(pageSize)
-                .topologyVersion(topVer);
+                .topologyVersion(topVer)
+                .reservations(reserved);
 
             List<GridH2Table> snapshotedTbls = null;
 
@@ -510,6 +513,9 @@ public class GridMapQueryExecutor {
 
             GridH2QueryContext.set(qctx);
 
+            // qctx is set, so it now owns the reservations; they are released when the context is cleared.
+            reserved = null;
+
             h2.enforceJoinOrder(true);
 
             try {
@@ -570,14 +576,8 @@ public class GridMapQueryExecutor {
                 qr.cancel();
             }
 
-            if (X.hasCause(e, GridH2RetryException.class)) {
-                try {
-                    sendRetry(node, reqId);
-                }
-                catch (IgniteCheckedException ex) {
-                    U.warn(log, "Failed to send retry message to node: " + node);
-                }
-            }
+            if (X.hasCause(e, GridH2RetryException.class))
+                sendRetry(node, reqId);
             else {
                 U.error(log, "Failed to execute local query.", e);
 
@@ -588,9 +588,11 @@ public class GridMapQueryExecutor {
             }
         }
         finally {
-            // Release reserved partitions.
-            for (GridReservable r : reserved)
-                r.release();
+            if (reserved != null) {
+                // Release reserved partitions.
+                for (int i = 0; i < reserved.size(); i++)
+                    reserved.get(i).release();
+            }
         }
     }
 
@@ -682,22 +684,26 @@ public class GridMapQueryExecutor {
     /**
      * @param node Node.
      * @param reqId Request ID.
-     * @throws IgniteCheckedException If failed.
      */
-    private void sendRetry(ClusterNode node, long reqId) throws IgniteCheckedException {
-        boolean loc = node.isLocal();
+    private void sendRetry(ClusterNode node, long reqId) {
+        try {
+            boolean loc = node.isLocal();
 
-        GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+            GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
             /*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
-            loc ? null : Collections.<Message>emptyList(),
-            loc ? Collections.<Value[]>emptyList() : null);
+                loc ? null : Collections.<Message>emptyList(),
+                loc ? Collections.<Value[]>emptyList() : null);
 
-        msg.retry(h2.readyTopologyVersion());
+            msg.retry(h2.readyTopologyVersion());
 
-        if (loc)
-            h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
-        else
-            ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+            if (loc)
+                h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+            else
+                ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+        }
+        catch (Exception e) {
+            U.warn(log, "Failed to send retry message: " + e.getMessage());
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 00d2f27..6004629 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -607,6 +607,9 @@ public class GridReduceQueryExecutor {
                 final boolean oldStyle = oldNodesInTopology();
                 final boolean distributedJoins = qry.distributedJoins();
 
+                if (oldStyle && distributedJoins)
+                    throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes.");
+
                 if (send(nodes,
                     oldStyle ?
                         new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null) :
@@ -704,6 +707,8 @@ public class GridReduceQueryExecutor {
                 }
 
                 if (retry) {
+                    send(nodes, new GridQueryCancelRequest(qryReqId), null);
+
                     if (Thread.currentThread().isInterrupted())
                         throw new IgniteInterruptedCheckedException("Query was interrupted.");
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index 15f390d..de00a2b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.util.GridRandom;
 import org.apache.ignite.internal.util.typedef.CAX;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -51,16 +52,16 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
  */
 public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridCommonAbstractTest {
     /** */
-    private static final String QRY_0 = "select co.id, count(*) cnt\n" +
+    private static final String QRY_0 = "select co._key, count(*) cnt\n" +
         "from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
-        "where pe.id = pu.personId and pu.productId = pr.id and pr.companyId = co.id \n" +
-        "group by co.id order by cnt desc, co.id";
+        "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
+        "group by co._key order by cnt desc, co._key";
 
     /** */
-    private static final String QRY_1 = "select pr.id, co.id\n" +
+    private static final String QRY_1 = "select pr._key, co._key\n" +
         "from \"pr\".Product pr, \"co\".Company co\n" +
-        "where pr.companyId = co.id\n" +
-        "order by co.id, pr.id ";
+        "where pr.companyId = co._key\n" +
+        "order by co._key, pr._key ";
 
     /** */
     private static final int GRID_CNT = 6;
@@ -182,8 +183,8 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
         int duration = 90 * 1000;
         int qryThreadNum = 4;
         int restartThreadsNum = 2; // 4 + 2 = 6 nodes
-        final int nodeLifeTime = 2 * 1000;
-        final int logFreq = 10;
+        final int nodeLifeTime = 4000;
+        final int logFreq = 1;
 
         startGridsMultiThreaded(GRID_CNT);
 
@@ -191,6 +192,9 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
 
         fillCaches();
 
+        X.println("Plan: " + grid(0).cache("pu").query(new SqlFieldsQuery("explain " + QRY_0)
+            .setDistributedJoins(true)).getAll());
+
         final List<List<?>> pRes = grid(0).cache("pu").query(new SqlFieldsQuery(QRY_0)
             .setDistributedJoins(true)).getAll();
 
@@ -206,7 +210,6 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
         assertFalse(rRes.isEmpty());
 
         final AtomicInteger qryCnt = new AtomicInteger();
-
         final AtomicBoolean qrysDone = new AtomicBoolean();
 
         IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
@@ -221,15 +224,14 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
                     }
                     while (!locks.compareAndSet(g, 0, 1));
 
-                    if (rnd.nextBoolean()) { // Partitioned query.
+                    if (rnd.nextBoolean()) {
                         IgniteCache<?,?> cache = grid(g).cache("pu");
 
                         SqlFieldsQuery qry = new SqlFieldsQuery(QRY_0).setDistributedJoins(true);
 
                         boolean smallPageSize = rnd.nextBoolean();
 
-                        if (smallPageSize)
-                            qry.setPageSize(3);
+                        qry.setPageSize(smallPageSize ? 30 : 1000);
 
                         try {
                             assertEquals(pRes, cache.query(qry).getAll());
@@ -321,16 +323,12 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
         info("Stopping..");
 
         restartsDone.set(true);
-
-        fut2.get();
-
-        info("Restarts stopped.");
-
         qrysDone.set(true);
 
+        fut2.get();
         fut1.get();
 
-        info("Queries stopped.");
+        info("Stopped.");
     }
 
     /** {@inheritDoc} */