You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2023/01/25 15:31:57 UTC

[ignite] branch master updated: IGNITE-14350 Added distributed property to disable CDC. (#10444)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b389fef3c9 IGNITE-14350 Added distributed property to disable CDC. (#10444)
2b389fef3c9 is described below

commit 2b389fef3c92797cc40ffb4d3c433fd74e166849
Author: Nikita Amelchev <ns...@gmail.com>
AuthorDate: Wed Jan 25 18:31:47 2023 +0300

    IGNITE-14350 Added distributed property to disable CDC. (#10444)
---
 docs/_docs/persistence/change-data-capture.adoc    | 11 +++++++
 .../persistence/wal/FileWriteAheadLogManager.java  | 34 +++++++++++++++++++--
 .../java/org/apache/ignite/cdc/CdcSelfTest.java    | 35 ++++++++++++++++++++++
 3 files changed, 78 insertions(+), 2 deletions(-)

diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc
index eff02b79986..adbfe1e4706 100644
--- a/docs/_docs/persistence/change-data-capture.adoc
+++ b/docs/_docs/persistence/change-data-capture.adoc
@@ -73,6 +73,17 @@ CDC is configured in the same way as the Ignite node - via the spring XML file:
 | `metricExporterSpi` | Array of SPI's to export CDC metrics. See link:monitoring-metrics/new-metrics-system#_metric_exporters[metrics] documentation, also. | null
 |===
 
+=== Distributed properties
+
+The distributed properties listed in the table below allow you to configure CDC at runtime:
+
+[cols="1,3,1",opts="header"]
+|===
+|Property | Description | Default Value
+|`cdc.disabled`| Disables CDC in the cluster to avoid disk overflow. Note that cache changes made while CDC is disabled
+are not captured and will be lost for CDC consumers. Useful if the CDC application is down for a long time. | false
+|===
+
 == API
 
 === `org.apache.ignite.cdc.CdcEvent`
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index a30511728dc..0dc36ad6174 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -107,6 +107,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
 import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
 import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -130,6 +131,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
 
+import static java.lang.String.format;
 import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
 import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
 import static java.nio.file.StandardOpenOption.CREATE;
@@ -159,6 +161,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.serial
 import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
 import static org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds;
 import static org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty.detachedBooleanProperty;
 import static org.apache.ignite.internal.util.io.GridFileUtils.ensureHardLinkAvailable;
 
 /**
@@ -220,6 +223,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** @see IgniteSystemProperties#IGNITE_THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT */
     public static final long DFLT_THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT = 1000L;
 
+    /** CDC disabled distributed property name. */
+    public static final String CDC_DISABLED = "cdc.disabled";
+
     /** Use mapped byte buffer. */
     private final boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, DFLT_WAL_MMAP);
 
@@ -409,6 +415,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
     /** Pointer to the last successful checkpoint until which WAL segments can be safely deleted. */
     private volatile WALPointer lastCheckpointPtr = new WALPointer(0, 0, 0);
 
+    /** CDC disabled distributed property; when {@code true}, creation of segment CDC links is skipped. */
+    private final DistributedBooleanProperty cdcDisabled = detachedBooleanProperty(CDC_DISABLED);
+
     /**
      * Constructor.
      *
@@ -495,6 +504,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
                 );
 
                 ensureHardLinkAvailable(walArchiveDir.toPath(), walCdcDir.toPath());
+
+                cctx.kernalContext().internalSubscriptionProcessor()
+                    .registerDistributedConfigurationListener(dispatcher -> {
+                        cdcDisabled.addListener((name, oldVal, newVal) -> {
+                            if (log.isInfoEnabled()) {
+                                log.info(format("Distributed property '%s' was changed from '%s' to '%s'.",
+                                    name, oldVal, newVal));
+                            }
+
+                            if (newVal != null && newVal)
+                                log.warning("CDC was disabled.");
+                        });
+
+                        dispatcher.registerProperty(cdcDisabled);
+                    });
             }
 
             serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer);
@@ -2115,8 +2139,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
 
                 Files.move(dstTmpFile.toPath(), dstFile.toPath());
 
-                if (walCdcDir != null)
-                    Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath());
+                if (walCdcDir != null) {
+                    if (!cdcDisabled.getOrDefault(false))
+                        Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath());
+                    else {
+                        log.warning("Creation of segment CDC link skipped. " +
+                            "'" + CDC_DISABLED + "' distributed property is 'true'.");
+                    }
+                }
 
                 if (mode != WALMode.NONE) {
                     try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) {
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index eda1082d0d9..369a7663aa1 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -49,8 +49,10 @@ import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.pagemem.wal.WALIterator;
 import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
 import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
 import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
@@ -691,6 +693,39 @@ public class CdcSelfTest extends AbstractCdcTest {
         assertTrue(cnsmr.stopped());
     }
 
+    /** Checks that the {@code cdc.disabled} distributed property suspends and resumes CDC segment link creation. */
+    @Test
+    public void testDisable() throws Exception {
+        IgniteEx ign = startGrid(0);
+
+        ign.cluster().state(ACTIVE);
+
+        IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+        addData(cache, 0, 1);
+
+        File walCdcDir = U.field(ign.context().cache().context().wal(true), "walCdcDir");
+
+        assertTrue(waitForCondition(() -> 1 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT));
+
+        DistributedChangeableProperty<Serializable> disabled = ign.context().distributedConfiguration()
+            .property(FileWriteAheadLogManager.CDC_DISABLED);
+
+        disabled.propagate(true);
+
+        addData(cache, 0, 1);
+
+        Thread.sleep(2 * WAL_ARCHIVE_TIMEOUT);
+
+        assertEquals(1, walCdcDir.list().length);
+
+        disabled.propagate(false);
+
+        addData(cache, 0, 1);
+
+        assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT));
+    }
+
     /** */
     public static void addData(IgniteCache<Integer, User> cache, int from, int to) {
         for (int i = from; i < to; i++)