You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ib...@apache.org on 2021/02/26 07:52:41 UTC

[ignite] branch master updated: IGNITE-14185 Synchronous checkpoints on several nodes greatly increase a latency of distributed transaction (#8803)

This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e18a970  IGNITE-14185 Synchronous checkpoints on several nodes greatly increase a latency of distributed transaction (#8803)
e18a970 is described below

commit e18a9704d071c25ba1c4fd44f9ab83f9709ca6ad
Author: Vladislav Pyatkov <vl...@gmail.com>
AuthorDate: Fri Feb 26 10:52:12 2021 +0300

    IGNITE-14185 Synchronous checkpoints on several nodes greatly increase a latency of distributed transaction (#8803)
---
 .../util/GridCommandHandlerPropertiesTest.java     | 39 ++++++++++++++
 .../GridCacheDatabaseSharedManager.java            | 20 +++++++-
 .../persistence/checkpoint/CheckpointManager.java  |  7 ++-
 .../cache/persistence/checkpoint/Checkpointer.java | 34 +++++++++++--
 .../checkpoint/LightweightCheckpointManager.java   |  3 +-
 .../db/file/IgnitePdsCheckpointSimpleTest.java     | 59 +++++++++++++++++++++-
 6 files changed, 153 insertions(+), 9 deletions(-)

diff --git a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
index 2c800d7..e535b24 100644
--- a/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
+++ b/modules/control-utility/src/test/java/org/apache/ignite/util/GridCommandHandlerPropertiesTest.java
@@ -22,8 +22,10 @@ import java.util.Set;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
 import org.apache.ignite.internal.processors.query.h2.IgniteH2Indexing;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -127,6 +129,43 @@ public class GridCommandHandlerPropertiesTest extends GridCommandHandlerClusterB
     }
 
     /**
+     * Checks the set command for property 'checkpoint.deviation'.
+     */
+    @Test
+    public void testPropertyCheckpointDeviation() {
+        for (Ignite ign : G.allGrids()) {
+            if (ign.configuration().isClientMode())
+                continue;
+
+            SimpleDistributedProperty<Integer> cpFreqDeviation = U.field(((IgniteEx)ign).context().cache().context().database(),
+                "cpFreqDeviation");
+
+            assertNull(cpFreqDeviation.get());
+        }
+
+        assertEquals(
+            EXIT_CODE_OK,
+            execute(
+                "--property", "set",
+                "--name", "checkpoint.deviation",
+                "--val", "20"
+            )
+        );
+
+        for (Ignite ign : G.allGrids()) {
+            if (ign.configuration().isClientMode())
+                continue;
+
+            SimpleDistributedProperty<Integer> cpFreqDeviation = U.field(((IgniteEx)ign).context().cache().context().database(),
+                "cpFreqDeviation");
+
+            assertNotNull(cpFreqDeviation.get());
+
+            assertEquals(20, cpFreqDeviation.get().intValue());
+        }
+    }
+
+    /**
      * Check the set command fro property 'sql.defaultQueryTimeout'.
      * Steps:
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index 83420eb..d61a849 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -138,6 +138,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti
 import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
 import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
 import org.apache.ignite.internal.processors.compress.CompressionProcessor;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
 import org.apache.ignite.internal.processors.port.GridPortRecord;
 import org.apache.ignite.internal.processors.query.GridQueryProcessor;
 import org.apache.ignite.internal.util.GridConcurrentHashSet;
@@ -274,6 +275,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Prefix for meta store records which means that checkpoint entry for some group is not applicable for WAL rebalance. */
     private static final String CHECKPOINT_INAPPLICABLE_FOR_REBALANCE = "cp-wal-rebalance-inapplicable-";
 
+    /** Default 'checkpoint.deviation' value, in percent of the checkpoint frequency. */
+    private static final int DEFAULT_CHECKPOINT_DEVIATION = 40;
+
     /** */
     private FilePageStoreManager storeMgr;
 
@@ -346,6 +350,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
     /** Data regions which should be checkpointed. */
     protected final Set<DataRegion> checkpointedDataRegions = new GridConcurrentHashSet<>();
 
+    /** Distributed 'checkpoint.deviation' property in percent; created for non-client nodes. */
+    private SimpleDistributedProperty<Integer> cpFreqDeviation;
+
     /**
      * @param ctx Kernal context.
      */
@@ -517,6 +524,15 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
         if (!kernalCtx.clientNode()) {
             kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());
 
+            cpFreqDeviation = new SimpleDistributedProperty<>("checkpoint.deviation", Integer::parseInt);
+
+            kernalCtx.internalSubscriptionProcessor().registerDistributedConfigurationListener(dispatcher -> {
+                cpFreqDeviation.addListener((name, oldVal, newVal) ->
+                    U.log(log, "Checkpoint frequency deviation changed [oldVal=" + oldVal + ", newVal=" + newVal + "]"));
+
+                dispatcher.registerProperty(cpFreqDeviation);
+            });
+
             checkpointManager = new CheckpointManager(
                 kernalCtx::log,
                 cctx.igniteInstanceName(),
@@ -534,7 +550,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan
                 persistentStoreMetricsImpl(),
                 kernalCtx.longJvmPauseDetector(),
                 kernalCtx.failure(),
-                kernalCtx.cache());
+                kernalCtx.cache(),
+                () -> cpFreqDeviation.getOrDefault(DEFAULT_CHECKPOINT_DEVIATION)
+            );
 
             final FileLockHolder preLocked = kernalCtx.pdsFolderResolver()
                 .resolveFolders()
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
index 072d3bf..71abc7f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/CheckpointManager.java
@@ -105,6 +105,7 @@ public class CheckpointManager {
      * @param longJvmPauseDetector Long JVM pause detector.
      * @param failureProcessor Failure processor.
      * @param cacheProcessor Cache processor.
+     * @param cpFreqDeviation Distributed checkpoint frequency deviation.
      * @throws IgniteCheckedException if fail.
      */
     public CheckpointManager(
@@ -124,7 +125,8 @@ public class CheckpointManager {
         DataStorageMetricsImpl persStoreMetrics,
         LongJVMPauseDetector longJvmPauseDetector,
         FailureProcessor failureProcessor,
-        GridCacheProcessor cacheProcessor
+        GridCacheProcessor cacheProcessor,
+        Supplier<Integer> cpFreqDeviation
     ) throws IgniteCheckedException {
         CheckpointHistory cpHistory = new CheckpointHistory(
             persistenceCfg,
@@ -189,7 +191,8 @@ public class CheckpointManager {
             checkpointWorkflow,
             checkpointPagesWriterFactory,
             persistenceCfg.getCheckpointFrequency(),
-            persistenceCfg.getCheckpointThreads()
+            persistenceCfg.getCheckpointThreads(),
+            cpFreqDeviation
         );
 
         checkpointer = checkpointerProvider.get();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
index aa5e7fc..d18a0ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/Checkpointer.java
@@ -23,10 +23,12 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.LongAdder;
 import java.util.function.BooleanSupplier;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
@@ -154,6 +156,9 @@ public class Checkpointer extends GridWorker {
     /** The number of IO-bound threads which will write pages to disk. */
     private final int checkpointWritePageThreads;
 
+    /** Supplies the checkpoint frequency deviation in percent; {@code null} or {@code 0} disables randomization. */
+    private final Supplier<Integer> cpFreqDeviation;
+
     /** Checkpoint runner thread pool. If null tasks are to be run in single thread */
     @Nullable private volatile IgniteThreadPoolExecutor checkpointWritePagesPool;
 
@@ -189,6 +194,7 @@ public class Checkpointer extends GridWorker {
      * @param factory Page writer factory.
      * @param checkpointFrequency Checkpoint frequency.
      * @param checkpointWritePageThreads The number of IO-bound threads which will write pages to disk.
+     * @param cpFreqDeviation Deviation of checkpoint frequency.
      */
     Checkpointer(
         @Nullable String gridName,
@@ -203,7 +209,8 @@ public class Checkpointer extends GridWorker {
         CheckpointWorkflow checkpoint,
         CheckpointPagesWriterFactory factory,
         long checkpointFrequency,
-        int checkpointWritePageThreads
+        int checkpointWritePageThreads,
+        Supplier<Integer> cpFreqDeviation
     ) {
         super(gridName, name, logger.apply(Checkpointer.class), workersRegistry);
         this.pauseDetector = detector;
@@ -216,8 +223,9 @@ public class Checkpointer extends GridWorker {
         this.cacheProcessor = cacheProcessor;
         this.checkpointWritePageThreads = Math.max(checkpointWritePageThreads, 1);
         this.checkpointWritePagesPool = initializeCheckpointPool();
+        this.cpFreqDeviation = cpFreqDeviation;
 
-        scheduledCp = new CheckpointProgressImpl(checkpointFreq);
+        scheduledCp = new CheckpointProgressImpl(nextCheckpointInterval());
     }
 
     /**
@@ -264,7 +272,7 @@ public class Checkpointer extends GridWorker {
                     doCheckpoint();
                 else {
                     synchronized (this) {
-                        scheduledCp.nextCpNanos(System.nanoTime() + U.millisToNanos(checkpointFreq));
+                        scheduledCp.nextCpNanos(System.nanoTime() + U.millisToNanos(nextCheckpointInterval()));
                     }
                 }
             }
@@ -301,6 +309,24 @@ public class Checkpointer extends GridWorker {
     }
 
     /**
+     * Computes the next checkpoint interval (ms): {@code checkpointFreq} shifted by a random delay of roughly
+     * {@code +/-spread / 2}, where {@code spread} is |deviation| percent (capped at 100) of {@code checkpointFreq}.
+     *
+     * @return Next checkpoint interval, randomized so that different nodes do not checkpoint at the same time.
+     */
+    private long nextCheckpointInterval() {
+        Integer deviation = cpFreqDeviation.get();
+
+        if (deviation == null || deviation == 0)
+            return checkpointFreq;
+
+        long spread = U.ensurePositive(checkpointFreq * Math.min(Math.abs(deviation.longValue()), 100L) / 100, 1);
+        long startDelay = ThreadLocalRandom.current().nextLong(spread) - spread / 2;
+
+        return Math.max(checkpointFreq + startDelay, 1);
+    }
+
+    /**
      * Change the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do
      * nothing otherwise.
      *
@@ -814,7 +840,7 @@ public class Checkpointer extends GridWorker {
                 curr.reason("timeout");
 
             // It is important that we assign a new progress object before checkpoint mark in page memory.
-            scheduledCp = new CheckpointProgressImpl(checkpointFreq);
+            scheduledCp = new CheckpointProgressImpl(nextCheckpointInterval());
 
             curCpProgress = curr;
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
index 73bec40..653f53f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/checkpoint/LightweightCheckpointManager.java
@@ -157,7 +157,8 @@ public class LightweightCheckpointManager {
             checkpointWorkflow,
             checkpointPagesWriterFactory,
             persistenceCfg.getCheckpointFrequency(),
-            persistenceCfg.getCheckpointThreads()
+            persistenceCfg.getCheckpointThreads(),
+            () -> 0
         );
 
         checkpointer = checkpointerProvider.get();
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
index c452a4e..eac6bfa 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimpleTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.persistence.db.file;
 
+import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 import com.google.common.base.Strings;
 import org.apache.ignite.IgniteCache;
@@ -25,6 +26,9 @@ import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.configuration.distributed.SimpleDistributedProperty;
+import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.jetbrains.annotations.NotNull;
 import org.junit.Test;
@@ -38,6 +42,9 @@ public class IgnitePdsCheckpointSimpleTest extends GridCommonAbstractTest {
     /** Checkpoint threads. */
     public int cpThreads = DFLT_CHECKPOINT_THREADS;
 
+    /** Checkpoint frequency in milliseconds, applied to the data storage configuration of started nodes. */
+    public long cpFrequency = TimeUnit.SECONDS.toMillis(10);
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
@@ -45,7 +52,7 @@ public class IgnitePdsCheckpointSimpleTest extends GridCommonAbstractTest {
                 .setPageSize(4 * 1024)
                 .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
                     .setPersistenceEnabled(true))
-                .setCheckpointFrequency(TimeUnit.SECONDS.toMillis(10)));
+                .setCheckpointFrequency(cpFrequency));
 
         if (cpThreads != DFLT_CHECKPOINT_THREADS) {
             cfg.getDataStorageConfiguration()
@@ -88,6 +95,56 @@ public class IgnitePdsCheckpointSimpleTest extends GridCommonAbstractTest {
     }
 
     /**
+     * Checks that the latest checkpoints of different nodes started more than 200 ms apart from each other.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testStartCheckpointDelay() throws Exception {
+        cpFrequency = TimeUnit.SECONDS.toMillis(2);
+
+        int nodes = 4;
+
+        IgniteEx ignite0 = startGrids(nodes);
+
+        ignite0.cluster().state(ClusterState.ACTIVE);
+
+        SimpleDistributedProperty<Integer> cpFreqDeviation = U.field(ignite0.context().cache().context().database(),
+            "cpFreqDeviation");
+
+        cpFreqDeviation.propagate(25);
+
+        doSleep(cpFrequency * 2);
+
+        TreeSet<Long> cpStartTimes = new TreeSet<>();
+
+        for (int i = 0; i < nodes; i++)
+            cpStartTimes.add(getLatCheckpointStartTime(ignite(i)));
+
+        assertEquals(nodes, cpStartTimes.size());
+
+        Long prev = null;
+
+        for (Long st : cpStartTimes) {
+            assertTrue("There was nothing checkpoint in this node yet.", st != 0);
+            assertTrue("Checkpoint started so close on different nodes [one=" + prev + ", other=" + st + ']',
+                prev == null || st - prev > 200);
+            prev = st; // TreeSet iterates in ascending order, so prev is the closest earlier start time.
+        }
+    }
+
+    /**
+     * Reads the latest checkpoint start time recorded by the checkpointer of the given node.
+     * @param ignite Node to inspect.
+     * @return Value of the {@code lastCpTs} field of the node's checkpointer.
+     */
+    private Long getLatCheckpointStartTime(IgniteEx ignite) {
+        GridCacheDatabaseSharedManager dbMgr =
+            (GridCacheDatabaseSharedManager)ignite.context().cache().context().database();
+        return U.field(dbMgr.getCheckpointer(), "lastCpTs");
+    }
+
+    /**
      * Checks that all checkpoint threads are present in JVM.
      *
      * @throws Exception If failed.