You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/04/24 08:22:10 UTC
[52/65] [abbrv] ignite git commit: Added test.
Added test.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a900dc68
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a900dc68
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a900dc68
Branch: refs/heads/ignite-5024
Commit: a900dc685a24b21ac155796ecb027c60608b88f5
Parents: ca8ad03
Author: sboikov <sb...@gridgain.com>
Authored: Fri Apr 21 17:30:23 2017 +0300
Committer: sboikov <sb...@gridgain.com>
Committed: Fri Apr 21 17:30:23 2017 +0300
----------------------------------------------------------------------
.../internal/TestRecordingCommunicationSpi.java | 29 ++-
.../cache/IgniteOnePhaseCommitInvokeTest.java | 10 +-
.../CacheLateAffinityAssignmentTest.java | 31 ++-
...heClientMultiNodeUpdateTopologyLockTest.java | 193 +++++++++++++++++++
.../IgniteCacheReadFromBackupTest.java | 15 +-
.../IgniteTxCachePrimarySyncTest.java | 17 +-
.../dht/IgniteCacheTxRecoveryRollbackTest.java | 17 +-
.../atomic/IgniteCacheAtomicProtocolTest.java | 183 ++----------------
.../ignite/testframework/GridTestNode.java | 7 +
.../junits/common/GridCommonAbstractTest.java | 76 +++++++-
10 files changed, 353 insertions(+), 225 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index aa0cc09..98d2553 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -32,7 +32,6 @@ import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteInClosure;
-import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.IgniteSpiException;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
@@ -59,7 +58,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
private Map<Class<?>, Set<String>> blockCls = new HashMap<>();
/** */
- private IgnitePredicate<GridIoMessage> blockP;
+ private IgniteBiPredicate<ClusterNode, Message> blockP;
/**
* @param node Node.
@@ -75,16 +74,18 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
if (msg instanceof GridIoMessage) {
GridIoMessage ioMsg = (GridIoMessage)msg;
- Object msg0 = ioMsg.message();
+ Message msg0 = ioMsg.message();
synchronized (this) {
- if ((recordClasses != null && recordClasses.contains(msg0.getClass())) ||
- (recordP != null && recordP.apply(node, msg)))
+ boolean record = (recordClasses != null && recordClasses.contains(msg0.getClass())) ||
+ (recordP != null && recordP.apply(node, msg0));
+
+ if (record)
recordedMsgs.add(msg0);
boolean block = false;
- if (blockP != null && blockP.apply(ioMsg))
+ if (blockP != null && blockP.apply(node, msg0))
block = true;
else {
Set<String> blockNodes = blockCls.get(msg0.getClass());
@@ -106,6 +107,8 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
return;
}
+ else if (record)
+ notifyAll();
}
}
@@ -166,7 +169,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
* @param nodeName Node name.
* @throws InterruptedException If interrupted.
*/
- public void waitForMessage(Class<?> cls, String nodeName) throws InterruptedException {
+ public void waitForBlocked(Class<?> cls, String nodeName) throws InterruptedException {
synchronized (this) {
while (!hasMessage(cls, nodeName))
wait();
@@ -174,6 +177,16 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
}
/**
+ * @throws InterruptedException If interrupted.
+ */
+ public void waitForRecorded() throws InterruptedException {
+ synchronized (this) {
+ while (recordedMsgs.isEmpty())
+ wait();
+ }
+ }
+
+ /**
* @param cls Message class.
* @param nodeName Node name.
* @return {@code True} if has blocked message.
@@ -191,7 +204,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi {
/**
* @param blockP Message block predicate.
*/
- public void blockMessages(IgnitePredicate<GridIoMessage> blockP) {
+ public void blockMessages(IgniteBiPredicate<ClusterNode, Message> blockP) {
synchronized (this) {
this.blockP = blockP;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
index 601c067..a5cb3f2 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteOnePhaseCommitInvokeTest.java
@@ -21,17 +21,17 @@ import java.util.concurrent.Callable;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -127,10 +127,8 @@ public class IgniteOnePhaseCommitInvokeTest extends GridCommonAbstractTest {
final Ignite clientNode = startGrid(1);
- TestRecordingCommunicationSpi.spi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage msg0) {
- Message msg = msg0.message();
-
+ TestRecordingCommunicationSpi.spi(srv0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
return msg instanceof GridDhtPartitionSupplyMessage &&
((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(CACHE_NAME);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index a74117c..c68c8d0 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -75,6 +75,7 @@ import org.apache.ignite.internal.util.typedef.PA;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
@@ -1082,12 +1083,10 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite(i).configuration().getCommunicationSpi();
- spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage msg) {
- Message msg0 = msg.message();
-
- return msg0.getClass().equals(GridDhtPartitionsSingleMessage.class) ||
- msg0.getClass().equals(GridDhtPartitionsFullMessage.class);
+ spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return msg.getClass().equals(GridDhtPartitionsSingleMessage.class) ||
+ msg.getClass().equals(GridDhtPartitionsFullMessage.class);
}
});
}
@@ -1710,14 +1709,12 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
@Override public TestRecordingCommunicationSpi apply(String s) {
TestRecordingCommunicationSpi spi = new TestRecordingCommunicationSpi();
- spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage msg) {
- Message msg0 = msg.message();
-
- if (msg0 instanceof GridDhtForceKeysRequest || msg0 instanceof GridDhtForceKeysResponse) {
+ spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (msg instanceof GridDhtForceKeysRequest || msg instanceof GridDhtForceKeysResponse) {
fail.set(true);
- U.dumpStack(log, "Unexpected message: " + msg0);
+ U.dumpStack(log, "Unexpected message: " + msg);
}
return false;
@@ -2011,14 +2008,12 @@ public class CacheLateAffinityAssignmentTest extends GridCommonAbstractTest {
* @param cacheName Cache name.
*/
private void blockSupplySend(TestRecordingCommunicationSpi spi, final String cacheName) {
- spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage ioMsg) {
- if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class))
+ spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class))
return false;
- GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message();
-
- return msg.cacheId() == CU.cacheId(cacheName);
+ return ((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(cacheName);
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java
new file mode 100644
index 0000000..4adf5f4
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheClientMultiNodeUpdateTopologyLockTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed;
+
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsSingleMessage;
+import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC;
+import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ;
+
+/**
+ *
+ */
+public class IgniteCacheClientMultiNodeUpdateTopologyLockTest extends GridCommonAbstractTest {
+ /** */
+ private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private static final String TEST_CACHE = "testCache";
+
+ /** */
+ private boolean client;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setConsistentId(gridName);
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
+
+ cfg.setCommunicationSpi(commSpi);
+
+ cfg.setClientMode(client);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testPessimisticTx() throws Exception {
+ startGrids(3);
+
+ client = true;
+
+ Ignite clientNode = startGrid(3);
+
+ client = false;
+
+ IgniteCache<Integer, Integer> cache = clientNode.createCache(cacheConfiguration(0, FULL_SYNC));
+
+ awaitPartitionMapExchange();
+
+ Integer key1 = movingKeysAfterJoin(ignite(1), TEST_CACHE, 1).get(0);
+ Integer key2 = movingKeysAfterJoin(ignite(2), TEST_CACHE, 1).get(0);
+
+ log.info("Start tx [key1=" + key1 + ", key2=" + key2 + ']');
+
+ IgniteInternalFuture<?> startFut;
+
+ TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(ignite(2));
+
+ TestRecordingCommunicationSpi clientSpi = TestRecordingCommunicationSpi.spi(clientNode);
+
+ final UUID node0Id = ignite(0).cluster().localNode().id();
+ final UUID node2Id = ignite(2).cluster().localNode().id();
+
+ spi2.record(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (!node0Id.equals(node.id()))
+ return false;
+
+ return (msg instanceof GridDhtPartitionsSingleMessage) &&
+ ((GridDhtPartitionsSingleMessage)msg).exchangeId() != null;
+ }
+ });
+
+ clientSpi.record(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (!node2Id.equals(node.id()))
+ return false;
+
+ if (msg instanceof GridNearTxFinishRequest) {
+ log.info("Delay message [msg=" + msg + ']');
+
+ try {
+ Thread.sleep(5000);
+ }
+ catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ log.info("Send delayed message [msg=" + msg + ']');
+ }
+
+ return false;
+ }
+ });
+
+ try (Transaction tx = clientNode.transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ cache.put(key1, 1);
+
+ startFut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ startGrid(4);
+
+ return null;
+ }
+ }, "start-thread");
+
+ spi2.waitForRecorded();
+
+ U.sleep(5);
+
+ cache.put(key2, 2);
+
+ log.info("Commit tx");
+
+ tx.commit();
+ }
+
+ assertEquals((Integer)1, cache.get(key1));
+ assertEquals((Integer)2, cache.get(key2));
+
+ startFut.get();
+
+ assertEquals((Integer)1, cache.get(key1));
+ assertEquals((Integer)2, cache.get(key2));
+
+ awaitPartitionMapExchange();
+
+ assertEquals((Integer)1, cache.get(key1));
+ assertEquals((Integer)2, cache.get(key2));
+ }
+
+ /**
+ * @param backups Number of backups.
+ * @param writeSync Cache write synchronization mode.
+ * @return Cache configuration.
+ */
+ private CacheConfiguration<Integer, Integer> cacheConfiguration(int backups,
+ CacheWriteSynchronizationMode writeSync) {
+ CacheConfiguration<Integer, Integer> ccfg = new CacheConfiguration<>();
+
+ ccfg.setName(TEST_CACHE);
+ ccfg.setAtomicityMode(TRANSACTIONAL);
+ ccfg.setWriteSynchronizationMode(writeSync);
+ ccfg.setBackups(backups);
+ ccfg.setRebalanceMode(ASYNC);
+
+ return ccfg;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
index 29c2af6..42de613 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCacheReadFromBackupTest.java
@@ -33,16 +33,17 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -195,14 +196,12 @@ public class IgniteCacheReadFromBackupTest extends GridCommonAbstractTest {
TestRecordingCommunicationSpi spi =
(TestRecordingCommunicationSpi)ignite.configuration().getCommunicationSpi();
- spi.blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage ioMsg) {
- if (!ioMsg.message().getClass().equals(GridDhtPartitionSupplyMessage.class))
+ spi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ if (!msg.getClass().equals(GridDhtPartitionSupplyMessage.class))
return false;
- GridDhtPartitionSupplyMessage msg = (GridDhtPartitionSupplyMessage)ioMsg.message();
-
- return msg.cacheId() == CU.cacheId(ccfg.getName());
+ return ((GridDhtPartitionSupplyMessage)msg).cacheId() == CU.cacheId(ccfg.getName());
}
});
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
index 8a1d4a7..91e6cf6 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteTxCachePrimarySyncTest.java
@@ -29,7 +29,6 @@ import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
-import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteTransactions;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.Affinity;
@@ -41,7 +40,6 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
@@ -50,10 +48,11 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPr
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
-import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiInClosure;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -213,9 +212,9 @@ public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest {
commSpi0.record(GridDhtTxFinishRequest.class);
- commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() {
- @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException {
- return e.message() instanceof GridDhtTxFinishRequest;
+ commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return msg instanceof GridDhtTxFinishRequest;
}
});
@@ -466,9 +465,9 @@ public class IgniteTxCachePrimarySyncTest extends GridCommonAbstractTest {
commSpi0.record(GridDhtTxFinishRequest.class);
- commSpi0.blockMessages(new IgnitePredicateX<GridIoMessage>() {
- @Override public boolean applyx(GridIoMessage e) throws IgniteCheckedException {
- return e.message() instanceof GridDhtTxFinishRequest;
+ commSpi0.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return msg instanceof GridDhtTxFinishRequest;
}
});
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
index cfe9029..7e7d341 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCacheTxRecoveryRollbackTest.java
@@ -32,13 +32,13 @@ import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.cache.store.CacheStoreAdapter;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxPrepareResponse;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
@@ -46,7 +46,8 @@ import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteBiPredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -189,7 +190,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
assertFalse(fut.isDone());
- testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name());
+ testSpi(client2).waitForBlocked(GridNearTxFinishRequest.class, srv0.name());
stopGrid(client2.name());
@@ -264,9 +265,9 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
testSpi(client2).blockMessages(GridNearTxFinishRequest.class, srv0.name());
- testSpi(srv0).blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage msg) {
- return msg.message() instanceof GridDhtTxFinishRequest;
+ testSpi(srv0).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return msg instanceof GridDhtTxFinishRequest;
}
});
@@ -292,7 +293,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
assertFalse(fut.isDone());
- testSpi(client2).waitForMessage(GridNearTxFinishRequest.class, srv0.name());
+ testSpi(client2).waitForBlocked(GridNearTxFinishRequest.class, srv0.name());
stopGrid(client2.name());
stopGrid(srv0.name());
@@ -397,7 +398,7 @@ public class IgniteCacheTxRecoveryRollbackTest extends GridCommonAbstractTest {
assertFalse(fut.isDone());
- testSpi(srv0).waitForMessage(GridNearTxPrepareResponse.class, client.name());
+ testSpi(srv0).waitForBlocked(GridNearTxPrepareResponse.class, client.name());
stopGrid(client.name());
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
index 5a6b1c8..591858a 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicProtocolTest.java
@@ -17,14 +17,11 @@
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import java.util.concurrent.TimeUnit;
import javax.cache.processor.MutableEntry;
import org.apache.ignite.Ignite;
@@ -32,18 +29,11 @@ import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.affinity.Affinity;
-import org.apache.ignite.cache.affinity.AffinityFunction;
-import org.apache.ignite.cache.affinity.AffinityFunctionContext;
-import org.apache.ignite.cluster.ClusterMetrics;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
-import org.apache.ignite.internal.managers.communication.GridIoMessage;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
-import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplyMessage;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
@@ -51,15 +41,14 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteFuture;
-import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
-import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
import static org.apache.ignite.cache.CacheRebalanceMode.ASYNC;
@@ -112,12 +101,10 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
*/
private void blockRebalance() {
for (Ignite node : G.allGrids()) {
- testSpi(node).blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage msg) {
- Object msg0 = msg.message();
-
- return (msg0 instanceof GridDhtPartitionSupplyMessage)
- && ((GridCacheMessage)msg0).cacheId() == CU.cacheId(TEST_CACHE);
+ testSpi(node).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return (msg instanceof GridDhtPartitionSupplyMessage)
+ && ((GridCacheMessage)msg).cacheId() == CU.cacheId(TEST_CACHE);
}
});
}
@@ -368,7 +355,7 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
final IgniteCache<Integer, Integer> nearCache = clientNode.createCache(cacheConfiguration(1, FULL_ASYNC));
- List<Integer> keys = getKeysMoved(srv0, TEST_CACHE, putAll ? 3 : 1);
+ List<Integer> keys = movingKeysAfterJoin(srv0, TEST_CACHE, putAll ? 10 : 1);
testSpi(clientNode).blockMessages(GridNearAtomicSingleUpdateRequest.class, srv0.name());
testSpi(clientNode).blockMessages(GridNearAtomicFullUpdateRequest.class, srv0.name());
@@ -663,9 +650,9 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
- testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage msg) {
- return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+ testSpi(ignite(0)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return msg instanceof GridDhtAtomicAbstractUpdateRequest;
}
});
@@ -719,16 +706,16 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
IgniteCache<Integer, Integer> nearCache = client.cache(TEST_CACHE);
if (fail0) {
- testSpi(ignite(0)).blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage msg) {
- return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+ testSpi(ignite(0)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return msg instanceof GridDhtAtomicAbstractUpdateRequest;
}
});
}
if (fail1) {
- testSpi(ignite(2)).blockMessages(new IgnitePredicate<GridIoMessage>() {
- @Override public boolean apply(GridIoMessage msg) {
- return msg.message() instanceof GridDhtAtomicAbstractUpdateRequest;
+ testSpi(ignite(2)).blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+ @Override public boolean apply(ClusterNode node, Message msg) {
+ return msg instanceof GridDhtAtomicAbstractUpdateRequest;
}
});
}
@@ -825,68 +812,6 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
}
/**
- * Return list of keys that are primary for given node on given topology,
- * but will not be primary after add one new node.
- *
- * @param ign Ignite.
- * @param cacheName Cache name.
- * @param size Number of keys.
- * @return List of keys.
- */
- private List<Integer> getKeysMoved(Ignite ign, String cacheName, int size) {
- GridCacheContext<Object, Object> cctx = ((IgniteKernal)ign).context().cache().internalCache(cacheName).context();
-
- ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes());
-
- AffinityFunction func = cctx.config().getAffinity();
-
- AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(
- nodes,
- null,
- null,
- new AffinityTopologyVersion(1, 0),
- cctx.config().getBackups());
-
- List<List<ClusterNode>> calcAff = func.assignPartitions(ctx);
-
- String name = getTestIgniteInstanceName(nodes.size());
-
- nodes.add(new FakeNode(name));
-
- ctx = new GridAffinityFunctionContextImpl(
- nodes,
- null,
- null,
- new AffinityTopologyVersion(1, 0),
- cctx.config().getBackups());
-
- List<List<ClusterNode>> calcAff2 = func.assignPartitions(ctx);
-
- Set<Integer> movedParts = new HashSet<>();
-
- UUID localId = ign.cluster().localNode().id();
-
- for (int i = 0; i < calcAff.size(); i++) {
- if (calcAff.get(i).get(0).id().equals(localId) && !calcAff2.get(i).get(0).id().equals(localId))
- movedParts.add(i);
- }
-
- List<Integer> keys = new ArrayList<>();
-
- for (int i = 0; i < 10000; i++) {
- int keyPart = func.partition(ign.affinity(cacheName).affinityKey(i));
-
- if (movedParts.contains(keyPart))
- keys.add(i);
-
- if (keys.size() == size)
- break;
- }
-
- return keys;
- }
-
- /**
*
*/
public static class SetValueEntryProcessor implements CacheEntryProcessor<Integer, Integer, Object> {
@@ -908,80 +833,4 @@ public class IgniteCacheAtomicProtocolTest extends GridCommonAbstractTest {
return null;
}
}
-
- /**
- *
- */
- public static class FakeNode implements ClusterNode {
- /** */
- private final String consistendId;
- /** */
- private final UUID uuid;
-
- /** */
- public FakeNode(String consistendId) {
- this.consistendId = consistendId;
- uuid = UUID.randomUUID();
- }
-
- /** {@inheritDoc} */
- @Override public UUID id() {
- return uuid;
- }
-
- /** {@inheritDoc} */
- @Override public Object consistentId() {
- return consistendId;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public <T> T attribute(String name) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public ClusterMetrics metrics() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Map<String, Object> attributes() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<String> addresses() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public Collection<String> hostNames() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public long order() {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public IgniteProductVersion version() {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isLocal() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isDaemon() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public boolean isClient() {
- return false;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
index d331387..cefb774 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestNode.java
@@ -103,6 +103,13 @@ public class GridTestNode extends GridMetadataAwareAdapter implements ClusterNod
return id;
}
+ /**
+ * @param consistentId Consistent ID.
+ */
+ public void consistentId(Object consistentId) {
+ this.consistentId = consistentId;
+ }
+
/** {@inheritDoc} */
@Override public Object consistentId() {
return consistentId;
http://git-wip-us.apache.org/repos/asf/ignite/blob/a900dc68/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index c76c83e..e6b30e0 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -46,10 +46,10 @@ import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteMessaging;
import org.apache.ignite.Ignition;
-import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CachePeekMode;
import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cluster.ClusterGroup;
import org.apache.ignite.cluster.ClusterNode;
@@ -64,7 +64,9 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteKernal;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheExplicitLockSpan;
import org.apache.ignite.internal.processors.cache.GridCacheFuture;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
@@ -91,6 +93,7 @@ import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.testframework.GridTestNode;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.GridAbstractTest;
import org.apache.ignite.transactions.Transaction;
@@ -1117,6 +1120,77 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest {
}
/**
+ * Return list of keys that are primary for given node on current topology,
+ * but primary node will change after new node will be added.
+ *
+ * @param ign Ignite.
+ * @param cacheName Cache name.
+ * @param size Number of keys.
+ * @return List of keys.
+ */
+ protected final List<Integer> movingKeysAfterJoin(Ignite ign, String cacheName, int size) {
+ assertEquals("Expected consistentId is set to node name", ign.name(), ign.cluster().localNode().consistentId());
+
+ GridCacheContext<Object, Object> cctx = ((IgniteKernal)ign).context().cache().internalCache(cacheName).context();
+
+ ArrayList<ClusterNode> nodes = new ArrayList<>(ign.cluster().nodes());
+
+ AffinityFunction func = cctx.config().getAffinity();
+
+ AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(
+ nodes,
+ null,
+ null,
+ AffinityTopologyVersion.NONE,
+ cctx.config().getBackups());
+
+ List<List<ClusterNode>> calcAff = func.assignPartitions(ctx);
+
+ GridTestNode fakeNode = new GridTestNode(UUID.randomUUID(), null);
+
+ fakeNode.consistentId(getTestIgniteInstanceName(nodes.size()));
+
+ nodes.add(fakeNode);
+
+ ctx = new GridAffinityFunctionContextImpl(
+ nodes,
+ null,
+ null,
+ AffinityTopologyVersion.NONE,
+ cctx.config().getBackups());
+
+ List<List<ClusterNode>> calcAff2 = func.assignPartitions(ctx);
+
+ Set<Integer> movedParts = new HashSet<>();
+
+ UUID locId = ign.cluster().localNode().id();
+
+ for (int i = 0; i < calcAff.size(); i++) {
+ if (calcAff.get(i).get(0).id().equals(locId) && !calcAff2.get(i).get(0).id().equals(locId))
+ movedParts.add(i);
+ }
+
+ List<Integer> keys = new ArrayList<>();
+
+ Affinity<Integer> aff = ign.affinity(cacheName);
+
+ for (int i = 0; i < 10_000; i++) {
+ int keyPart = aff.partition(i);
+
+ if (movedParts.contains(keyPart)) {
+ keys.add(i);
+
+ if (keys.size() == size)
+ break;
+ }
+ }
+
+ assertEquals("Failed to find moving keys [movedPats=" + movedParts + ", keys=" + keys + ']', size, keys.size());
+
+ return keys;
+ }
+
+ /**
* @param cache Cache.
* @return Collection of keys for which given cache is primary.
* @throws IgniteCheckedException If failed.