You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/02/26 07:52:41 UTC
[ignite] branch master updated: IGNITE-14185 Synchronous checkpoints
on several nodes greatly increase the latency of distributed transactions
(#8803)
This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e18a970 IGNITE-14185 Synchronous checkpoints on several nodes greatly increase the latency of distributed transactions (#8803)
e18a970 is described below
commit e18a9704d071c25ba1c4fd44f9ab83f9709ca6ad
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Feb 26 10:52:12 2021 +0300
IGNITE-14185 Synchronous checkpoints on several nodes greatly increase the latency of distributed transactions (#8803)
---
.../util/GridCommandHandlerPropertiesTest.java | 39 ++++++++++++++
.../GridCacheDatabaseSharedManager.java | 20 +++++++-
.../persistence/checkpoint/CheckpointManager.java | 7 ++-
.../cache/persistence/checkpoint/Checkpointer.java | 34 +++++++++++--
.../checkpoint/LightweightCheckpointManager.java | 3 +-
.../db/file/IgnitePdsCheckpointSimpleTest.java | 59 +++++++++++++++++++++-
6 files changed, 153 insertions(+), 9 deletions(-)
diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
index 2c800d7..e535b24 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
@@ -22,8 +22,10 @@ import java.util.Set;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -127,6 +129,43 @@ public class GridCommandHandlerPropertiesTest extends GridCommandHandlerClusterB
}
/**
+ * Checks the set command for property 'checkpoint.deviation'.
+ */
+ @Test
+ public void testPropertyCheckpointDeviation() {
+ for (Ignite ign : G.allGrids()) {
+ if (ign.configuration().isClientMode())
+ continue;
+
+ SimpleDistributedProperty<Integer> cpFreqDeviation = U.field(((IgniteEx)ign).context().cache().context().database(),
+ "cpFreqDeviation");
+
+ assertNull(cpFreqDeviation.get());
+ }
+
+ assertEquals(
+ EXIT_CODE_OK,
+ execute(
+ "--property", "set",
+ "--name", "checkpoint.deviation",
+ "--val", "20"
+ )
+ );
+
+ for (Ignite ign : G.allGrids()) {
+ if (ign.configuration().isClientMode())
+ continue;
+
+ SimpleDistributedProperty<Integer> cpFreqDeviation = U.field(((IgniteEx)ign).context().cache().context().database(),
+ "cpFreqDeviation");
+
+ assertNotNull(cpFreqDeviation.get());
+
+ assertEquals(20, cpFreqDeviation.get().intValue());
+ }
+ }
+
+ /**
* Check the set command fro property 'sql.defaultQueryTimeout'.
* Steps:
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 83420eb..d61a849 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -138,6 +138,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -274,6 +275,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Prefix for meta store records which means that checkpoint entry for some group is not applicable for WAL rebalance. */
private static final String CHECKPOINT_INAPPLICABLE_FOR_REBALANCE = "cp-wal-rebalance-inapplicable-";
+ /**
+ * Default checkpoint frequency deviation, in percent of the configured checkpoint frequency.
+ * Used while the distributed property {@code checkpoint.deviation} has no value.
+ */
+ private static final int DEFAULT_CHECKPOINT_DEVIATION = 40;
+
/** */
private FilePageStoreManager storeMgr;
@@ -346,6 +350,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
/** Data regions which should be checkpointed. */
protected final Set<DataRegion> checkpointedDataRegions = new GridConcurrentHashSet<>();
+ /** Distributed property {@code checkpoint.deviation}: checkpoint frequency deviation in percent (server nodes only). */
+ private SimpleDistributedProperty<Integer> cpFreqDeviation;
+
/**
* @param ctx Kernal context.
*/
@@ -517,6 +524,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
if (!kernalCtx.clientNode()) {
kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());
+ cpFreqDeviation = new SimpleDistributedProperty<>("checkpoint.deviation", Integer::parseInt);
+
+ kernalCtx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
+ cpFreqDeviation.addListener((name, oldVal, newVal) ->
+ U.log(log, "Checkpoint frequency deviation changed [oldVal=" + oldVal + ", newVal=" + newVal + "]"));
+
+ dispatcher.registerProperty(cpFreqDeviation);
+ });
+
checkpointManager = new CheckpointManager(
kernalCtx::log,
cctx.igniteInstanceName(),
@@ -534,7 +550,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
persistentStoreMetricsImpl(),
kernalCtx.longJvmPauseDetector(),
kernalCtx.failure(),
- kernalCtx.cache());
+ kernalCtx.cache(),
+ () -> cpFreqDeviation.getOrDefault(DEFAULT_CHECKPOINT_DEVIATION)
+ );
final FileLockHolder preLocked = kernalCtx.pdsFolderResolver()
.resolveFolders()
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
index 072d3bf..71abc7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
@@ -105,6 +105,7 @@ public class CheckpointManager {
* @param longJvmPauseDetector Long JVM pause detector.
* @param failureProcessor Failure processor.
* @param cacheProcessor Cache processor.
+ * @param cpFreqDeviation Distributed checkpoint frequency deviation.
* @throws IgniteCheckedException if fail.
*/
public CheckpointManager(
@@ -124,7 +125,8 @@ public class CheckpointManager {
DataStorageMetricsImpl persStoreMetrics,
LongJVMPauseDetector longJvmPauseDetector,
FailureProcessor failureProcessor,
- GridCacheProcessor cacheProcessor
+ GridCacheProcessor cacheProcessor,
+ Supplier<Integer> cpFreqDeviation
) throws IgniteCheckedException {
CheckpointHistory cpHistory = new CheckpointHistory(
persistenceCfg,
@@ -189,7 +191,8 @@ public class CheckpointManager {
checkpointWorkflow,
checkpointPagesWriterFactory,
persistenceCfg.getCheckpointFrequency(),
- persistenceCfg.getCheckpointThreads()
+ persistenceCfg.getCheckpointThreads(),
+ cpFreqDeviation
);
checkpointer = checkpointerProvider.get();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index aa5e7fc..d18a0ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -23,10 +23,12 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
@@ -154,6 +156,9 @@ public class Checkpointer extends GridWorker {
/** The number of IO-bound threads which will write pages to disk. */
private final int checkpointWritePageThreads;
+ /** Supplier of the checkpoint frequency deviation in percent; {@code null} or {@code 0} disables randomization. */
+ private final Supplier<Integer> cpFreqDeviation;
+
/** Checkpoint runner thread pool. If null tasks are to be run in single thread */
@Nullable private volatile IgniteThreadPoolExecutor checkpointWritePagesPool;
@@ -189,6 +194,7 @@ public class Checkpointer extends GridWorker {
* @param factory Page writer factory.
* @param checkpointFrequency Checkpoint frequency.
* @param checkpointWritePageThreads The number of IO-bound threads which will write pages to disk.
+ * @param cpFreqDeviation Deviation of checkpoint frequency.
*/
Checkpointer(
@Nullable String gridName,
@@ -203,7 +209,8 @@ public class Checkpointer extends GridWorker {
CheckpointWorkflow checkpoint,
CheckpointPagesWriterFactory factory,
long checkpointFrequency,
- int checkpointWritePageThreads
+ int checkpointWritePageThreads,
+ Supplier<Integer> cpFreqDeviation
) {
super(gridName, name, logger.apply(Checkpointer.class), workersRegistry);
this.pauseDetector = detector;
@@ -216,8 +223,9 @@ public class Checkpointer extends GridWorker {
this.cacheProcessor = cacheProcessor;
this.checkpointWritePageThreads = Math.max(checkpointWritePageThreads, 1);
this.checkpointWritePagesPool = initializeCheckpointPool();
+ this.cpFreqDeviation = cpFreqDeviation;
- scheduledCp = new CheckpointProgressImpl(checkpointFreq);
+ scheduledCp = new CheckpointProgressImpl(nextCheckpointInterval());
}
/**
@@ -264,7 +272,7 @@ public class Checkpointer extends GridWorker {
doCheckpoint();
else {
synchronized (this) {
- scheduledCp.nextCpNanos(System.nanoTime() + U.millisToNanos(checkpointFreq));
+ scheduledCp.nextCpNanos(System.nanoTime() + U.millisToNanos(nextCheckpointInterval()));
}
}
}
@@ -301,6 +309,24 @@ public class Checkpointer extends GridWorker {
}
/**
+ * Computes the delay before the next scheduled checkpoint: the configured checkpoint frequency shifted by a random
+ * offset, so that cluster nodes do not all start their checkpoints at the same moment.
+ * <p>
+ * For a deviation of {@code d} percent the result lies approximately in
+ * {@code [freq - freq * |d| / 200, freq + freq * |d| / 200)}, i.e. the whole random window is {@code |d|} percent
+ * of the frequency. A {@code null} or zero deviation, or a window narrower than 2 ms, yields exactly {@code freq}.
+ * NOTE(review): deviations of 200% or more can produce a zero or sign-reflected interval; consider validating the
+ * property value.
+ *
+ * @return Next checkpoint interval in milliseconds.
+ */
+ private long nextCheckpointInterval() {
+ Integer deviation = cpFreqDeviation.get();
+
+ if (deviation == null || deviation == 0)
+ return checkpointFreq;
+
+ // Width of the random window, computed once.
+ long range = U.safeAbs(checkpointFreq * deviation) / 100;
+
+ // A window this narrow cannot be centered on the frequency; forcing a 1 ms minimum would always shift the
+ // interval by -1 ms instead of randomizing it.
+ if (range < 2)
+ return checkpointFreq;
+
+ // Uniform offset in [-range / 2, range - range / 2).
+ long startDelay = ThreadLocalRandom.current().nextLong(range) - range / 2;
+
+ return U.safeAbs(checkpointFreq + startDelay);
+ }
+
+ /**
* Change the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do
* nothing otherwise.
*
@@ -814,7 +840,7 @@ public class Checkpointer extends GridWorker {
curr.reason("timeout");
// It is important that we assign a new progress object before checkpoint mark in page memory.
- scheduledCp = new CheckpointProgressImpl(checkpointFreq);
+ scheduledCp = new CheckpointProgressImpl(nextCheckpointInterval());
curCpProgress = curr;
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
index 73bec40..653f53f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
@@ -157,7 +157,8 @@ public class LightweightCheckpointManager {
checkpointWorkflow,
checkpointPagesWriterFactory,
persistenceCfg.getCheckpointFrequency(),
- persistenceCfg.getCheckpointThreads()
+ persistenceCfg.getCheckpointThreads(),
+ () -> 0
);
checkpointer = checkpointerProvider.get();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
index c452a4e..eac6bfa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.persistence.db.file;
+import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import com.google.common.base.Strings;
import org.apache.ignite.IgniteCache;
@@ -25,6 +26,9 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.NotNull;
import org.junit.Test;
@@ -38,6 +42,9 @@ public class IgnitePdsCheckpointSimpleTest extends GridCommonAbstractTest {
/** Checkpoint threads. */
public int cpThreads = DFLT_CHECKPOINT_THREADS;
+ /** Checkpoint frequency in milliseconds, applied to the data storage configuration of started nodes. */
+ public long cpFrequency = TimeUnit.SECONDS.toMillis(10);
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
@@ -45,7 +52,7 @@ public class IgnitePdsCheckpointSimpleTest extends GridCommonAbstractTest {
.setPageSize(4 * 1024)
.setDefaultDataRegionConfiguration(new DataRegionConfiguration()
.setPersistenceEnabled(true))
- .setCheckpointFrequency(TimeUnit.SECONDS.toMillis(10)));
+ .setCheckpointFrequency(cpFrequency));
if (cpThreads != DFLT_CHECKPOINT_THREADS) {
cfg.getDataStorageConfiguration()
@@ -88,6 +95,56 @@ public class IgnitePdsCheckpointSimpleTest extends GridCommonAbstractTest {
}
/**
+ * Checks that, once a checkpoint frequency deviation is set, checkpoints on different nodes do not all start
+ * at (almost) the same moment.
+ *
+ * @throws Exception If failed.
+ */
+ @Test
+ public void testStartCheckpointDelay() throws Exception {
+ cpFrequency = TimeUnit.SECONDS.toMillis(2);
+
+ int nodes = 4;
+
+ IgniteEx ignite0 = startGrids(nodes);
+
+ ignite0.cluster().state(ClusterState.ACTIVE);
+
+ SimpleDistributedProperty<Integer> cpFreqDeviation = U.field(ignite0.context().cache().context().database(),
+ "cpFreqDeviation");
+
+ cpFreqDeviation.propagate(25);
+
+ doSleep(cpFrequency * 2);
+
+ TreeSet<Long> cpStartTimes = new TreeSet<>();
+
+ // Check every node before de-duplication, so a missing checkpoint is reported as such.
+ for (int i = 0; i < nodes; i++) {
+ Long st = getLatCheckpointStartTime(ignite(i));
+
+ assertTrue("There was nothing checkpoint in this node yet.", st != 0);
+
+ cpStartTimes.add(st);
+ }
+
+ // No two nodes started their latest checkpoint in the same millisecond.
+ assertEquals(nodes, cpStartTimes.size());
+
+ // Independent random intervals cannot guarantee a minimal gap between every pair of nodes, so verify that the
+ // checkpoints are not clustered together: the earliest and latest starts must be more than 200 ms apart.
+ long first = cpStartTimes.first();
+ long last = cpStartTimes.last();
+
+ assertTrue("Checkpoint started so close on different nodes [one=" + first + ", other=" + last + ']',
+ last - first > 200);
+ }
+
+ /**
+ * Reads the {@code lastCpTs} field of the node's checkpointer, i.e. the timestamp of its latest checkpoint.
+ *
+ * @param ignite Node to inspect.
+ * @return Value of the checkpointer's {@code lastCpTs} field.
+ */
+ private Long getLatCheckpointStartTime(IgniteEx ignite) {
+ GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)ignite.context().cache().context().database();
+
+ return U.field(db.getCheckpointer(), "lastCpTs");
+ }
+
+ /**
* Checks that all checkpoint threads are present in JVM.
*
* @throws Exception If failed.