You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/09/16 07:29:16 UTC
ignite git commit: IGNITE-1239 - Fixed scan query failover on changing topology.
Repository: ignite
Updated Branches:
refs/heads/ignite-1239 [created] 471b75e70
IGNITE-1239 - Fixed scan query failover on changing topology.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/471b75e7
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/471b75e7
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/471b75e7
Branch: refs/heads/ignite-1239
Commit: 471b75e70398c7db7e621e7afa91d2c62fbc258d
Parents: d928ef4
Author: Alexey Goncharuk <al...@gmail.com>
Authored: Tue Sep 15 22:29:11 2015 -0700
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Tue Sep 15 22:29:11 2015 -0700
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 45 ++++---
.../query/GridCacheDistributedQueryFuture.java | 25 +++-
.../cache/query/GridCacheLocalQueryFuture.java | 5 +
.../cache/query/GridCacheQueryAdapter.java | 116 +++++++++++++++----
.../query/GridCacheQueryFutureAdapter.java | 11 +-
.../cache/query/GridCacheQueryManager.java | 11 +-
...CacheScanPartitionQueryFallbackSelfTest.java | 42 ++++---
7 files changed, 183 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 1fc94ec..4c24a51 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -745,16 +745,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
int part = ctx.affinity().partition(cacheKey);
- boolean nearKey;
+ boolean dhtKey = false;
- if (!(modes.near && modes.primary && modes.backup)) {
+ if (modes.primary || modes.backup) {
boolean keyPrimary = ctx.affinity().primary(ctx.localNode(), part, topVer);
if (keyPrimary) {
if (!modes.primary)
return null;
- nearKey = false;
+ dhtKey = true;
}
else {
boolean keyBackup = ctx.affinity().belongs(ctx.localNode(), part, topVer);
@@ -763,36 +763,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!modes.backup)
return null;
- nearKey = false;
- }
- else {
- if (!modes.near)
- return null;
-
- nearKey = true;
-
- // Swap and offheap are disabled for near cache.
- modes.offheap = false;
- modes.swap = false;
+ dhtKey = true;
}
}
- }
- else {
- nearKey = !ctx.affinity().belongs(ctx.localNode(), part, topVer);
- if (nearKey) {
- // Swap and offheap are disabled for near cache.
- modes.offheap = false;
- modes.swap = false;
- }
+ // We will always peek DHT entry if both primary and backup flags are set, regardless of
+ // affinity calculation.
+ // This is required because there are scenarios when neither primary nor backup node is an owner,
+ // but we need to be able to peek cache value.
+ dhtKey |= (modes.primary && modes.backup);
}
- if (nearKey && !ctx.isNear())
+ if (!dhtKey && !ctx.isNear())
return null;
if (modes.heap) {
- GridCacheEntryEx e = nearKey ? peekEx(cacheKey) :
- (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey));
+ GridCacheEntryEx e = (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey));
if (e != null) {
cacheVal = e.peek(modes.heap, modes.offheap, modes.swap, topVer, plc);
@@ -800,6 +786,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
modes.offheap = false;
modes.swap = false;
}
+
+ if (cacheVal == null && modes.near && ctx.isNear()) {
+ e = peekEx(cacheKey);
+
+ cacheVal = e.peek(modes.heap, false, false, topVer, plc);
+
+ modes.offheap = false;
+ modes.swap = false;
+ }
}
if (modes.offheap || modes.swap) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 1d547c5..06d0625 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
@@ -144,7 +145,24 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
if (callOnPage)
// We consider node departure as a reception of last empty
// page from this node.
- onPage(nodeId, Collections.emptyList(), null, true);
+ onPage(nodeId, Collections.emptyList(),
+ new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId), true);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void awaitFirstPage() throws IgniteCheckedException {
+ try {
+ firstPageLatch.await();
+
+ if (isDone() && error() != null)
+ // Throw the exception if future failed.
+ get();
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+
+ throw new IgniteInterruptedCheckedException(e);
+ }
}
/** {@inheritDoc} */
@@ -229,9 +247,12 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
/** {@inheritDoc} */
@Override public boolean onDone(Collection<R> res, Throwable err) {
+ boolean done = super.onDone(res, err);
+
+ // Must release the latch after onDone() in order for a waiting thread to see an exception, if any.
firstPageLatch.countDown();
- return super.onDone(res, err);
+ return done;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
index 46af18a..248dfa8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheLocalQueryFuture.java
@@ -77,6 +77,11 @@ public class GridCacheLocalQueryFuture<K, V, R> extends GridCacheQueryFutureAdap
// No-op.
}
+ /** {@inheritDoc} */
+ @Override public void awaitFirstPage() throws IgniteCheckedException {
+ get();
+ }
+
/** */
private class LocalQueryRunnable implements GridPlainRunnable {
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 3ac5746..9b1fb7e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.query;
import java.util.Collection;
import java.util.Collections;
+import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Queue;
@@ -34,6 +35,7 @@ import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
+import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtUnreservedPartitionException;
@@ -41,6 +43,7 @@ import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -63,13 +66,6 @@ import static org.apache.ignite.internal.processors.cache.query.GridCacheQueryTy
* Query adapter.
*/
public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
- /** Is local node predicate. */
- private static final IgnitePredicate<ClusterNode> IS_LOC_NODE = new IgnitePredicate<ClusterNode>() {
- @Override public boolean apply(ClusterNode n) {
- return n.isLocal();
- }
- };
-
/** */
private final GridCacheContext<?, ?> cctx;
@@ -446,7 +442,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
cctx.checkSecurity(SecurityPermission.CACHE_READ);
- if (nodes.isEmpty())
+ if (nodes.isEmpty() && (type != SCAN || part == null))
return new GridCacheQueryErrorFuture<>(cctx.kernalContext(), new ClusterGroupEmptyCheckedException());
if (log.isDebugEnabled())
@@ -477,8 +473,8 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
if (type == SQL_FIELDS || type == SPI)
return (CacheQueryFuture<R>)(loc ? qryMgr.queryFieldsLocal(bean) :
qryMgr.queryFieldsDistributed(bean, nodes));
- else if (type == SCAN && part != null && nodes.size() > 1)
- return new CacheQueryFallbackFuture<>(nodes, part, bean, qryMgr, cctx);
+ else if (type == SCAN && part != null)
+ return new CacheQueryFallbackFuture<>(part, bean, qryMgr, cctx);
else
return (CacheQueryFuture<R>)(loc ? qryMgr.queryLocal(bean) : qryMgr.queryDistributed(bean, nodes));
}
@@ -581,37 +577,47 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** Partition. */
private final int part;
+ /** Flag indicating that a first item has been returned to a user. */
+ private boolean firstItemReturned;
+
/**
- * @param nodes Backups.
* @param part Partition.
* @param bean Bean.
* @param qryMgr Query manager.
* @param cctx Cache context.
*/
- public CacheQueryFallbackFuture(Collection<ClusterNode> nodes, int part, GridCacheQueryBean bean,
+ private CacheQueryFallbackFuture(int part, GridCacheQueryBean bean,
GridCacheQueryManager qryMgr, GridCacheContext cctx) {
- this.nodes = fallbacks(nodes);
this.bean = bean;
this.qryMgr = qryMgr;
this.cctx = cctx;
this.part = part;
+ nodes = fallbacks(cctx.discovery().topologyVersionEx());
+
init();
}
/**
- * @param nodes Nodes.
* @return Nodes for query execution.
*/
- private Queue<ClusterNode> fallbacks(Collection<ClusterNode> nodes) {
- Queue<ClusterNode> fallbacks = new LinkedList<>();
+ private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
+ Deque<ClusterNode> fallbacks = new LinkedList<>();
+ Collection<ClusterNode> owners = new HashSet<>();
- ClusterNode node = F.first(F.view(nodes, IS_LOC_NODE));
+ for (ClusterNode node : cctx.topology().owners(part, topVer)) {
+ if (node.isLocal())
+ fallbacks.addFirst(node);
+ else
+ fallbacks.add(node);
- if (node != null)
- fallbacks.add(node);
+ owners.add(node);
+ }
- fallbacks.addAll(node != null ? F.view(nodes, F.not(IS_LOC_NODE)) : nodes);
+ for (ClusterNode node : cctx.topology().moving(part)) {
+ if (!owners.contains(node))
+ fallbacks.add(node);
+ }
return fallbacks;
}
@@ -651,7 +657,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
@Override public void apply(
IgniteInternalFuture<AffinityTopologyVersion> future) {
- nodes = fallbacks(cctx.topology().owners(part, topVer));
+ nodes = fallbacks(topVer);
// Race is impossible here because query retries are executed one by one.
unreservedTopVer = null;
@@ -684,7 +690,73 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
/** {@inheritDoc} */
@Override public R next() {
- return fut.next();
+ if (firstItemReturned)
+ return fut.next();
+
+ while (true) {
+ try {
+ fut.awaitFirstPage();
+
+ firstItemReturned = true;
+
+ return fut.next();
+ }
+ catch (IgniteClientDisconnectedCheckedException e) {
+ throw U.convertException(e);
+ }
+ catch (IgniteCheckedException e) {
+ retryIfPossible(e);
+ }
+ }
+ }
+
+ /**
+ * @param e Exception for query run.
+ */
+ private void retryIfPossible(IgniteCheckedException e) {
+ try {
+ IgniteInternalFuture<?> retryFut;
+
+ if (e.hasCause(GridDhtUnreservedPartitionException.class)) {
+ AffinityTopologyVersion waitVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion();
+
+ assert waitVer != null;
+
+ retryFut = cctx.affinity().affinityReadyFuture(waitVer);
+ }
+ else if (e.hasCause(ClusterTopologyCheckedException.class)) {
+ ClusterTopologyCheckedException topEx = X.cause(e, ClusterTopologyCheckedException.class);
+
+ retryFut = topEx.retryReadyFuture();
+ }
+ else if (e.hasCause(ClusterGroupEmptyCheckedException.class)) {
+ ClusterGroupEmptyCheckedException ex = X.cause(e, ClusterGroupEmptyCheckedException.class);
+
+ retryFut = ex.retryReadyFuture();
+ }
+ else
+ throw U.convertException(e);
+
+ if (F.isEmpty(nodes)) {
+ if (--unreservedNodesRetryCnt > 0) {
+ if (retryFut != null)
+ retryFut.get();
+
+ nodes = fallbacks(unreservedTopVer == null ? cctx.discovery().topologyVersionEx() : unreservedTopVer);
+
+ unreservedTopVer = null;
+
+ init();
+ }
+ else
+ throw U.convertException(e);
+ }
+ else
+ init();
+ }
+ catch (IgniteCheckedException ex) {
+ throw U.convertException(ex);
+ }
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
index ad9ee39..2a4fbda 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryFutureAdapter.java
@@ -183,6 +183,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
}
/**
+ * Waits for the first page to be received from remote node(s), if any.
+ *
+ * @throws IgniteCheckedException If query execution failed with an error.
+ */
+ public abstract void awaitFirstPage() throws IgniteCheckedException;
+
+ /**
* Returns next page for the query.
*
* @return Next page or {@code null} if no more pages available.
@@ -380,13 +387,13 @@ public abstract class GridCacheQueryFutureAdapter<K, V, R> extends GridFutureAda
synchronized (mux) {
enqueue(Collections.emptyList());
- onPage(nodeId, true);
-
onDone(nodeId != null ?
new IgniteCheckedException("Failed to execute query on node [query=" + qry +
", nodeId=" + nodeId + "]", err) :
new IgniteCheckedException("Failed to execute query locally: " + qry, err));
+ onPage(nodeId, true);
+
mux.notifyAll();
}
else {
http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index 1d934d8..a7b55eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -1557,7 +1557,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
}
}
catch (Throwable e) {
- U.error(log, "Failed to run query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e);
+ if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
+ U.error(log, "Failed to run query [qry=" + qryInfo + ", node=" + cctx.nodeId() + "]", e);
onPageReady(loc, qryInfo, null, true, e);
@@ -1572,8 +1573,9 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
res.closeIfNotShared(recipient(qryInfo.senderId(), qryInfo.requestId()));
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
- cctx.nodeId() + "]", e);
+ if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
+ U.error(log, "Failed to close local iterator [qry=" + qryInfo + ", node=" +
+ cctx.nodeId() + "]", e);
}
}
}
@@ -1694,7 +1696,8 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
fut.get().closeIfNotShared(recipient(sndId, reqId));
}
catch (IgniteCheckedException e) {
- U.error(log, "Failed to close iterator.", e);
+ if (!X.hasCause(e, GridDhtUnreservedPartitionException.class))
+ U.error(log, "Failed to close iterator.", e);
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/471b75e7/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index 1ef470a..c087ffc 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.cache.query.ScanQuery;
@@ -44,14 +45,12 @@ import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cluster.ClusterGroupEmptyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryAdapter;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -103,6 +102,9 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
/** Test entries. */
private Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
+ /** */
+ private boolean syncRebalance;
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(gridName);
@@ -120,6 +122,10 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
ccfg.setCacheMode(cacheMode);
ccfg.setAtomicityMode(CacheAtomicityMode.ATOMIC);
ccfg.setBackups(backups);
+
+ if (syncRebalance)
+ ccfg.setRebalanceMode(CacheRebalanceMode.SYNC);
+
ccfg.setNearConfiguration(null);
cfg.setCacheConfiguration(ccfg);
@@ -192,8 +198,9 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
public void testScanFallbackOnRebalancing() throws Exception {
cacheMode = CacheMode.PARTITIONED;
clientMode = false;
- backups = 1;
+ backups = 2;
commSpiFactory = new TestFallbackOnRebalancingCommunicationSpiFactory();
+ syncRebalance = true;
try {
Ignite ignite = startGrids(GRID_CNT);
@@ -214,6 +221,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
Thread.sleep(3000);
+ info("Will stop grid: " + getTestGridName(id));
+
stopGrid(id);
if (done.get())
@@ -224,7 +233,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
return null;
}
- }, GRID_CNT);
+ }, 2);
final AtomicInteger nodeIdx = new AtomicInteger();
@@ -233,18 +242,18 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
@Override public Object call() throws Exception {
int nodeId = nodeIdx.getAndIncrement();
- IgniteCacheProxy<Integer, Integer> cache = (IgniteCacheProxy<Integer, Integer>)
- grid(nodeId).<Integer, Integer>cache(null);
+ IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
while (!done.get()) {
- IgniteBiTuple<Integer, UUID> tup = remotePartition(cache.context());
+ int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
- int part = tup.get1();
+ info("Running query [node=" + nodeId + ", part=" + part + ']');
- CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(
- null, part, false);
+ try (QueryCursor<Cache.Entry<Integer, Integer>> cur =
+ cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5))) {
- doTestScanQuery(qry, part);
+ doTestScanQueryCursor(cur, part);
+ }
}
return null;
@@ -270,8 +279,6 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
* @throws Exception In case of error.
*/
public void testScanFallbackOnRebalancingCursor() throws Exception {
- fail("https://issues.apache.org/jira/browse/IGNITE-1239");
-
cacheMode = CacheMode.PARTITIONED;
clientMode = false;
backups = 1;
@@ -311,12 +318,13 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
while (!done.get()) {
int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
- QueryCursor<Cache.Entry<Integer, Integer>> cur =
- cache.query(new ScanQuery<Integer, Integer>(part));
+ info("Running query [node=" + nodeId + ", part=" + part + ']');
- U.debug(log, "Running query [node=" + nodeId + ", part=" + part + ']');
+ try (QueryCursor<Cache.Entry<Integer, Integer>> cur =
+ cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5))) {
- doTestScanQueryCursor(cur, part);
+ doTestScanQueryCursor(cur, part);
+ }
}
return null;