You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ignite.apache.org by "Pavel Pereslegin (Jira)" <ji...@apache.org> on 2019/11/14 09:52:00 UTC

[jira] [Created] (IGNITE-12370) WAL history reservation may fail due to an incorrect determination of the availability of the WAL segment index (under race condition).

Pavel Pereslegin created IGNITE-12370:
-----------------------------------------

             Summary: WAL history reservation may fail due to an incorrect determination of the availability of the WAL segment index (under race condition).
                 Key: IGNITE-12370
                 URL: https://issues.apache.org/jira/browse/IGNITE-12370
             Project: Ignite
          Issue Type: Bug
            Reporter: Pavel Pereslegin


Currently, {{FileWriteAheadLogManager#hasIndex}} first checks whether the WAL segment exists in the archive ({{File.exists}}) and then checks whether the index is in the archive (using {{Files.list}}). If the archive file is created between these two operations, {{hasIndex}} returns a false-negative result and the partition map exchange fails on this node.

Reproducer:
{code:java}
/**
 * Reproducer for IGNITE-12370: keeps reserving WAL history for preloading on a single node while
 * a background thread continuously puts entries into a transactional cache.
 */
public class IgniteWalHistoryReservationsWithLoadTest extends GridCommonAbstractTest {
    /** {@inheritDoc} */
    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(gridName);

        // Consistent ID is derived from the last character of the grid name.
        cfg.setConsistentId("NODE$" + gridName.charAt(gridName.length() - 1));

        // NOTE(review): 512 KB WAL segments and a 500 ms checkpoint frequency are presumably chosen
        // to make segment rollover/archiving frequent under load - confirm against Ignite defaults.
        DataStorageConfiguration memCfg = new DataStorageConfiguration()
            .setDefaultDataRegionConfiguration(
                new DataRegionConfiguration()
                    .setMaxSize(200L * 1024 * 1024)
                    .setPersistenceEnabled(true))
            .setWalMode(WALMode.LOG_ONLY)
            .setWalSegmentSize(512 * 1024)
            .setCheckpointFrequency(500);

        cfg.setDataStorageConfiguration(memCfg);

        CacheConfiguration<Long, Long> ccfg1 = new CacheConfiguration<>();

        ccfg1.setName("cache1");
        ccfg1.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
        ccfg1.setAffinity(new RendezvousAffinityFunction(false, 32));

        cfg.setCacheConfiguration(ccfg1);

        return cfg;
    }

    /**
     * Starts one node, runs a constant put load in a background thread and, for up to 60 seconds,
     * reserves WAL history for preloading for every partition of {@code cache1}, asserting that
     * each reservation succeeds.
     *
     * @throws Exception If failed.
     */
    @Test
    public void testReservationWithConstantLoad() throws Exception {
        final IgniteEx node = startGrid(0);

        node.cluster().active(true);

        AtomicLong cntr = new AtomicLong(100_000);

        ConstantLoader ldr = new ConstantLoader(node.<Long, Long>cache("cache1"), cntr);

        IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(ldr, 1, "loader");

        U.sleep(500);

        forceCheckpoint(node);

        // Reserve history from the beginning.
        node.context().cache().context().database().reserveHistoryForExchange();

        long endTime = U.currentTimeMillis() + 60_000;

        GridCacheContext<?, ?> ctx = node.cachex("cache1").context();

        int grpId = ctx.groupId();

        int parts = ctx.topology().partitions();

        try {
            while (U.currentTimeMillis() < endTime && !Thread.currentThread().isInterrupted()) {
                try {
                    for (int p = 0; p < parts; p++) {
                        // Snapshot the counter once: the loader thread increments it concurrently, so
                        // reading it again for the message could report a value that was never reserved.
                        long cntrVal = cntr.get();

                        boolean reserved =
                            node.context().cache().context().database().reserveHistoryForPreloading(grpId, p, cntrVal);

                        assertTrue("Unable to reserve history [p=" + p + ", cntr=" + cntrVal + "]", reserved);
                    }
                }
                finally {
                    node.context().cache().context().database().releaseHistoryForPreloading();
                }
            }
        }
        finally {
            try {
                node.context().cache().context().database().releaseHistoryForExchange();
            }
            finally {
                // Always stop the loader, even if releasing the exchange reservation throws.
                ldr.stop();
            }
        }

        fut.get(10_000);
    }

    /**
     * Background load generator: puts {@code (n, n)} pairs into the cache, taking {@code n} from a
     * shared counter, until stopped or interrupted.
     */
    static class ConstantLoader implements Callable<Void> {
        /** Cache to put entries into. */
        private final IgniteCache<Long, Long> cache;

        /** Shared counter that supplies the next key/value. */
        private final AtomicLong cntr;

        /** Stop flag, set from another thread via {@link #stop()}. */
        private volatile boolean stop;

        /**
         * @param cache Cache to load.
         * @param cntr Shared counter supplying keys.
         */
        ConstantLoader(IgniteCache<Long, Long> cache, AtomicLong cntr) {
            this.cache = cache;
            this.cntr = cntr;
        }

        /** {@inheritDoc} */
        @Override public Void call() throws Exception {
            while (!stop && !Thread.currentThread().isInterrupted()) {
                long n = cntr.getAndIncrement();

                cache.put(n, n);

                if (n % 100_000 == 0)
                    log.info("Loaded " + n);
            }

            return null;
        }

        /** Requests the load loop to finish after the current iteration. */
        public void stop() {
            stop = true;
        }
    }

    /** {@inheritDoc} */
    @Override protected void beforeTestsStarted() throws Exception {
        stopAllGrids();

        cleanPersistenceDir();
    }

    /** {@inheritDoc} */
    @Override protected void afterTest() throws Exception {
        stopAllGrids();

        cleanPersistenceDir();
    }
}
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)