You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@ignite.apache.org by "Nikolay Izhikov (Jira)" <ji...@apache.org> on 2020/04/17 07:46:00 UTC
[jira] [Updated] (IGNITE-12605) Historical (WAL) rebalance can
start on a cleared partition if some baseline node leaves the cluster and
then joins back.
[ https://issues.apache.org/jira/browse/IGNITE-12605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nikolay Izhikov updated IGNITE-12605:
-------------------------------------
Fix Version/s: 2.8.1
> Historical (WAL) rebalance can start on a cleared partition if some baseline node leaves the cluster and then joins back.
> -------------------------------------------------------------------------------------------------------------------------
>
> Key: IGNITE-12605
> URL: https://issues.apache.org/jira/browse/IGNITE-12605
> Project: Ignite
> Issue Type: Bug
> Affects Versions: 2.9
> Reporter: Pavel Pereslegin
> Assignee: Pavel Pereslegin
> Priority: Major
> Fix For: 2.9, 2.8.1
>
> Attachments: WalRebalanceOnCleanPartitionReproducerUnstable.java
>
> Time Spent: 20m
> Remaining Estimate: 0h
>
> _(scenario: cluster with 3 nodes, node3 starts historical rebalancing and then node2 leaves cluster)_
> On partition map exchange initiated by baseline node leaving, the historical supplier is not provided in the full message (assignPartitionStates() isn't called on coordinator when a node leaves). Since we don't have a historical supplier, the "historical" partition is scheduled for clearing; then, when the node joins back, assignPartitionStates() is called and we do have a supplier for historical rebalance, but the partition may already have been cleared.
> After such rebalance we have inconsistent partitions on a "historically rebalanced" node (with consistent partition counters and state).
> "Inlined" reproducer uses TestRecordingCommunicationSpi to sync nodes (but this issue can also be reproduced — albeit unstably — without it; see attachment).
>
> Reproducer shows the following error.
> {noformat}
> java.lang.AssertionError:
> |------|-----------------------|
> | | entries count |
> | part |-----------------------|
> | | node1 | node2 | node3 |
> |------|-----------------------|
> | 0 | 6250 | 6250 | 3125 |
> | 1 | 6250 | 6250 | 3125 |
> | 2 | 6250 | 6250 | 3125 |
> ...
> | 31 | 6250 | 6250 | 3125 |
> |------|-------|-------|-------|
> {noformat}
> (partitions on node3 have been cleared before the start of historical rebalance).
>
> Reproducer:
> {code:java}
> public class WalRebalanceOnCleanPartitionReproducer extends GridCommonAbstractTest {
> /** Block predicate. */
> private P2<ClusterNode, Message> blockPred;
> /** {@inheritDoc} */
> @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
> IgniteConfiguration cfg = super.getConfiguration(gridName);
> cfg.setConsistentId(gridName);
> cfg.setRebalanceThreadPoolSize(1);
> CacheConfiguration ccfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME)
> .setCacheMode(CacheMode.PARTITIONED)
> .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
> .setBackups(2)
> .setAffinity(new RendezvousAffinityFunction(false, 32))
> .setRebalanceMode(CacheRebalanceMode.ASYNC);
> cfg.setCacheConfiguration(ccfg1);
> TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();
> commSpi.blockMessages(blockPred);
> cfg.setCommunicationSpi(commSpi);
> DataStorageConfiguration dsCfg = new DataStorageConfiguration()
> .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4)
> .setCheckpointFrequency(5_000)
> .setWalMode(WALMode.LOG_ONLY)
> .setPageSize(1024)
> .setWalSegmentSize(8 * 1024 * 1024)
> .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
> .setName("dfltDataRegion")
> .setPersistenceEnabled(true)
> .setMaxSize(512 * 1024 * 1024)
> );
> cfg.setDataStorageConfiguration(dsCfg);
> return cfg;
> }
> /** {@inheritDoc} */
> @Override protected void beforeTestsStarted() throws Exception {
> stopAllGrids();
> cleanPersistenceDir();
> super.beforeTestsStarted();
> }
> /**
> *
> */
> @Test
> @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
> public void testHistoricalRebalanceRestart() throws Exception {
> IgniteEx crd = startGrid(0);
> crd.cluster().state(ClusterState.ACTIVE);
> crd.cluster().baselineAutoAdjustEnabled(false);
> Ignite node1 = startGrid(1);
> Ignite node2 = startGrid(2);
> List<ClusterNode> blt = new ArrayList<>(crd.context().discovery().aliveServerNodes());
> crd.cluster().setBaselineTopology(blt);
> IgniteCache<Integer, String> cache0 = crd.cache(DEFAULT_CACHE_NAME);
> System.out.println(">>> load 100k entries");
> loadData(cache0, 0, 100_000);
> forceCheckpoint();
> System.out.println(">>> stop node 2");
> node2.close();
> awaitPartitionMapExchange();
> System.out.println(">>> load 100k entries again");
> loadData(cache0, 100_000, 100_000);
> blockPred = (node, msg) -> {
> if (msg instanceof GridDhtPartitionDemandMessage) {
> GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;
> return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
> }
> return false;
> };
> startGrid(2);
> TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));
> spi2.waitForBlocked();
> spi2.stopBlock();
> // Forces rebalancing to restart without assigning partition states.
> System.out.println(">>> stop grid 1");
> node1.close();
> spi2.blockMessages(blockPred);
> spi2.waitForBlocked();
> System.out.println(">>> start grid 1");
> startGrid(1);
> spi2.stopBlock();
> // just to be sure
> U.sleep(3_000);
> awaitPartitionMapExchange();
> verifyPartittionSizes();
> }
> /** */
> private void verifyPartittionSizes() {
> int grids = G.allGrids().size();
> SB buf = new SB();
> for (int p = 0; p < 32; p++) {
> Set<Long> sizesSet = new LinkedHashSet<>();
> List<GridDhtLocalPartition> parts = new ArrayList<>();
> for (int n = 0; n < grids; n++) {
> GridDhtLocalPartition part = grid(n).cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(p);
> assert part != null;
> assert part.state() == GridDhtPartitionState.OWNING;
> sizesSet.add(part.fullSize());
> parts.add(part);
> }
> if (sizesSet.size() == 1)
> continue;
> buf.a(String.format("\n| %2d | ", p));
> for (GridDhtLocalPartition part : parts)
> buf.a(String.format(" %04d", part.fullSize())).a(" | ");
> }
> assertTrue("\n|------|-----------------------|" +
> "\n| | entries count |" +
> "\n| part |-----------------------|" +
> "\n| | node1 | node2 | node3 |" +
> "\n|------|-----------------------|" +
> buf +
> "\n|------|-------|-------|-------|", buf.length() == 0);
> }
> /** */
> private void loadData(IgniteCache cache, int off, int cnt) {
> try (IgniteDataStreamer<Integer, String> streamer = grid(0).dataStreamer(cache.getName())) {
> for (int i = off; i < off + cnt; i++)
> streamer.addData(i, String.valueOf(i));
> }
> }
> }
> {code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)