You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/02/15 08:52:16 UTC
[32/50] [abbrv] ignite git commit: ignite-split2 - fixes
ignite-split2 - fixes
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b1a38a9f
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b1a38a9f
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b1a38a9f
Branch: refs/heads/ignite-1232
Commit: b1a38a9ffb36c1e63bbbb9b4bffdb549fd5f064a
Parents: 3cf7587
Author: S.Vladykin <sv...@gridgain.com>
Authored: Thu Dec 17 04:02:44 2015 +0300
Committer: S.Vladykin <sv...@gridgain.com>
Committed: Thu Dec 17 04:02:44 2015 +0300
----------------------------------------------------------------------
.../dht/GridDhtPartitionsReservation.java | 3 +-
.../processors/query/h2/IgniteH2Indexing.java | 25 +++++++--
.../query/h2/opt/GridH2QueryContext.java | 48 ++++++++++++++--
.../query/h2/opt/GridH2TreeIndex.java | 27 ++++++---
.../query/h2/twostep/GridMapQueryExecutor.java | 58 +++++++++++---------
.../h2/twostep/GridReduceQueryExecutor.java | 5 ++
...QueryNodeRestartDistributedJoinSelfTest.java | 34 ++++++------
7 files changed, 137 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
index d12247e..2f51c5a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsReservation.java
@@ -204,7 +204,8 @@ public class GridDhtPartitionsReservation implements GridReservable {
if (reservations.compareAndSet(r, r - 1)) {
// If it was the last reservation and topology version changed -> attempt to evict partitions.
- if (r == 1 && !topVer.equals(cctx.topology().topologyVersion()))
+ if (r == 1 && !cctx.kernalContext().isStopping() &&
+ !topVer.equals(cctx.topology().topologyVersion()))
tryEvict(parts.get());
return;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
index 7b696ff..b16b48e 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/IgniteH2Indexing.java
@@ -208,9 +208,12 @@ public class IgniteH2Indexing implements GridQueryIndexing {
* Command in H2 prepared statement.
*/
static {
- try {
- System.setProperty("h2.objectCache", "false");
+ // Initialize system properties for H2.
+ System.setProperty("h2.objectCache", "false");
+ System.setProperty("h2.serializeJavaObject", "false");
+ System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
+ try {
COMMAND_FIELD = JdbcPreparedStatement.class.getDeclaredField("command");
COMMAND_FIELD.setAccessible(true);
@@ -246,6 +249,9 @@ public class IgniteH2Indexing implements GridQueryIndexing {
private GridReduceQueryExecutor rdcQryExec;
/** */
+ private GridSpinBusyLock busyLock;
+
+ /** */
private final ThreadLocal<ConnectionWrapper> connCache = new ThreadLocal<ConnectionWrapper>() {
@Nullable @Override public ConnectionWrapper get() {
ConnectionWrapper c = super.get();
@@ -1438,6 +1444,13 @@ public class IgniteH2Indexing implements GridQueryIndexing {
}
/**
+ * @return Busy lock.
+ */
+ public GridSpinBusyLock busyLock() {
+ return busyLock;
+ }
+
+ /**
* @return Map query executor.
*/
public GridMapQueryExecutor mapQueryExecutor() {
@@ -1457,8 +1470,7 @@ public class IgniteH2Indexing implements GridQueryIndexing {
if (log.isDebugEnabled())
log.debug("Starting cache query index...");
- System.setProperty("h2.serializeJavaObject", "false");
- System.setProperty("h2.objectCacheMaxPerElementSize", "0"); // Avoid ValueJavaObject caching.
+ this.busyLock = busyLock;
if (SysProperties.serializeJavaObject) {
U.warn(log, "Serialization of Java objects in H2 was enabled.");
@@ -1562,7 +1574,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
catch (IgniteCheckedException e) {
ok = false;
- U.warn(log, e.getMessage());
+ U.warn(log, "Failed to send message [node=" + node + ", msg=" + msg +
+ ", errMsg=" + e.getMessage() + "]");
}
}
@@ -1649,6 +1662,8 @@ public class IgniteH2Indexing implements GridQueryIndexing {
U.error(log, "Failed to shutdown database.", e);
}
+ GridH2QueryContext.clearLocalNodeStop(nodeId);
+
if (log.isDebugEnabled())
log.debug("Cache query index stopped.");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
index 2c74f6d..d39d7ab 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2QueryContext.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.processors.query.h2.opt;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
@@ -31,7 +32,6 @@ import org.apache.ignite.spi.indexing.IndexingQueryFilter;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
-import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.LOCAL;
import static org.apache.ignite.internal.processors.query.h2.opt.GridH2QueryType.MAP;
/**
@@ -51,6 +51,9 @@ public class GridH2QueryContext {
@GridToStringInclude
private Map<Long, Object> snapshots;
+ /** */
+ private List<GridReservable> reservations;
+
/** Range streams for indexes. */
private Map<Integer, Object> streams;
@@ -144,6 +147,16 @@ public class GridH2QueryContext {
}
/**
+ * @param reservations Reserved partitions or group reservations.
+ * @return {@code this}.
+ */
+ public GridH2QueryContext reservations(List<GridReservable> reservations) {
+ this.reservations = reservations;
+
+ return this;
+ }
+
+ /**
* @param topVer Topology version.
* @return {@code this}.
*/
@@ -343,7 +356,7 @@ public class GridH2QueryContext {
qctx.remove();
- if (!onlyThreadLoc && x.key.type != LOCAL)
+ if (!onlyThreadLoc && x.key.type == MAP)
doClear(x.key);
}
@@ -361,22 +374,47 @@ public class GridH2QueryContext {
* @param key Context key.
*/
private static void doClear(Key key) {
+ assert key.type == MAP : key.type;
+
GridH2QueryContext x = qctxs.remove(key);
- if (x != null && !F.isEmpty(x.snapshots)) {
+ if (x == null)
+ return;
+
+ assert x.key.equals(key);
+
+ if (!F.isEmpty(x.snapshots)) {
for (Object snapshot : x.snapshots.values()) {
if (snapshot instanceof GridReservable)
((GridReservable)snapshot).release();
}
}
+
+ List<GridReservable> r = x.reservations;
+
+ if (!F.isEmpty(r)) {
+ for (int i = 0; i < r.size(); i++)
+ r.get(i).release();
+ }
}
/**
+ * @param locNodeId Local node ID.
* @param nodeId Dead node ID.
*/
- public static void clearAfterDeadNode(UUID nodeId) {
+ public static void clearAfterDeadNode(UUID locNodeId, UUID nodeId) {
+ for (Key key : qctxs.keySet()) {
+ if (key.locNodeId.equals(locNodeId) && key.nodeId.equals(nodeId))
+ doClear(key);
+ }
+ }
+
+ /**
+ * @param locNodeId Local node ID.
+ */
+ public static void clearLocalNodeStop(UUID locNodeId) {
for (Key key : qctxs.keySet()) {
- if (key.nodeId.equals(nodeId))
+ if (key.locNodeId.equals(locNodeId))
doClear(key);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
index bffe4d0..5641222 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/opt/GridH2TreeIndex.java
@@ -51,6 +51,7 @@ import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2RowRange
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessage;
import org.apache.ignite.internal.processors.query.h2.twostep.msg.GridH2ValueMessageFactory;
import org.apache.ignite.internal.util.GridEmptyIterator;
+import org.apache.ignite.internal.util.GridSpinBusyLock;
import org.apache.ignite.internal.util.offheap.unsafe.GridOffHeapSnapTreeMap;
import org.apache.ignite.internal.util.offheap.unsafe.GridUnsafeGuard;
import org.apache.ignite.internal.util.snaptree.SnapTreeMap;
@@ -208,7 +209,17 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
msgLsnr = new GridMessageListener() {
@Override public void onMessage(UUID nodeId, Object msg) {
- onMessage0(nodeId, msg);
+ GridSpinBusyLock l = desc.indexing().busyLock();
+
+ if (!l.enterBusy())
+ return;
+
+ try {
+ onMessage0(nodeId, msg);
+ }
+ finally {
+ l.leaveBusy();
+ }
}
};
@@ -1397,7 +1408,10 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
private GridH2IndexRangeResponse awaitForResponse() {
assert remainingRanges > 0;
- for (int attempt = 0; attempt < 50; attempt++) {
+ for (int attempt = 0;; attempt++) {
+ if (kernalContext().isStopping())
+ throw new GridH2RetryException("Stopping node.");
+
GridH2IndexRangeResponse res;
try {
@@ -1434,7 +1448,7 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
throw new GridH2RetryException("Failure on remote node.");
try {
- U.sleep(10 * attempt);
+ U.sleep(20 * attempt);
}
catch (IgniteInterruptedCheckedException e) {
throw new IgniteInterruptedException(e.getMessage());
@@ -1442,22 +1456,19 @@ public class GridH2TreeIndex extends GridH2IndexBase implements Comparator<GridS
send(singletonList(node), req);
- break;
+ continue;
case GridH2IndexRangeResponse.STATUS_ERROR:
throw new CacheException(res.error());
default:
- assert false;
+ throw new IllegalStateException();
}
}
if (!kernalContext().discovery().alive(node))
throw new GridH2RetryException("Node left: " + node);
}
-
- // Attempts exceeded.
- throw new CacheException("Failed to get index range from remote node, request timeout.");
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
index 878814a..d8b1180 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridMapQueryExecutor.java
@@ -143,11 +143,13 @@ public class GridMapQueryExecutor {
log = ctx.log(GridMapQueryExecutor.class);
+ final UUID locNodeId = ctx.localNodeId();
+
ctx.event().addLocalEventListener(new GridLocalEventListener() {
@Override public void onEvent(final Event evt) {
UUID nodeId = ((DiscoveryEvent)evt).eventNode().id();
- GridH2QueryContext.clearAfterDeadNode(nodeId);
+ GridH2QueryContext.clearAfterDeadNode(locNodeId, nodeId);
ConcurrentMap<Long,QueryResults> nodeRess = qryRess.remove(nodeId);
@@ -220,14 +222,14 @@ public class GridMapQueryExecutor {
long qryReqId = msg.queryRequestId();
+ GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
+
QueryResults results = nodeRess.remove(qryReqId);
if (results == null)
return;
results.cancel();
-
- GridH2QueryContext.clear(ctx.localNodeId(), node.id(), qryReqId, MAP);
}
/**
@@ -490,7 +492,8 @@ public class GridMapQueryExecutor {
.partitionsMap(partsMap)
.distributedJoins(distributedJoins)
.pageSize(pageSize)
- .topologyVersion(topVer);
+ .topologyVersion(topVer)
+ .reservations(reserved);
List<GridH2Table> snapshotedTbls = null;
@@ -510,6 +513,9 @@ public class GridMapQueryExecutor {
GridH2QueryContext.set(qctx);
+ // qctx is set, we have to release reservations inside of it.
+ reserved = null;
+
h2.enforceJoinOrder(true);
try {
@@ -570,14 +576,8 @@ public class GridMapQueryExecutor {
qr.cancel();
}
- if (X.hasCause(e, GridH2RetryException.class)) {
- try {
- sendRetry(node, reqId);
- }
- catch (IgniteCheckedException ex) {
- U.warn(log, "Failed to send retry message to node: " + node);
- }
- }
+ if (X.hasCause(e, GridH2RetryException.class))
+ sendRetry(node, reqId);
else {
U.error(log, "Failed to execute local query.", e);
@@ -588,9 +588,11 @@ public class GridMapQueryExecutor {
}
}
finally {
- // Release reserved partitions.
- for (GridReservable r : reserved)
- r.release();
+ if (reserved != null) {
+ // Release reserved partitions.
+ for (int i = 0; i < reserved.size(); i++)
+ reserved.get(i).release();
+ }
}
}
@@ -682,22 +684,26 @@ public class GridMapQueryExecutor {
/**
* @param node Node.
* @param reqId Request ID.
- * @throws IgniteCheckedException If failed.
*/
- private void sendRetry(ClusterNode node, long reqId) throws IgniteCheckedException {
- boolean loc = node.isLocal();
+ private void sendRetry(ClusterNode node, long reqId) {
+ try {
+ boolean loc = node.isLocal();
- GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
+ GridQueryNextPageResponse msg = new GridQueryNextPageResponse(reqId,
/*qry*/0, /*page*/0, /*allRows*/0, /*cols*/1,
- loc ? null : Collections.<Message>emptyList(),
- loc ? Collections.<Value[]>emptyList() : null);
+ loc ? null : Collections.<Message>emptyList(),
+ loc ? Collections.<Value[]>emptyList() : null);
- msg.retry(h2.readyTopologyVersion());
+ msg.retry(h2.readyTopologyVersion());
- if (loc)
- h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
- else
- ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+ if (loc)
+ h2.reduceQueryExecutor().onMessage(ctx.localNodeId(), msg);
+ else
+ ctx.io().send(node, GridTopic.TOPIC_QUERY, msg, QUERY_POOL);
+ }
+ catch (Exception e) {
+ U.warn(log, "Failed to send retry message: " + e.getMessage());
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
index 00d2f27..6004629 100644
--- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
+++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/twostep/GridReduceQueryExecutor.java
@@ -607,6 +607,9 @@ public class GridReduceQueryExecutor {
final boolean oldStyle = oldNodesInTopology();
final boolean distributedJoins = qry.distributedJoins();
+ if (oldStyle && distributedJoins)
+ throw new CacheException("Failed to enable distributed joins. Topology contains older data nodes.");
+
if (send(nodes,
oldStyle ?
new GridQueryRequest(qryReqId, r.pageSize, space, mapQrys, topVer, extraSpaces, null) :
@@ -704,6 +707,8 @@ public class GridReduceQueryExecutor {
}
if (retry) {
+ send(nodes, new GridQueryCancelRequest(qryReqId), null);
+
if (Thread.currentThread().isInterrupted())
throw new IgniteInterruptedCheckedException("Query was interrupted.");
http://git-wip-us.apache.org/repos/asf/ignite/blob/b1a38a9f/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
index 15f390d..de00a2b 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/IgniteCacheQueryNodeRestartDistributedJoinSelfTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.GridRandom;
import org.apache.ignite.internal.util.typedef.CAX;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -51,16 +52,16 @@ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
*/
public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridCommonAbstractTest {
/** */
- private static final String QRY_0 = "select co.id, count(*) cnt\n" +
+ private static final String QRY_0 = "select co._key, count(*) cnt\n" +
"from \"pe\".Person pe, \"pr\".Product pr, \"co\".Company co, \"pu\".Purchase pu\n" +
- "where pe.id = pu.personId and pu.productId = pr.id and pr.companyId = co.id \n" +
- "group by co.id order by cnt desc, co.id";
+ "where pe._key = pu.personId and pu.productId = pr._key and pr.companyId = co._key \n" +
+ "group by co._key order by cnt desc, co._key";
/** */
- private static final String QRY_1 = "select pr.id, co.id\n" +
+ private static final String QRY_1 = "select pr._key, co._key\n" +
"from \"pr\".Product pr, \"co\".Company co\n" +
- "where pr.companyId = co.id\n" +
- "order by co.id, pr.id ";
+ "where pr.companyId = co._key\n" +
+ "order by co._key, pr._key ";
/** */
private static final int GRID_CNT = 6;
@@ -182,8 +183,8 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
int duration = 90 * 1000;
int qryThreadNum = 4;
int restartThreadsNum = 2; // 4 + 2 = 6 nodes
- final int nodeLifeTime = 2 * 1000;
- final int logFreq = 10;
+ final int nodeLifeTime = 4000;
+ final int logFreq = 1;
startGridsMultiThreaded(GRID_CNT);
@@ -191,6 +192,9 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
fillCaches();
+ X.println("Plan: " + grid(0).cache("pu").query(new SqlFieldsQuery("explain " + QRY_0)
+ .setDistributedJoins(true)).getAll());
+
final List<List<?>> pRes = grid(0).cache("pu").query(new SqlFieldsQuery(QRY_0)
.setDistributedJoins(true)).getAll();
@@ -206,7 +210,6 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
assertFalse(rRes.isEmpty());
final AtomicInteger qryCnt = new AtomicInteger();
-
final AtomicBoolean qrysDone = new AtomicBoolean();
IgniteInternalFuture<?> fut1 = multithreadedAsync(new CAX() {
@@ -221,15 +224,14 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
}
while (!locks.compareAndSet(g, 0, 1));
- if (rnd.nextBoolean()) { // Partitioned query.
+ if (rnd.nextBoolean()) {
IgniteCache<?,?> cache = grid(g).cache("pu");
SqlFieldsQuery qry = new SqlFieldsQuery(QRY_0).setDistributedJoins(true);
boolean smallPageSize = rnd.nextBoolean();
- if (smallPageSize)
- qry.setPageSize(3);
+ qry.setPageSize(smallPageSize ? 30 : 1000);
try {
assertEquals(pRes, cache.query(qry).getAll());
@@ -321,16 +323,12 @@ public class IgniteCacheQueryNodeRestartDistributedJoinSelfTest extends GridComm
info("Stopping..");
restartsDone.set(true);
-
- fut2.get();
-
- info("Restarts stopped.");
-
qrysDone.set(true);
+ fut2.get();
fut1.get();
- info("Queries stopped.");
+ info("Stopped.");
}
/** {@inheritDoc} */