You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/09/16 11:27:04 UTC
ignite git commit: ignite-1239 Reverted changes in localPeek;
the scan iterator now uses 'cache.peekEx' instead. Removed the
internal future listener in CacheQueryFallbackFuture, assuming
CacheQueryFallbackFuture.get is never called.
Repository: ignite
Updated Branches:
refs/heads/ignite-1239 471b75e70 -> f39577a25
ignite-1239 Reverted changes in localPeek; the scan iterator now uses 'cache.peekEx' instead. Removed the internal future listener in CacheQueryFallbackFuture, assuming CacheQueryFallbackFuture.get is never called.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f39577a2
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f39577a2
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f39577a2
Branch: refs/heads/ignite-1239
Commit: f39577a257e74d5ef6858a31ce6f75842f264598
Parents: 471b75e
Author: sboikov <sb...@gridgain.com>
Authored: Wed Sep 16 12:16:49 2015 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Wed Sep 16 12:23:48 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 45 +++--
.../query/GridCacheDistributedQueryFuture.java | 2 -
.../cache/query/GridCacheQueryAdapter.java | 56 +-----
.../cache/query/GridCacheQueryManager.java | 19 +-
...CacheScanPartitionQueryFallbackSelfTest.java | 192 +++++--------------
5 files changed, 92 insertions(+), 222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 4c24a51..1fc94ec 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -745,16 +745,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
int part = ctx.affinity().partition(cacheKey);
- boolean dhtKey = false;
+ boolean nearKey;
- if (modes.primary || modes.backup) {
+ if (!(modes.near && modes.primary && modes.backup)) {
boolean keyPrimary = ctx.affinity().primary(ctx.localNode(), part, topVer);
if (keyPrimary) {
if (!modes.primary)
return null;
- dhtKey = true;
+ nearKey = false;
}
else {
boolean keyBackup = ctx.affinity().belongs(ctx.localNode(), part, topVer);
@@ -763,22 +763,36 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!modes.backup)
return null;
- dhtKey = true;
+ nearKey = false;
+ }
+ else {
+ if (!modes.near)
+ return null;
+
+ nearKey = true;
+
+ // Swap and offheap are disabled for near cache.
+ modes.offheap = false;
+ modes.swap = false;
}
}
+ }
+ else {
+ nearKey = !ctx.affinity().belongs(ctx.localNode(), part, topVer);
- // We will always peek DHT entry if both primary and backup flags are set, regardless of
- // affinity calculation.
- // This is required because there are scenarios when neither primary nor backup node is an owner,
- // but we need to be able to peek cache value.
- dhtKey |= (modes.primary && modes.backup);
+ if (nearKey) {
+ // Swap and offheap are disabled for near cache.
+ modes.offheap = false;
+ modes.swap = false;
+ }
}
- if (!dhtKey && !ctx.isNear())
+ if (nearKey && !ctx.isNear())
return null;
if (modes.heap) {
- GridCacheEntryEx e = (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey));
+ GridCacheEntryEx e = nearKey ? peekEx(cacheKey) :
+ (ctx.isNear() ? ctx.near().dht().peekEx(cacheKey) : peekEx(cacheKey));
if (e != null) {
cacheVal = e.peek(modes.heap, modes.offheap, modes.swap, topVer, plc);
@@ -786,15 +800,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
modes.offheap = false;
modes.swap = false;
}
-
- if (cacheVal == null && modes.near && ctx.isNear()) {
- e = peekEx(cacheKey);
-
- cacheVal = e.peek(modes.heap, false, false, topVer, plc);
-
- modes.offheap = false;
- modes.swap = false;
- }
}
if (modes.offheap || modes.swap) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
index 06d0625..e745e30 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheDistributedQueryFuture.java
@@ -143,8 +143,6 @@ public class GridCacheDistributedQueryFuture<K, V, R> extends GridCacheQueryFutu
}
if (callOnPage)
- // We consider node departure as a reception of last empty
- // page from this node.
onPage(nodeId, Collections.emptyList(),
new ClusterTopologyCheckedException("Remote node has left topology: " + nodeId), true);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
index 9b1fb7e..db67bbb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryAdapter.java
@@ -50,8 +50,6 @@ import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
-import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteReducer;
import org.apache.ignite.plugin.security.SecurityPermission;
import org.jetbrains.annotations.Nullable;
@@ -599,6 +597,7 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/**
+ * @param topVer Topology version.
* @return Nodes for query execution.
*/
private Queue<ClusterNode> fallbacks(AffinityTopologyVersion topVer) {
@@ -629,53 +628,9 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
private void init() {
final ClusterNode node = nodes.poll();
- GridCacheQueryFutureAdapter<?, ?, R> fut0 = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
+ fut = (GridCacheQueryFutureAdapter<?, ?, R>)(node.isLocal() ?
qryMgr.queryLocal(bean) :
qryMgr.queryDistributed(bean, Collections.singleton(node)));
-
- fut0.listen(new IgniteInClosure<IgniteInternalFuture<Collection<R>>>() {
- @Override public void apply(IgniteInternalFuture<Collection<R>> fut) {
- try {
- onDone(fut.get());
- }
- catch (IgniteClientDisconnectedCheckedException e) {
- onDone(e);
- }
- catch (IgniteCheckedException e) {
- if (e.hasCause(GridDhtUnreservedPartitionException.class)) {
- unreservedTopVer = ((GridDhtUnreservedPartitionException)e.getCause()).topologyVersion();
-
- assert unreservedTopVer != null;
- }
-
- if (F.isEmpty(nodes)) {
- final AffinityTopologyVersion topVer = unreservedTopVer;
-
- if (topVer != null && --unreservedNodesRetryCnt > 0) {
- cctx.affinity().affinityReadyFuture(topVer).listen(
- new IgniteInClosure<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(
- IgniteInternalFuture<AffinityTopologyVersion> future) {
-
- nodes = fallbacks(topVer);
-
- // Race is impossible here because query retries are executed one by one.
- unreservedTopVer = null;
-
- init();
- }
- });
- }
- else
- onDone(e);
- }
- else
- init();
- }
- }
- });
-
- fut = fut0;
}
/** {@inheritDoc} */
@@ -689,6 +644,13 @@ public class GridCacheQueryAdapter<T> implements CacheQuery<T> {
}
/** {@inheritDoc} */
+ @Override public Collection<R> get() throws IgniteCheckedException {
+ assert false;
+
+ return super.get();
+ }
+
+ /** {@inheritDoc} */
@Override public R next() {
if (firstItemReturned)
return fut.next();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
index a7b55eb..25ace1b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/GridCacheQueryManager.java
@@ -33,8 +33,9 @@ import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheMetricsImpl;
import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.CachePeekModes;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCacheInternal;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheOffheapSwapEntry;
@@ -819,8 +820,12 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
final GridDhtCacheAdapter dht = cctx.isLocal() ? null : (cctx.isNear() ? cctx.near().dht() : cctx.dht());
+ final GridCacheAdapter cache = dht != null ? dht : cctx.cache();
+
final ExpiryPolicy plc = cctx.expiry();
+ final AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
+
final boolean backups = qry.includeBackups() || cctx.isReplicated();
final GridCloseableIteratorAdapter<IgniteBiTuple<K, V>> heapIt =
@@ -841,8 +846,6 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
else if (part < 0 || part >= cctx.affinity().partitions())
iter = F.emptyIterator();
else {
- AffinityTopologyVersion topVer = cctx.affinity().affinityTopologyVersion();
-
locPart = dht.topology().localPartition(part, topVer, false);
// double check for owning state
@@ -899,7 +902,15 @@ public abstract class GridCacheQueryManager<K, V> extends GridCacheManagerAdapte
V val;
try {
- val = prj.localPeek(key, CachePeekModes.ONHEAP_ONLY, expiryPlc);
+ GridCacheEntryEx entry = cache.peekEx(key);
+
+ CacheObject cacheVal =
+ entry != null ? entry.peek(true, false, false, topVer, expiryPlc) : null;
+
+ val = cacheVal != null ? (V)cacheVal.value(cctx.cacheObjectContext(), false) : null;
+ }
+ catch (GridCacheEntryRemovedException e) {
+ val = null;
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/ignite/blob/f39577a2/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
index c087ffc..02b213e 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/CacheScanPartitionQueryFallbackSelfTest.java
@@ -22,17 +22,14 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.cache.Cache;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
@@ -43,14 +40,11 @@ import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.processors.cache.query.CacheQueryFuture;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryRequest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -90,15 +84,9 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
/** Expected first node ID. */
private static UUID expNodeId;
- /** Expected fallback node ID. */
- private static UUID expFallbackNodeId;
-
/** Communication SPI factory. */
private CommunicationSpiFactory commSpiFactory;
- /** Latch. */
- private static CountDownLatch latch;
-
/** Test entries. */
private Map<Integer, Map<Integer, Integer>> entries = new HashMap<>();
@@ -150,7 +138,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
int part = anyLocalPartition(cache.context());
- CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+ QueryCursor<Cache.Entry<Integer, Integer>> qry =
+ cache.query(new ScanQuery<Integer, Integer>().setPartition(part));
doTestScanQuery(qry, part);
}
@@ -180,7 +169,8 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
expNodeId = tup.get2();
- CacheQuery<Map.Entry<Integer, Integer>> qry = cache.context().queries().createScanQuery(null, part, false);
+ QueryCursor<Cache.Entry<Integer, Integer>> qry =
+ cache.query(new ScanQuery<Integer, Integer>().setPartition(part));
doTestScanQuery(qry, part);
}
@@ -190,12 +180,17 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
}
/**
- * Scan should activate fallback mechanism when new nodes join topology and rebalancing happens in parallel with
- * scan query.
- *
* @throws Exception In case of error.
*/
public void testScanFallbackOnRebalancing() throws Exception {
+ scanFallbackOnRebalancing(false);
+ }
+
+ /**
+ * @param cur If {@code true} tests query cursor.
+ * @throws Exception In case of error.
+ */
+ private void scanFallbackOnRebalancing(final boolean cur) throws Exception {
cacheMode = CacheMode.PARTITIONED;
clientMode = false;
backups = 2;
@@ -244,15 +239,21 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
+ int cntr = 0;
+
while (!done.get()) {
int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
- info("Running query [node=" + nodeId + ", part=" + part + ']');
+ if (cntr++ % 100 == 0)
+ info("Running query [node=" + nodeId + ", part=" + part + ']');
- try (QueryCursor<Cache.Entry<Integer, Integer>> cur =
+ try (QueryCursor<Cache.Entry<Integer, Integer>> cur0 =
cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5))) {
- doTestScanQueryCursor(cur, part);
+ if (cur)
+ doTestScanQueryCursor(cur0, part);
+ else
+ doTestScanQuery(cur0, part);
}
}
@@ -278,7 +279,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
*
* @throws Exception In case of error.
*/
- public void testScanFallbackOnRebalancingCursor() throws Exception {
+ public void testScanFallbackOnRebalancingCursor1() throws Exception {
cacheMode = CacheMode.PARTITIONED;
clientMode = false;
backups = 1;
@@ -315,13 +316,16 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
IgniteCache<Integer, Integer> cache = grid(nodeId).cache(null);
+ int cntr = 0;
+
while (!done.get()) {
int part = ThreadLocalRandom.current().nextInt(ignite(nodeId).affinity(null).partitions());
- info("Running query [node=" + nodeId + ", part=" + part + ']');
+ if (cntr++ % 100 == 0)
+ info("Running query [node=" + nodeId + ", part=" + part + ']');
try (QueryCursor<Cache.Entry<Integer, Integer>> cur =
- cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5))) {
+ cache.query(new ScanQuery<Integer, Integer>(part).setPageSize(5))) {
doTestScanQueryCursor(cur, part);
}
@@ -340,95 +344,15 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
}
/**
- * Scan should try first remote node and fallbacks to second remote node.
- *
* @throws Exception If failed.
*/
- public void testScanFallback() throws Exception {
- cacheMode = CacheMode.PARTITIONED;
- backups = 1;
- commSpiFactory = new TestFallbackCommunicationSpiFactory();
-
- final Set<Integer> candidates = new TreeSet<>();
-
- final AtomicBoolean test = new AtomicBoolean(false);
-
- for(int j = 0; j < 2; j++) {
- clientMode = true;
-
- latch = new CountDownLatch(1);
-
- try {
- final Ignite ignite0 = startGrid(0);
-
- clientMode = false;
-
- final IgniteEx ignite1 = startGrid(1);
- final IgniteEx ignite2 = startGrid(2);
- startGrid(3);
-
- if (test.get()) {
- expNodeId = ignite1.localNode().id();
- expFallbackNodeId = ignite2.localNode().id();
- }
-
- final IgniteCacheProxy<Integer, Integer> cache = fillCache(ignite0);
-
- if (!test.get()) {
- candidates.addAll(localPartitions(ignite1));
-
- candidates.retainAll(localPartitions(ignite2));
- }
-
- Runnable run = new Runnable() {
- @Override public void run() {
- try {
- startGrid(4);
- startGrid(5);
-
- awaitPartitionMapExchange();
-
- if (!test.get()) {
- candidates.removeAll(localPartitions(ignite1));
-
- F.retain(candidates, false, localPartitions(ignite2));
- }
-
- latch.countDown();
- }
- catch (Exception e) {
- e.printStackTrace();
- }
-
- }
- };
-
- Integer part = null;
- CacheQuery<Map.Entry<Integer, Integer>> qry = null;
-
- if (test.get()) {
- part = F.first(candidates);
-
- qry = cache.context().queries().createScanQuery(null, part, false);
- }
-
- new Thread(run).start();
-
- if (test.get())
- doTestScanQuery(qry, part);
- else
- latch.await();
- }
- finally {
- test.set(true);
-
- stopAllGrids();
- }
- }
+ public void testScanFallbackOnRebalancingCursor2() throws Exception {
+ scanFallbackOnRebalancing(true);
}
/**
* @param ignite Ignite.
+ * @return Cache.
*/
protected IgniteCacheProxy<Integer, Integer> fillCache(Ignite ignite) {
IgniteCacheProxy<Integer, Integer> cache =
@@ -452,16 +376,14 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
/**
* @param qry Query.
+ * @param part Partition.
*/
- protected void doTestScanQuery(CacheQuery<Map.Entry<Integer, Integer>> qry, int part)
- throws IgniteCheckedException {
- CacheQueryFuture<Map.Entry<Integer, Integer>> fut = qry.execute();
-
- Collection<Map.Entry<Integer, Integer>> qryEntries = fut.get();
+ protected void doTestScanQuery(QueryCursor<Cache.Entry<Integer, Integer>> qry, int part) {
+ Collection<Cache.Entry<Integer, Integer>> qryEntries = qry.getAll();
Map<Integer, Integer> map = entries.get(part);
- for (Map.Entry<Integer, Integer> e : qryEntries)
+ for (Cache.Entry<Integer, Integer> e : qryEntries)
assertEquals(map.get(e.getKey()), e.getValue());
assertEquals("Invalid number of entries for partition: " + part, map.size(), qryEntries.size());
@@ -472,7 +394,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
* @param part Partition number.
*/
protected void doTestScanQueryCursor(
- QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) throws IgniteCheckedException {
+ QueryCursor<Cache.Entry<Integer, Integer>> cur, int part) {
Map<Integer, Integer> map = entries.get(part);
@@ -491,6 +413,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
/**
* @param cctx Cctx.
+ * @return Local partition.
*/
private static int anyLocalPartition(GridCacheContext<?, ?> cctx) {
return F.first(cctx.topology().localPartitions()).id();
@@ -498,6 +421,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
/**
* @param cctx Cctx.
+ * @return Remote partition.
*/
private IgniteBiTuple<Integer, UUID> remotePartition(final GridCacheContext cctx) {
ClusterNode node = F.first(cctx.kernalContext().grid().cluster().forRemotes().nodes());
@@ -513,6 +437,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
/**
* @param ignite Ignite.
+ * @return Local partitions.
*/
private Set<Integer> localPartitions(Ignite ignite) {
GridCacheContext cctx = ((IgniteCacheProxy)ignite.cache(null)).context();
@@ -536,7 +461,7 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
*/
private interface CommunicationSpiFactory {
/**
- * Creates communication SPI instance.
+ * @return Communication SPI instance.
*/
TcpCommunicationSpi create();
}
@@ -549,13 +474,13 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
@Override public TcpCommunicationSpi create() {
return new TcpCommunicationSpi() {
@Override public void sendMessage(ClusterNode node, Message msg,
- IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+ IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
Object origMsg = ((GridIoMessage)msg).message();
if (origMsg instanceof GridCacheQueryRequest)
fail(); //should use local node
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
};
}
@@ -569,44 +494,13 @@ public class CacheScanPartitionQueryFallbackSelfTest extends GridCommonAbstractT
@Override public TcpCommunicationSpi create() {
return new TcpCommunicationSpi() {
@Override public void sendMessage(ClusterNode node, Message msg,
- IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
+ IgniteInClosure<IgniteException> ackC) throws IgniteSpiException {
Object origMsg = ((GridIoMessage)msg).message();
if (origMsg instanceof GridCacheQueryRequest)
assertEquals(expNodeId, node.id());
- super.sendMessage(node, msg, ackClosure);
- }
- };
- }
- }
-
- /**
- *
- */
- private static class TestFallbackCommunicationSpiFactory implements CommunicationSpiFactory {
- /** {@inheritDoc} */
- @Override public TcpCommunicationSpi create() {
- return new TcpCommunicationSpi() {
- @Override public void sendMessage(ClusterNode node, Message msg,
- IgniteInClosure<IgniteException> ackClosure) throws IgniteSpiException {
- Object origMsg = ((GridIoMessage)msg).message();
-
- if (origMsg instanceof GridCacheQueryRequest) {
- if (latch.getCount() > 0)
- assertEquals(expNodeId, node.id());
- else
- assertEquals(expFallbackNodeId, node.id());
-
- try {
- latch.await();
- }
- catch (InterruptedException e) {
- throw new IgniteSpiException(e);
- }
- }
-
- super.sendMessage(node, msg, ackClosure);
+ super.sendMessage(node, msg, ackC);
}
};
}