You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/03/22 14:32:06 UTC

[ignite] branch master updated: IGNITE-14373 Fix race between auto rollover WAL segment and deactivate of WAL (#8912)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8ab281b  IGNITE-14373 Fix race between auto rollover WAL segment and deactivate of WAL (#8912)
8ab281b is described below

commit 8ab281b8f3f19e4af1d69e0ae5af12664ab51a3b
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Mon Mar 22 17:31:34 2021 +0300

    IGNITE-14373 Fix race between auto rollover WAL segment and deactivate of WAL (#8912)
---
 .../persistence/wal/FileWriteAheadLogManager.java  | 141 ++++++++++++++++-----
 .../db/wal/WriteAheadLogManagerSelfTest.java       | 128 +++++++++++++++++--
 2 files changed, 222 insertions(+), 47 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 6a74f04..9407404 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -339,11 +339,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      */
     @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule;
 
-    /**
-     * Reference to the last added next archive timeout check object. Null if mode is not enabled. Should be cancelled
-     * at shutdown
-     */
-    @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
+    /** Reference to the last added next timeout rollover object. */
+    @Nullable private TimeoutRollover timeoutRollover;
+
+    /** Timeout rollover mutex. */
+    @Nullable private final Object timeoutRolloverMux;
 
     /**
      * Listener invoked for each segment file IO initializer.
@@ -402,6 +402,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
         segmentFileInputFactory = new SimpleSegmentFileInputFactory();
         walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
 
+        timeoutRolloverMux = walAutoArchiveAfterInactivity > 0 ? new Object() : null;
+
         double thresholdWalArchiveSizePercentage = IgniteSystemProperties.getDouble(
             IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, DFLT_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
 
@@ -639,17 +641,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /**
      * Method is called twice on deactivate and stop.
      * It shutdown workers but do not deallocate them to avoid duplication.
-     * */
+     */
     @Override protected void stop0(boolean cancel) {
         final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule;
 
         if (schedule != null)
             schedule.close();
 
-        final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj;
-
-        if (timeoutObj != null)
-            cctx.time().removeTimeoutObject(timeoutObj);
+        stopAutoRollover();
 
         try {
             fileHandleManager.onDeactivate();
@@ -758,33 +757,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
      * does check of inactivity period and schedules new launch.
      */
     private void scheduleNextInactivityPeriodElapsedCheck() {
-        final long lastRecMs = lastRecordLoggedMs.get();
-        final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;
-
-        if (log.isDebugEnabled())
-            log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString());
+        assert walAutoArchiveAfterInactivity > 0;
+        assert timeoutRolloverMux != null;
 
-        nextAutoArchiveTimeoutObj = new GridTimeoutObject() {
-            private final IgniteUuid id = IgniteUuid.randomUuid();
+        synchronized (timeoutRolloverMux) {
+            long lastRecMs = lastRecordLoggedMs.get();
+            long nextEndTime = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;
 
-            @Override public IgniteUuid timeoutId() {
-                return id;
-            }
-
-            @Override public long endTime() {
-                return nextPossibleAutoArchive;
-            }
-
-            @Override public void onTimeout() {
-                if (log.isDebugEnabled())
-                    log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")");
-
-                checkWalRolloverRequiredDuringInactivityPeriod();
-
-                scheduleNextInactivityPeriodElapsedCheck();
-            }
-        };
-        cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
+            cctx.time().addTimeoutObject(timeoutRollover = new TimeoutRollover(nextEndTime));
+        }
     }
 
     /** {@inheritDoc} */
@@ -3423,4 +3404,94 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
             }
         }
     }
