You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dg...@apache.org on 2018/11/20 12:15:26 UTC
ignite git commit: IGNITE-10341 Added loss policy tests with
persistence - Fixes #5444.
Repository: ignite
Updated Branches:
refs/heads/master a9b5c8c8a -> eb8888561
IGNITE-10341 Added loss policy tests with persistence - Fixes #5444.
Signed-off-by: Dmitriy Govorukhin <dm...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/eb888856
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/eb888856
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/eb888856
Branch: refs/heads/master
Commit: eb8888561561f92343e0620082b81c757153b4be
Parents: a9b5c8c
Author: Dmitriy Govorukhin <dm...@gmail.com>
Authored: Tue Nov 20 15:15:16 2018 +0300
Committer: Dmitriy Govorukhin <dm...@gmail.com>
Committed: Tue Nov 20 15:15:16 2018 +0300
----------------------------------------------------------------------
.../IgniteCachePartitionLossPolicySelfTest.java | 592 +++++++++++++------
1 file changed, 421 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/eb888856/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
index f02563d..cfe578d 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/IgniteCachePartitionLossPolicySelfTest.java
@@ -19,10 +19,11 @@ package org.apache.ignite.internal.processors.cache.distributed;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
@@ -36,6 +37,8 @@ import org.apache.ignite.cache.affinity.Affinity;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cache.query.ScanQuery;
import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.CacheRebalancingEvent;
import org.apache.ignite.events.Event;
@@ -43,22 +46,25 @@ import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestDelayingCommunicationSpi;
import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsFullMessage;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P1;
+import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
-import static org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
/**
*
@@ -77,12 +83,14 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
private int backups;
/** */
- private final AtomicBoolean delayPartExchange = new AtomicBoolean();
+ private final AtomicBoolean delayPartExchange = new AtomicBoolean(false);
/** */
private final TopologyChanger killSingleNode = new TopologyChanger(
- false, asList(3), asList(0, 1, 2, 4), 0
- );
+ false, singletonList(3), asList(0, 1, 2, 4), 0);
+
+ /** */
+ private boolean isPersistenceEnabled;
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
@@ -101,9 +109,16 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
cfg.setClientMode(client);
+ cfg.setCacheConfiguration(cacheConfiguration());
+
cfg.setConsistentId(gridName);
- cfg.setCacheConfiguration(cacheConfiguration());
+ cfg.setDataStorageConfiguration(
+ new DataStorageConfiguration()
+ .setDefaultDataRegionConfiguration(
+ new DataRegionConfiguration()
+ .setPersistenceEnabled(isPersistenceEnabled)
+ ));
return cfg;
}
@@ -124,19 +139,25 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
}
/** {@inheritDoc} */
- @Override protected void afterTest() throws Exception {
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- cleanPersistenceDir();
-
delayPartExchange.set(false);
+ partLossPlc = PartitionLossPolicy.IGNORE;
+
backups = 0;
+
+ isPersistenceEnabled = false;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.afterTest();
}
/**
@@ -151,6 +172,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
+ public void testReadOnlySafeWithPersistence() throws Exception {
+ partLossPlc = PartitionLossPolicy.READ_ONLY_SAFE;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(false, true, killSingleNode);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
public void testReadOnlyAll() throws Exception {
partLossPlc = PartitionLossPolicy.READ_ONLY_ALL;
@@ -160,6 +192,19 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
+ public void testReadOnlyAllWithPersistence() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-10041");
+
+ partLossPlc = PartitionLossPolicy.READ_ONLY_ALL;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(false, false, killSingleNode);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
public void testReadWriteSafe() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
@@ -169,6 +214,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
+ public void testReadWriteSafeWithPersistence() throws Exception {
+ partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(true, true, killSingleNode);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
public void testReadWriteAll() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
@@ -178,6 +234,19 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
+ public void testReadWriteAllWithPersistence() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-10041");
+
+ partLossPlc = PartitionLossPolicy.READ_WRITE_ALL;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(true, false, killSingleNode);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
public void testReadWriteSafeAfterKillTwoNodes() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
@@ -187,6 +256,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
+ public void testReadWriteSafeAfterKillTwoNodesWithPersistence() throws Exception {
+ partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 0));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
public void testReadWriteSafeAfterKillTwoNodesWithDelay() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
@@ -196,6 +276,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
+ public void testReadWriteSafeAfterKillTwoNodesWithDelayWithPersistence() throws Exception {
+ partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(true, true, new TopologyChanger(false, asList(3, 2), asList(0, 1, 4), 20));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
public void testReadWriteSafeWithBackupsAfterKillThreeNodes() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
@@ -207,6 +298,21 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
+ public void testReadWriteSafeWithBackupsAfterKillThreeNodesWithPersistence() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-10043");
+
+ partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+ backups = 1;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2, 1), asList(0, 4), 0));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
public void testReadWriteSafeAfterKillCrd() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
@@ -216,6 +322,17 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
+ public void testReadWriteSafeAfterKillCrdWithPersistence() throws Exception {
+ partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
public void testReadWriteSafeWithBackups() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
@@ -227,6 +344,19 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
/**
* @throws Exception if failed.
*/
+ public void testReadWriteSafeWithBackupsWithPersistence() throws Exception {
+ partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+ backups = 1;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(true, true, new TopologyChanger(true, asList(3, 2), asList(0, 1, 4), 0));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
public void testReadWriteSafeWithBackupsAfterKillCrd() throws Exception {
partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
@@ -236,12 +366,81 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
}
/**
- * @param topChanger topology changer.
* @throws Exception if failed.
*/
- public void testIgnore(TopologyChanger topChanger) throws Exception {
+ public void testReadWriteSafeWithBackupsAfterKillCrdWithPersistence() throws Exception {
+ partLossPlc = PartitionLossPolicy.READ_WRITE_SAFE;
+
+ backups = 1;
+
+ isPersistenceEnabled = true;
+
+ checkLostPartition(true, true, new TopologyChanger(true, asList(3, 0), asList(1, 2, 4), 0));
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testIgnore() throws Exception {
fail("https://issues.apache.org/jira/browse/IGNITE-5078");
+ partLossPlc = PartitionLossPolicy.IGNORE;
+
+ checkIgnore(killSingleNode);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testIgnoreWithPersistence() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-5078");
+
+ fail("https://issues.apache.org/jira/browse/IGNITE-10041");
+
+ partLossPlc = PartitionLossPolicy.IGNORE;
+
+ isPersistenceEnabled = true;
+
+ checkIgnore(killSingleNode);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testIgnoreKillThreeNodes() throws Exception {
+ partLossPlc = PartitionLossPolicy.IGNORE;
+
+ // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078.
+ // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
+ // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
+ TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, asList(1, 2, 3), singletonList(0), 0);
+
+ checkIgnore(onlyCrdIsAlive);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ public void testIgnoreKillThreeNodesWithPersistence() throws Exception {
+ fail("https://issues.apache.org/jira/browse/IGNITE-10041");
+
+ partLossPlc = PartitionLossPolicy.IGNORE;
+
+ isPersistenceEnabled = true;
+
+ // TODO aliveNodes should include node 4, but it fails due to https://issues.apache.org/jira/browse/IGNITE-5078.
+ // TODO need to add 4 to the aliveNodes after IGNITE-5078 is fixed.
+ // TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, Arrays.asList(1, 2, 3), Arrays.asList(0, 4), 0);
+ TopologyChanger onlyCrdIsAlive = new TopologyChanger(false, asList(1, 2, 3), singletonList(0), 0);
+
+ checkIgnore(onlyCrdIsAlive);
+ }
+
+ /**
+ * @param topChanger topology changer.
+ * @throws Exception if failed.
+ */
+ private void checkIgnore(TopologyChanger topChanger) throws Exception {
topChanger.changeTopology();
for (Ignite ig : G.allGrids()) {
@@ -270,42 +469,80 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
private void checkLostPartition(boolean canWrite, boolean safe, TopologyChanger topChanger) throws Exception {
assert partLossPlc != null;
- int part = topChanger.changeTopology().get(0);
+ List<Integer> lostParts = topChanger.changeTopology();
+
+ // Wait for all grids (servers and client) have same topology version
+ // to make sure that all nodes received map with lost partition.
+ boolean success = GridTestUtils.waitForCondition(() -> {
+ AffinityTopologyVersion last = null;
+ for (Ignite ig : G.allGrids()) {
+ AffinityTopologyVersion ver = ((IgniteEx)ig).context().cache().context().exchange().readyAffinityVersion();
+
+ if (last != null && !last.equals(ver))
+ return false;
+
+ last = ver;
+ }
+
+ return true;
+ }, 10000);
+
+ assertTrue("Failed to wait for new topology", success);
for (Ignite ig : G.allGrids()) {
info("Checking node: " + ig.cluster().localNode().id());
IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
- verifyCacheOps(canWrite, safe, part, ig);
+ verifyLostPartitions(ig, lostParts);
- // Check we can read and write to lost partition in recovery mode.
- IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover();
+ verifyCacheOps(canWrite, safe, ig);
- for (int lostPart : recoverCache.lostPartitions()) {
- recoverCache.get(lostPart);
- recoverCache.put(lostPart, lostPart);
- }
+ validateQuery(safe, ig);
- // Check that writing in recover mode does not clear partition state.
- verifyCacheOps(canWrite, safe, part, ig);
+ // TODO withPartitionRecover doesn't work with BLT - https://issues.apache.org/jira/browse/IGNITE-10041.
+ if (!isPersistenceEnabled) {
+ // Check we can read and write to lost partition in recovery mode.
+ IgniteCache<Integer, Integer> recoverCache = cache.withPartitionRecover();
- // Validate queries.
- validateQuery(safe, ig);
+ for (int lostPart : recoverCache.lostPartitions()) {
+ recoverCache.get(lostPart);
+ recoverCache.put(lostPart, lostPart);
+ }
+
+ // Check that writing in recover mode does not clear partition state.
+ verifyLostPartitions(ig, lostParts);
+
+ verifyCacheOps(canWrite, safe, ig);
+
+ validateQuery(safe, ig);
+ }
}
- checkNewNode(true, canWrite, safe, part);
- checkNewNode(false, canWrite, safe, part);
+ checkNewNode(true, canWrite, safe);
+ checkNewNode(false, canWrite, safe);
+
+ // Bring all nodes back.
+ for (int i : topChanger.killNodes) {
+ IgniteEx grd = startGrid(i);
- // Check that partition state does not change after we start a new node.
- IgniteEx grd = startGrid(3);
+ info("Newly started node: " + grd.cluster().localNode().id());
- info("Newly started node: " + grd.cluster().localNode().id());
+ // Check that partition state does not change after we start each node.
+ // TODO With persistence enabled LOST partitions become OWNING after a node joins back - https://issues.apache.org/jira/browse/IGNITE-10044.
+ if (!isPersistenceEnabled) {
+ for (Ignite ig : G.allGrids()) {
+ verifyCacheOps(canWrite, safe, ig);
- for (Ignite ig : G.allGrids())
- verifyCacheOps(canWrite, safe, part, ig);
+ // TODO Query effectively waits for rebalance due to https://issues.apache.org/jira/browse/IGNITE-10057
+ // TODO and after resetLostPartition there is another OWNING copy in the cluster due to https://issues.apache.org/jira/browse/IGNITE-10058.
+ // TODO Uncomment after https://issues.apache.org/jira/browse/IGNITE-10058 is fixed.
+// validateQuery(safe, ig);
+ }
+ }
+ }
- ignite(4).resetLostPartitions(Collections.singletonList(DEFAULT_CACHE_NAME));
+ ignite(4).resetLostPartitions(singletonList(DEFAULT_CACHE_NAME));
awaitPartitionMapExchange(true, true, null);
@@ -321,6 +558,16 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
cache.put(i, i);
}
+
+ for (int i = 0; i < parts; i++) {
+ checkQueryPasses(ig, false, i);
+
+ if (shouldExecuteLocalQuery(ig, i))
+ checkQueryPasses(ig, true, i);
+
+ }
+
+ checkQueryPasses(ig, false);
}
}
@@ -328,25 +575,23 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
* @param client Client flag.
* @param canWrite Can write flag.
* @param safe Safe flag.
- * @param part List of lost partitions.
* @throws Exception If failed to start a new node.
*/
private void checkNewNode(
boolean client,
boolean canWrite,
- boolean safe,
- int part
+ boolean safe
) throws Exception {
this.client = client;
try {
- IgniteEx cl = startGrid("newNode");
+ IgniteEx cl = (IgniteEx)startGrid("newNode");
- CacheGroupContext grpCtx = cl.context().cache().cacheGroup(cacheId(DEFAULT_CACHE_NAME));
+ CacheGroupContext grpCtx = cl.context().cache().cacheGroup(CU.cacheId(DEFAULT_CACHE_NAME));
assertTrue(grpCtx.needsRecovery());
- verifyCacheOps(canWrite, safe, part, cl);
+ verifyCacheOps(canWrite, safe, cl);
validateQuery(safe, cl);
}
@@ -358,20 +603,26 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
}
/**
- *
+ * @param node Node.
+ * @param lostParts Lost partition IDs.
+ */
+ private void verifyLostPartitions(Ignite node, List<Integer> lostParts) {
+ IgniteCache<Integer, Integer> cache = node.cache(DEFAULT_CACHE_NAME);
+
+ Set<Integer> actualSortedLostParts = new TreeSet<>(cache.lostPartitions());
+ Set<Integer> expSortedLostParts = new TreeSet<>(lostParts);
+
+ assertEqualsCollections(expSortedLostParts, actualSortedLostParts);
+ }
+
+ /**
* @param canWrite {@code True} if writes are allowed.
* @param safe {@code True} if lost partition should trigger exception.
- * @param part Lost partition ID.
* @param ig Ignite instance.
*/
- private void verifyCacheOps(boolean canWrite, boolean safe, int part, Ignite ig) {
+ private void verifyCacheOps(boolean canWrite, boolean safe, Ignite ig) {
IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
- Collection<Integer> lost = cache.lostPartitions();
-
- assertTrue("Failed to find expected lost partition [exp=" + part + ", lost=" + lost + ']',
- lost.contains(part));
-
int parts = ig.affinity(DEFAULT_CACHE_NAME).partitions();
// Check read.
@@ -419,7 +670,7 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
* @param nodes List of nodes to find partition.
* @return List of partitions that aren't primary or backup for specified nodes.
*/
- protected List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) {
+ private List<Integer> noPrimaryOrBackupPartition(List<Integer> nodes) {
Affinity<Object> aff = ignite(4).affinity(DEFAULT_CACHE_NAME);
List<Integer> parts = new ArrayList<>();
@@ -444,127 +695,6 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
return parts;
}
- /** */
- private class TopologyChanger {
- /** Flag to delay partition exchange */
- private boolean delayExchange;
-
- /** List of nodes to kill */
- private List<Integer> killNodes;
-
- /** List of nodes to be alive */
- private List<Integer> aliveNodes;
-
- /** Delay between node stops */
- private long stopDelay;
-
- /**
- * @param delayExchange Flag for delay partition exchange.
- * @param killNodes List of nodes to kill.
- * @param aliveNodes List of nodes to be alive.
- * @param stopDelay Delay between stopping nodes.
- */
- public TopologyChanger(
- boolean delayExchange,
- List<Integer> killNodes,
- List<Integer> aliveNodes,
- long stopDelay
- ) {
- this.delayExchange = delayExchange;
- this.killNodes = killNodes;
- this.aliveNodes = aliveNodes;
- this.stopDelay = stopDelay;
- }
-
- /**
- * @return Lost partition ID.
- * @throws Exception If failed.
- */
- protected List<Integer> changeTopology() throws Exception {
- startGrids(4);
-
- Affinity<Object> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
-
- for (int i = 0; i < aff.partitions(); i++)
- ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i);
-
- client = true;
-
- startGrid(4);
-
- client = false;
-
- for (int i = 0; i < 5; i++)
- info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']');
-
- awaitPartitionMapExchange();
-
- final List<Integer> parts = noPrimaryOrBackupPartition(aliveNodes);
-
- if (parts.isEmpty())
- throw new IllegalStateException("No partition on nodes: " + killNodes);
-
- final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>();
-
- for (int i : aliveNodes) {
- HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>();
-
- for (Integer part : parts)
- semaphoreMap.put(part, new Semaphore(0));
-
- lostMap.add(semaphoreMap);
-
- grid(i).events().localListen(new P1<Event>() {
- @Override public boolean apply(Event evt) {
- assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
-
- CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt;
-
- if (F.eq(DEFAULT_CACHE_NAME, cacheEvt.cacheName())) {
- if (semaphoreMap.containsKey(cacheEvt.partition()))
- semaphoreMap.get(cacheEvt.partition()).release();
- }
-
- return true;
- }
- }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
- }
-
- if (delayExchange)
- delayPartExchange.set(true);
-
- ExecutorService executor = Executors.newFixedThreadPool(killNodes.size());
-
- for (Integer node : killNodes) {
- executor.submit(new Runnable() {
- @Override public void run() {
- grid(node).close();
- }
- });
-
- Thread.sleep(stopDelay);
- }
-
- executor.shutdown();
-
- delayPartExchange.set(false);
-
- Thread.sleep(5_000L);
-
- for (Map<Integer, Semaphore> map : lostMap) {
- for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
- assertTrue("Failed to wait for partition LOST event for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
- }
-
- for (Map<Integer, Semaphore> map : lostMap) {
- for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
- assertFalse("Partition LOST event raised twice for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
- }
-
- return parts;
- }
- }
-
/**
* Validate query execution on a node.
*
@@ -685,4 +815,124 @@ public class IgniteCachePartitionLossPolicySelfTest extends GridCommonAbstractTe
// TODO Need to add an actual check after https://issues.apache.org/jira/browse/IGNITE-9902 is fixed.
// No-op.
}
+
+ /** */
+ private class TopologyChanger {
+ /** Flag to delay partition exchange */
+ private boolean delayExchange;
+
+ /** List of nodes to kill */
+ private List<Integer> killNodes;
+
+ /** List of nodes to be alive */
+ private List<Integer> aliveNodes;
+
+ /** Delay between node stops */
+ private long stopDelay;
+
+ /**
+ * @param delayExchange Flag for delay partition exchange.
+ * @param killNodes List of nodes to kill.
+ * @param aliveNodes List of nodes to be alive.
+ * @param stopDelay Delay between stopping nodes.
+ */
+ private TopologyChanger(boolean delayExchange, List<Integer> killNodes, List<Integer> aliveNodes,
+ long stopDelay) {
+ this.delayExchange = delayExchange;
+ this.killNodes = killNodes;
+ this.aliveNodes = aliveNodes;
+ this.stopDelay = stopDelay;
+ }
+
+ /**
+ * @return Lost partition ID.
+ * @throws Exception If failed.
+ */
+ private List<Integer> changeTopology() throws Exception {
+ startGrids(4);
+
+ if (isPersistenceEnabled)
+ grid(0).cluster().active(true);
+
+ Affinity<Object> aff = ignite(0).affinity(DEFAULT_CACHE_NAME);
+
+ for (int i = 0; i < aff.partitions(); i++)
+ ignite(0).cache(DEFAULT_CACHE_NAME).put(i, i);
+
+ client = true;
+
+ startGrid(4);
+
+ client = false;
+
+ for (int i = 0; i < 5; i++)
+ info(">>> Node [idx=" + i + ", nodeId=" + ignite(i).cluster().localNode().id() + ']');
+
+ awaitPartitionMapExchange();
+
+ final List<Integer> parts = noPrimaryOrBackupPartition(aliveNodes);
+
+ if (parts.isEmpty())
+ throw new IllegalStateException("No partition on nodes: " + killNodes);
+
+ final List<Map<Integer, Semaphore>> lostMap = new ArrayList<>();
+
+ for (int i : aliveNodes) {
+ HashMap<Integer, Semaphore> semaphoreMap = new HashMap<>();
+
+ for (Integer part : parts)
+ semaphoreMap.put(part, new Semaphore(0));
+
+ lostMap.add(semaphoreMap);
+
+ grid(i).events().localListen(new P1<Event>() {
+ @Override public boolean apply(Event evt) {
+ assert evt.type() == EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
+
+ CacheRebalancingEvent cacheEvt = (CacheRebalancingEvent)evt;
+
+ if (F.eq(DEFAULT_CACHE_NAME, cacheEvt.cacheName())) {
+ if (semaphoreMap.containsKey(cacheEvt.partition()))
+ semaphoreMap.get(cacheEvt.partition()).release();
+ }
+
+ return true;
+ }
+ }, EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST);
+ }
+
+ if (delayExchange)
+ delayPartExchange.set(true);
+
+ ExecutorService executor = Executors.newFixedThreadPool(killNodes.size());
+
+ for (Integer node : killNodes) {
+ executor.submit(new Runnable() {
+ @Override public void run() {
+ grid(node).close();
+ }
+ });
+
+ Thread.sleep(stopDelay);
+ }
+
+ executor.shutdown();
+
+ delayPartExchange.set(false);
+
+ Thread.sleep(5_000L);
+
+ for (Map<Integer, Semaphore> map : lostMap) {
+ for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
+ assertTrue("Failed to wait for partition LOST event for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
+ }
+
+ for (Map<Integer, Semaphore> map : lostMap) {
+ for (Map.Entry<Integer, Semaphore> entry : map.entrySet())
+ assertFalse("Partition LOST event raised twice for partition:" + entry.getKey(), entry.getValue().tryAcquire(1));
+ }
+
+ return parts;
+ }
+ }
}