Posted to dev@ignite.apache.org by "Pavel Pereslegin (Jira)" <ji...@apache.org> on 2020/01/29 15:15:00 UTC

[jira] [Created] (IGNITE-12605) Historical (WAL) rebalance can start on a cleared partition if some baseline node leaves the cluster and then joins back.

Pavel Pereslegin created IGNITE-12605:
-----------------------------------------

             Summary: Historical (WAL) rebalance can start on a cleared partition if some baseline node leaves the cluster and then joins back.
                 Key: IGNITE-12605
                 URL: https://issues.apache.org/jira/browse/IGNITE-12605
             Project: Ignite
          Issue Type: Bug
    Affects Versions: 2.9
         Environment: 


            Reporter: Pavel Pereslegin
            Assignee: Pavel Pereslegin


On the partition map exchange initiated by a baseline node leaving, no historical supplier is provided in the full message (assignPartitionStates() is not called when a node leaves).

Since there is no historical supplier, the "historical" partition is scheduled for clearing. When the node joins back, assignPartitionStates() is called and a supplier for historical rebalance is assigned, but the partition may already have been cleared.
After such a rebalance the "historically rebalanced" node has inconsistent partitions (while partition counters and state remain consistent).
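
Restated as a timeline:
{noformat}
1. A baseline node leaves -> PME runs, assignPartitionStates() is skipped -> the full message carries no historical supplier.
2. Without a historical supplier, the partition is scheduled for clearing (full rebalance).
3. The node joins back -> assignPartitionStates() is called -> a historical supplier is assigned.
4. Historical (WAL) rebalance runs over the already cleared partition -> the data is inconsistent while counters and state look fine.
{noformat}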

The "inlined" reproducer below uses TestRecordingCommunicationSpi to synchronize nodes (the issue can also be reproduced without it, though less reliably; see the attachment).

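The synchronization boils down to blocking rebalance demand messages and releasing them at the right moment. A minimal excerpt (using only the TestRecordingCommunicationSpi calls that appear in the full reproducer below):
{code:java}
TestRecordingCommunicationSpi spi = TestRecordingCommunicationSpi.spi(grid(2));

// Block rebalance demand messages for the test cache group.
spi.blockMessages((node, msg) -> msg instanceof GridDhtPartitionDemandMessage
    && ((GridDhtPartitionDemandMessage)msg).groupId() == CU.cacheId(DEFAULT_CACHE_NAME));

spi.waitForBlocked(); // Wait until a demand message is actually blocked.
spi.stopBlock();      // Release the blocked messages.
{code}
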
The reproducer shows the following errors.

Error 1 (partitions have been cleared; node3 is left with half of the expected entries in each partition).
{noformat}
java.lang.AssertionError: 
|------|-----------------------|
|      |     entries count     |
| part |-----------------------|
|      | node1 | node2 | node3 |
|------|-----------------------|
|   0  |  6250 |  6250 |  3125 | 
|   1  |  6250 |  6250 |  3125 | 
|   2  |  6250 |  6250 |  3125 | 
  ... 
|  31  |  6250 |  6250 |  3125 | 
|------|-------|-------|-------|
{noformat}
Error 2 (needs deeper investigation).
{noformat}
java.lang.AssertionError: Reached end of WAL but not all partitions are done
	at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.advance(GridCacheOffheapManager.java:1419)
	at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.next(GridCacheOffheapManager.java:1295)
	at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1255)
	at org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager$WALHistoricalIterator.nextX(GridCacheOffheapManager.java:1163)
	at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.nextX(IgniteRebalanceIteratorImpl.java:135)
	at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.next(IgniteRebalanceIteratorImpl.java:215)
	at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl.peek(IgniteRebalanceIteratorImpl.java:155)
	at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionSupplier.handleDemandMessage(GridDhtPartitionSupplier.java:316)
	at org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader.lambda$handleDemandMessage$1(GridDhtPreloader.java:374)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
{noformat}
 

Reproducer:
{code:java}
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.TestRecordingCommunicationSpi;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemandMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.WithSystemProperty;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;

import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;

/** Reproducer for IGNITE-12605. */
public class WalRebalanceOnCleanPartitionReproducer extends GridCommonAbstractTest {
    /** Block predicate. */
    private P2<ClusterNode, Message> blockPred;

    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        cfg.setConsistentId(gridName);

        cfg.setRebalanceThreadPoolSize(1);

        CacheConfiguration ccfg1 = new CacheConfiguration(DEFAULT_CACHE_NAME)
            .setCacheMode(CacheMode.PARTITIONED)
            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL)
            .setBackups(2)
            .setAffinity(new RendezvousAffinityFunction(false, 32))
            .setRebalanceMode(CacheRebalanceMode.ASYNC);

        cfg.setCacheConfiguration(ccfg1);

        TestRecordingCommunicationSpi commSpi = new TestRecordingCommunicationSpi();

        commSpi.blockMessages(blockPred);

        cfg.setCommunicationSpi(commSpi);

        DataStorageConfiguration dsCfg = new DataStorageConfiguration()
            .setConcurrencyLevel(Runtime.getRuntime().availableProcessors() * 4)
            .setCheckpointFrequency(5_000)
            .setWalMode(WALMode.LOG_ONLY)
            .setPageSize(1024)
            .setWalSegmentSize(8 * 1024 * 1024)
            .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                .setName("dfltDataRegion")
                .setPersistenceEnabled(true)
                .setMaxSize(512 * 1024 * 1024)
            );

        cfg.setDataStorageConfiguration(dsCfg);

        return cfg;
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        stopAllGrids();

        cleanPersistenceDir();

        super.beforeTestsStarted();
    }

    /**
     * Checks that historical (WAL) rebalance does not leave inconsistent partitions
     * after a baseline node leaves the cluster and joins back.
     */
    @Test
    @WithSystemProperty(key = IGNITE_PDS_WAL_REBALANCE_THRESHOLD, value = "0")
    public void testHistoricalRebalanceNotStartsAfterNodeLeft() throws Exception {
        IgniteEx crd = startGrid(0);

        crd.cluster().state(ClusterState.ACTIVE);
        crd.cluster().baselineAutoAdjustEnabled(false);

        Ignite node1 = startGrid(1);
        Ignite node2 = startGrid(2);

        List<ClusterNode> blt = new ArrayList<>(crd.context().discovery().aliveServerNodes());

        crd.cluster().setBaselineTopology(blt);

        IgniteCache<Integer, String> cache0 = crd.cache(DEFAULT_CACHE_NAME);

        System.out.println(">>> load 100k entries");
        loadData(cache0, 0, 100_000);

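        // Checkpoint to persist the loaded data before stopping node 2.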
        forceCheckpoint();

        System.out.println(">>> stop node 2");
        node2.close();

        awaitPartitionMapExchange();

        System.out.println(">>> load 100k entries again");
        loadData(cache0, 100_000, 100_000);

        CountDownLatch startLatch = new CountDownLatch(1);

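        // Block rebalance demand messages for the test cache so that node 2's rebalance can be paused after restart.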
        blockPred = (node, msg) -> {
            if (msg instanceof GridDhtPartitionDemandMessage) {
                GridDhtPartitionDemandMessage msg0 = (GridDhtPartitionDemandMessage)msg;

                return msg0.groupId() == CU.cacheId(DEFAULT_CACHE_NAME);
            }

            return false;
        };

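        // Restart node 2 asynchronously; its demand messages will be blocked by the predicate above.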
        GridTestUtils.runAsync(() -> {
            System.out.println(">>> start grid 2");
            startGrid(2);

            startLatch.countDown();

            return null;
        });

        startLatch.await();

        TestRecordingCommunicationSpi spi2 = TestRecordingCommunicationSpi.spi(grid(2));

        spi2.waitForBlocked();
        spi2.stopBlock();

        // Forces rebalancing to restart without assigning partition states.
        System.out.println(">>> stop grid 1");
        node1.close();

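        // Re-arm blocking to catch the demand message of the rebalance restarted by node 1 leaving.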
        spi2.blockMessages(blockPred);
        spi2.waitForBlocked();

        System.out.println(">>> start grid 1");
        startGrid(1);

        spi2.stopBlock();

        // Give the rebalance extra time, just to be sure.
        U.sleep(3_000);

        awaitPartitionMapExchange();

        verifyPartitionSizes();
    }

    /** Verifies that every node owns the same number of entries in each of the 32 partitions. */
    private void verifyPartitionSizes() {
        int grids = G.allGrids().size();

        SB buf = new SB();

        for (int p = 0; p < 32; p++) {
            Set<Long> sizesSet = new LinkedHashSet<>();
            List<GridDhtLocalPartition> parts = new ArrayList<>();

            for (int n = 0; n < grids; n++) {
                GridDhtLocalPartition part = grid(n).cachex(DEFAULT_CACHE_NAME).context().topology().localPartition(p);

                assert part != null;
                assert part.state() == GridDhtPartitionState.OWNING;

                sizesSet.add(part.fullSize());
                parts.add(part);
            }

            if (sizesSet.size() == 1)
                continue;

            buf.a(String.format("\n|  %2d  | ", p));

            for (GridDhtLocalPartition part : parts)
                buf.a(String.format(" %04d", part.fullSize())).a(" | ");
        }

        assertTrue("\n|------|-----------------------|" +
            "\n|      |     entries count     |" +
            "\n| part |-----------------------|" +
            "\n|      | node1 | node2 | node3 |" +
            "\n|------|-----------------------|" +
            buf +
            "\n|------|-------|-------|-------|", buf.length() == 0);
    }

    /** Loads {@code cnt} sequential entries into the given cache, starting from key {@code off}. */
    private void loadData(IgniteCache<Integer, String> cache, int off, int cnt) {
        try (IgniteDataStreamer<Integer, String> streamer = grid(0).dataStreamer(cache.getName())) {
            for (int i = off; i < off + cnt; i++)
                streamer.addData(i, String.valueOf(i));
        }
    }
}
{code}
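
The reproducer extends GridCommonAbstractTest, so it is intended to be run as a JUnit test inside the Ignite test framework (e.g. from the ignite-core test sources).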