+
+    /**
+     * Timeout object for the automatic rollover of WAL segments when nothing has been recorded
+     * to the WAL for a period of {@link #walAutoArchiveAfterInactivity} or more.
+     */
+    private class TimeoutRollover implements GridTimeoutObject {
+        /** ID of timeout object. */
+        private final IgniteUuid id = IgniteUuid.randomUuid();
+
+        /** Timestamp for triggering. */
+        private final long endTime;
+
+        /** Cancel flag, read and written only while holding {@link #timeoutRolloverMux}. */
+        private boolean cancel;
+
+        /**
+         * Constructor.
+         *
+         * @param endTime Timestamp for triggering.
+         */
+        private TimeoutRollover(long endTime) {
+            if (log.isDebugEnabled())
+                log.debug("Schedule WAL rollover check at " + new Time(endTime).toString());
+
+            this.endTime = endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public IgniteUuid timeoutId() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public long endTime() {
+            return endTime;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onTimeout() {
+            assert walAutoArchiveAfterInactivity > 0;
+            assert timeoutRolloverMux != null;
+
+            synchronized (timeoutRolloverMux) {
+                if (!cancel) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("Checking if WAL rollover required (" +
+                            new Time(U.currentTimeMillis()).toString() + ")");
+                    }
+
+                    checkWalRolloverRequiredDuringInactivityPeriod();
+
+                    scheduleNextInactivityPeriodElapsedCheck();
+                }
+            }
+        }
+
+        /**
+         * Marks this timeout object as canceled so that {@link #onTimeout()} neither checks nor reschedules.
+         */
+        public void cancel() {
+            assert walAutoArchiveAfterInactivity > 0;
+            assert timeoutRolloverMux != null;
+
+            synchronized (timeoutRolloverMux) {
+                if (log.isDebugEnabled())
+                    log.debug("Auto rollover is canceled");
+
+                cancel = true;
+            }
+        }
+    }
+
+    /**
+     * Stops auto rollover: cancels the last scheduled timeout object (if any) and unregisters it.
+     */
+    private void stopAutoRollover() {
+        if (walAutoArchiveAfterInactivity > 0) {
+            assert timeoutRolloverMux != null;
+
+            synchronized (timeoutRolloverMux) {
+                TimeoutRollover timeoutRollover = this.timeoutRollover;
+
+                if (timeoutRollover != null) {
+                    timeoutRollover.cancel();
+
+                    cctx.time().removeTimeoutObject(timeoutRollover);
+                }
+            }
+        }
+    }
 }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java
index b4c1d9e..e978e77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java
@@ -21,29 +21,39 @@ import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Pattern;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterState;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
 import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.typedef.G;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.ListeningTestLogger;
 import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Test;
 
+import static java.util.Collections.emptyList;
 import static java.util.concurrent.ThreadLocalRandom.current;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
 import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
