You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/03/22 14:32:06 UTC
[ignite] branch master updated: IGNITE-14373 Fix race between auto
rollover WAL segment and deactivate of WAL (#8912)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8ab281b IGNITE-14373 Fix race between auto rollover WAL segment and deactivate of WAL (#8912)
8ab281b is described below
commit 8ab281b8f3f19e4af1d69e0ae5af12664ab51a3b
Author: Kirill Tkalenko <tk...@yandex.ru>
AuthorDate: Mon Mar 22 17:31:34 2021 +0300
IGNITE-14373 Fix race between auto rollover WAL segment and deactivate of WAL (#8912)
---
.../persistence/wal/FileWriteAheadLogManager.java | 141 ++++++++++++++++-----
.../db/wal/WriteAheadLogManagerSelfTest.java | 128 +++++++++++++++++--
2 files changed, 222 insertions(+), 47 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index 6a74f04..9407404 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -339,11 +339,11 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
*/
@Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule;
- /**
- * Reference to the last added next archive timeout check object. Null if mode is not enabled. Should be cancelled
- * at shutdown
- */
- @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
+ /** Reference to the last added next timeout rollover object. */
+ @Nullable private TimeoutRollover timeoutRollover;
+
+ /** Timeout rollover mutex. */
+ @Nullable private final Object timeoutRolloverMux;
/**
* Listener invoked for each segment file IO initializer.
@@ -402,6 +402,8 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
segmentFileInputFactory = new SimpleSegmentFileInputFactory();
walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
+ timeoutRolloverMux = walAutoArchiveAfterInactivity > 0 ? new Object() : null;
+
double thresholdWalArchiveSizePercentage = IgniteSystemProperties.getDouble(
IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, DFLT_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
@@ -639,17 +641,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/**
* Method is called twice on deactivate and stop.
* It shuts down workers but does not deallocate them to avoid duplication.
- * */
+ */
@Override protected void stop0(boolean cancel) {
final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule;
if (schedule != null)
schedule.close();
- final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj;
-
- if (timeoutObj != null)
- cctx.time().removeTimeoutObject(timeoutObj);
+ stopAutoRollover();
try {
fileHandleManager.onDeactivate();
@@ -758,33 +757,15 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
* does check of inactivity period and schedules new launch.
*/
private void scheduleNextInactivityPeriodElapsedCheck() {
- final long lastRecMs = lastRecordLoggedMs.get();
- final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;
-
- if (log.isDebugEnabled())
- log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString());
+ assert walAutoArchiveAfterInactivity > 0;
+ assert timeoutRolloverMux != null;
- nextAutoArchiveTimeoutObj = new GridTimeoutObject() {
- private final IgniteUuid id = IgniteUuid.randomUuid();
+ synchronized (timeoutRolloverMux) {
+ long lastRecMs = lastRecordLoggedMs.get();
+ long nextEndTime = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;
- @Override public IgniteUuid timeoutId() {
- return id;
- }
-
- @Override public long endTime() {
- return nextPossibleAutoArchive;
- }
-
- @Override public void onTimeout() {
- if (log.isDebugEnabled())
- log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")");
-
- checkWalRolloverRequiredDuringInactivityPeriod();
-
- scheduleNextInactivityPeriodElapsedCheck();
- }
- };
- cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
+ cctx.time().addTimeoutObject(timeoutRollover = new TimeoutRollover(nextEndTime));
+ }
}
/** {@inheritDoc} */
@@ -3423,4 +3404,94 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
}
}
}
+
+ /**
+ * Timeout object for automatic rollover of segments if nothing has been
+ * written to the WAL for at least {@link #walAutoArchiveAfterInactivity}.
+ */
+ private class TimeoutRollover implements GridTimeoutObject {
+ /** ID of timeout object. */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** Timestamp for triggering. */
+ private final long endTime;
+
+ /** Cancel flag. */
+ private boolean cancel;
+
+ /**
+ * Constructor.
+ *
+ * @param endTime Timestamp for triggering.
+ */
+ private TimeoutRollover(long endTime) {
+ if (log.isDebugEnabled())
+ log.debug("Schedule WAL rollover check at " + new Time(endTime).toString());
+
+ this.endTime = endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid timeoutId() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long endTime() {
+ return endTime;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTimeout() {
+ assert walAutoArchiveAfterInactivity > 0;
+ assert timeoutRolloverMux != null;
+
+ synchronized (timeoutRolloverMux) {
+ if (!cancel) {
+ if (log.isDebugEnabled()) {
+ log.debug("Checking if WAL rollover required (" +
+ new Time(U.currentTimeMillis()).toString() + ")");
+ }
+
+ checkWalRolloverRequiredDuringInactivityPeriod();
+
+ scheduleNextInactivityPeriodElapsedCheck();
+ }
+ }
+ }
+
+ /**
+ * Cancel auto rollover.
+ */
+ public void cancel() {
+ assert walAutoArchiveAfterInactivity > 0;
+ assert timeoutRolloverMux != null;
+
+ synchronized (timeoutRolloverMux) {
+ if (log.isDebugEnabled())
+ log.debug("Auto rollover is canceled");
+
+ cancel = true;
+ }
+ }
+ }
+
+ /**
+ * Stop auto rollover.
+ */
+ private void stopAutoRollover() {
+ if (walAutoArchiveAfterInactivity > 0) {
+ assert timeoutRolloverMux != null;
+
+ synchronized (timeoutRolloverMux) {
+ TimeoutRollover timeoutRollover = this.timeoutRollover;
+
+ if (timeoutRollover != null) {
+ timeoutRollover.cancel();
+
+ cctx.time().removeTimeoutObject(timeoutRollover);
+ }
+ }
+ }
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java
index b4c1d9e..e978e77 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/wal/WriteAheadLogManagerSelfTest.java
@@ -21,29 +21,39 @@ import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
+import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
+import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
+import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.ListeningTestLogger;
import org.apache.ignite.testframework.LogListener;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
import org.junit.Test;
+import static java.util.Collections.emptyList;
import static java.util.concurrent.ThreadLocalRandom.current;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
-import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.ignite.testframework.GridTestUtils.setFieldValue;
/**
* Class for testing WAL manager.
@@ -68,6 +78,7 @@ public class WriteAheadLogManagerSelfTest extends GridCommonAbstractTest {
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
return super.getConfiguration(igniteInstanceName)
+ .setFailureHandler(new StopNodeFailureHandler())
.setCacheConfiguration(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
.setDataStorageConfiguration(
new DataStorageConfiguration()
@@ -79,7 +90,7 @@ public class WriteAheadLogManagerSelfTest extends GridCommonAbstractTest {
@Override protected IgniteEx startGrids(int cnt) throws Exception {
IgniteEx n = super.startGrids(cnt);
- n.cluster().state(ClusterState.ACTIVE);
+ n.cluster().state(ACTIVE);
awaitPartitionMapExchange();
return n;
@@ -176,24 +187,117 @@ public class WriteAheadLogManagerSelfTest extends GridCommonAbstractTest {
*/
@Test
public void testAutoArchiveWithoutNullPointerException() throws Exception {
- LogListener logLsnr = LogListener.matches(
+ setRootLoggerDebugLevel();
+
+ LogListener logLsnr0 = LogListener.matches("Checking if WAL rollover required").build();
+
+ LogListener logLsnr1 = LogListener.matches(
Pattern.compile("Rollover segment \\[\\d+ to \\d+\\], recordType=null")).build();
IgniteEx n = startGrid(0, cfg -> {
- cfg.setGridLogger(new ListeningTestLogger(cfg.getGridLogger(), logLsnr))
- .getDataStorageConfiguration().setWalAutoArchiveAfterInactivity(200);
+ cfg.setGridLogger(new ListeningTestLogger(log, logLsnr0, logLsnr1))
+ .getDataStorageConfiguration().setWalAutoArchiveAfterInactivity(100_000);
});
- n.cluster().state(ClusterState.ACTIVE);
+ n.cluster().state(ACTIVE);
awaitPartitionMapExchange();
- assertNotNull(GridTestUtils.getFieldValue(walMgr(n), "nextAutoArchiveTimeoutObj"));
+ GridTimeoutObject timeoutObj = timeoutRollover(n);
+ assertNotNull(timeoutObj);
+
+ n.cache(DEFAULT_CACHE_NAME).put(current().nextInt(), new byte[16]);
+
+ disableWal(n);
+
+ lastRecordLoggedMs(n).set(1);
+
+ timeoutObj.onTimeout();
+
+ assertTrue(logLsnr0.check());
+ assertTrue(logLsnr1.check());
+ }
+
+ /**
+ * Checking the absence of a race between the deactivation of the WAL and
+ * automatic archiving, which may lead to a failure of the node.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testNoRaceAutoArchiveAndDeactivation() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ if (log.isInfoEnabled())
+ log.info(">>> Test iteration:" + i);
+
+ IgniteEx n = startGrid(0, cfg -> {
+ cfg.getDataStorageConfiguration().setWalAutoArchiveAfterInactivity(10);
+ });
+
+ n.cluster().state(ACTIVE);
+ awaitPartitionMapExchange();
+
+ GridTimeoutObject timeoutObj = timeoutRollover(n);
+ assertNotNull(timeoutObj);
- assertTrue(waitForCondition(() -> {
n.cache(DEFAULT_CACHE_NAME).put(current().nextInt(), new byte[16]);
- return logLsnr.check();
- }, getTestTimeout()));
+ CountDownLatch l = new CountDownLatch(1);
+ AtomicBoolean stop = new AtomicBoolean();
+
+ IgniteInternalFuture<Object> fut = runAsync(() -> {
+ l.countDown();
+
+ while (!stop.get())
+ lastRecordLoggedMs(n).set(1);
+ });
+
+ assertTrue(l.await(getTestTimeout(), MILLISECONDS));
+
+ walMgr(n).onDeActivate(n.context());
+ stop.set(true);
+
+ fut.get(getTestTimeout());
+
+ assertEquals(1, G.allGrids().size());
+
+ stopAllGrids();
+ cleanPersistenceDir();
+ }
+ }
+
+ /**
+ * Getting {@code FileWriteAheadLogManager#lastRecordLoggedMs}
+ *
+ * @param n Node.
+ * @return Container with last WAL record logged timestamp.
+ */
+ private AtomicLong lastRecordLoggedMs(IgniteEx n) {
+ return getFieldValue(walMgr(n), "lastRecordLoggedMs");
+ }
+
+ /**
+ * Disable WAL.
+ *
+ * @param n Node.
+ */
+ private void disableWal(IgniteEx n) throws Exception {
+ WALDisableContext walDisableCtx = n.context().cache().context().walState().walDisableContext();
+ assertNotNull(walDisableCtx);
+
+ setFieldValue(walDisableCtx, "disableWal", true);
+
+ assertTrue(walDisableCtx.check());
+ assertNull(walMgr(n).log(new DataRecord(emptyList())));
+ }
+
+ /**
+ * Getting {@code FileWriteAheadLogManager#timeoutRollover}.
+ *
+ * @param n Node.
+ * @return Timeout object.
+ */
+ @Nullable private GridTimeoutObject timeoutRollover(IgniteEx n) {
+ return getFieldValue(walMgr(n), "timeoutRollover");
}
/**