-import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
 
 /**
  * Class for testing WAL manager.
@@ -68,6 +78,7 @@ public class WriteAheadLogManagerSelfTest extends GridCommonAbstractTest {
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         return super.getConfiguration(igniteInstanceName)
+            .setFailureHandler(new StopNodeFailureHandler())
             .setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
             .setDataStorageConfiguration(
                 new DataStorageConfiguration()
@@ -79,7 +90,7 @@ public class WriteAheadLogManagerSelfTest extends GridCommonAbstractTest {
     @Override protected IgniteEx startGrids(int cnt) throws Exception {
         IgniteEx n = super.startGrids(cnt);
 
-        n.cluster().state(ClusterState.ACTIVE);
+        n.cluster().state(ACTIVE);
         awaitPartitionMapExchange();
 
         return n;
@@ -176,24 +187,117 @@ public class WriteAheadLogManagerSelfTest extends GridCommonAbstractTest {
      */
     @Test
     public void testAutoArchiveWithoutNullPointerException() throws Exception {
-        LogListener logLsnr = LogListener.matches(
+        setRootLoggerDebugLevel();
+
+        LogListener logLsnr0 = LogListener.matches("Checking if WAL rollover required").build();
+
+        LogListener logLsnr1 = LogListener.matches(
             Pattern.compile("Rollover segment \\[\\d+ to \\d+\\], recordType=null")).build();
 
         IgniteEx n = startGrid(0, cfg -> {
-            cfg.setGridLogger(new ListeningTestLogger(cfg.getGridLogger(), logLsnr))
-                .getDataStorageConfiguration().setWalAutoArchiveAfterInactivity(200);
+            cfg.setGridLogger(new ListeningTestLogger(log, logLsnr0, logLsnr1))
+                .getDataStorageConfiguration().setWalAutoArchiveAfterInactivity(100_000);
         });
 
-        n.cluster().state(ClusterState.ACTIVE);
+        n.cluster().state(ACTIVE);
         awaitPartitionMapExchange();
 
-        assertNotNull(GridTestUtils.getFieldValue(walMgr(n), "nextAutoArchiveTimeoutObj"));
+        GridTimeoutObject timeoutObj = timeoutRollover(n);
+        assertNotNull(timeoutObj);
+
+        n.cache(DEFAULT_CACHE_NAME).put(current().nextInt(), new byte[16]);
+
+        disableWal(n);
+
+        lastRecordLoggedMs(n).set(1);
+
+        timeoutObj.onTimeout();
+
+        assertTrue(logLsnr0.check());
+        assertTrue(logLsnr1.check());
+    }
+
+    /**
+     * Checks that there is no race between the deactivation of the WAL and
+     * automatic archiving, which could lead to a failure of the node.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testNoRaceAutoArchiveAndDeactivation() throws Exception {
+        for (int i = 0; i < 10; i++) {
+            if (log.isInfoEnabled())
+                log.info(">>> Test iteration:" + i);
+
+            IgniteEx n = startGrid(0, cfg -> {
+                cfg.getDataStorageConfiguration().setWalAutoArchiveAfterInactivity(10);
+            });
+
+            n.cluster().state(ACTIVE);
+            awaitPartitionMapExchange();
+
+            GridTimeoutObject timeoutObj = timeoutRollover(n);
+            assertNotNull(timeoutObj);
 
-        assertTrue(waitForCondition(() -> {
             n.cache(DEFAULT_CACHE_NAME).put(current().nextInt(), new byte[16]);
 
-            return logLsnr.check();
-        }, getTestTimeout()));
+            CountDownLatch l = new CountDownLatch(1);
+            AtomicBoolean stop = new AtomicBoolean();
+
+            IgniteInternalFuture<Object> fut = runAsync(() -> {
+                l.countDown();
+
+                while (!stop.get())
+                    lastRecordLoggedMs(n).set(1);
+            });
+
+            assertTrue(l.await(getTestTimeout(), MILLISECONDS));
+
+            walMgr(n).onDeActivate(n.context());
+            stop.set(true);
+
+            fut.get(getTestTimeout());
+
+            assertEquals(1, G.allGrids().size());
+
+            stopAllGrids();
+            cleanPersistenceDir();
+        }
+    }
+
+    /**
+     * Getting {@code FileWriteAheadLogManager#lastRecordLoggedMs}
+     *
+     * @param n Node.
+     * @return Container with last WAL record logged timestamp.
+     */
+    private AtomicLong lastRecordLoggedMs(IgniteEx n) {
+        return getFieldValue(walMgr(n), "lastRecordLoggedMs");
+    }
+
+    /**
+     * Disable WAL.
+     *
+     * @param n Node.
+     */
+    private void disableWal(IgniteEx n) throws Exception {
+        WALDisableContext walDisableCtx = n.context().cache().context().walState().walDisableContext();
+        assertNotNull(walDisableCtx);
+
+        setFieldValue(walDisableCtx, "disableWal", true);
+
+        assertTrue(walDisableCtx.check());
+        assertNull(walMgr(n).log(new DataRecord(emptyList())));
+    }
+
+    /**
+     * Getting {@code FileWriteAheadLogManager#timeoutRollover}.
+     *
+     * @param n Node.
+     * @return Timeout object.
+     */
+    @Nullable private GridTimeoutObject timeoutRollover(IgniteEx n) {
+        return getFieldValue(walMgr(n), "timeoutRollover");
     }
 
     /**