You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2023/01/25 15:31:57 UTC
[ignite] branch master updated: IGNITE-14350 Added distributed property to disable CDC. (#10444)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 2b389fef3c9 IGNITE-14350 Added distributed property to disable CDC. (#10444)
2b389fef3c9 is described below
commit 2b389fef3c92797cc40ffb4d3c433fd74e166849
Author: Nikita Amelchev <ns...@gmail.com>
AuthorDate: Wed Jan 25 18:31:47 2023 +0300
IGNITE-14350 Added distributed property to disable CDC. (#10444)
---
docs/_docs/persistence/change-data-capture.adoc | 11 +++++++
.../persistence/wal/FileWriteAheadLogManager.java | 34 +++++++++++++++++++--
.../java/org/apache/ignite/cdc/CdcSelfTest.java | 35 ++++++++++++++++++++++
3 files changed, 78 insertions(+), 2 deletions(-)
diff --git a/docs/_docs/persistence/change-data-capture.adoc b/docs/_docs/persistence/change-data-capture.adoc
index eff02b79986..adbfe1e4706 100644
--- a/docs/_docs/persistence/change-data-capture.adoc
+++ b/docs/_docs/persistence/change-data-capture.adoc
@@ -73,6 +73,17 @@ CDC is configured in the same way as the Ignite node - via the spring XML file:
| `metricExporterSpi` | Array of SPI's to export CDC metrics. See link:monitoring-metrics/new-metrics-system#_metric_exporters[metrics] documentation, also. | null
|===
+=== Distributed properties
+
+The distributed properties listed in the table below allow you to configure CDC at runtime:
+
+[cols="1,3,1",opts="header"]
+|===
+|Property | Description | Default Value
+|`cdc.disabled`| Disables CDC in the cluster to avoid disk overflow. Note that cache changes will be lost when CDC
+is disabled. Useful if the CDC application is down for a long time. | false
+|===
+
== API
=== `org.apache.ignite.cdc.CdcEvent`
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
index a30511728dc..0dc36ad6174 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java
@@ -107,6 +107,7 @@ import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.Re
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
@@ -130,6 +131,7 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
+import static java.lang.String.format;
import static java.nio.file.StandardCopyOption.ATOMIC_MOVE;
import static java.nio.file.StandardCopyOption.REPLACE_EXISTING;
import static java.nio.file.StandardOpenOption.CREATE;
@@ -159,6 +161,7 @@ import static org.apache.ignite.internal.processors.cache.persistence.wal.serial
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
import static org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds;
import static org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel;
+import static org.apache.ignite.internal.processors.configuration.distributed.DistributedBooleanProperty.detachedBooleanProperty;
import static org.apache.ignite.internal.util.io.GridFileUtils.ensureHardLinkAvailable;
/**
@@ -220,6 +223,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** @see IgniteSystemProperties#IGNITE_THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT */
public static final long DFLT_THRESHOLD_WAIT_TIME_NEXT_WAL_SEGMENT = 1000L;
+ /** CDC disabled distributed property name. */
+ public static final String CDC_DISABLED = "cdc.disabled";
+
/** Use mapped byte buffer. */
private final boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, DFLT_WAL_MMAP);
@@ -409,6 +415,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
/** Pointer to the last successful checkpoint until which WAL segments can be safely deleted. */
private volatile WALPointer lastCheckpointPtr = new WALPointer(0, 0, 0);
+ /** CDC disabled flag. */
+ private final DistributedBooleanProperty cdcDisabled = detachedBooleanProperty(CDC_DISABLED);
+
/**
* Constructor.
*
@@ -495,6 +504,21 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
);
ensureHardLinkAvailable(walArchiveDir.toPath(), walCdcDir.toPath());
+
+ cctx.kernalContext().internalSubscriptionProcessor()
+ .registerDistributedConfigurationListener(dispatcher -> {
+ cdcDisabled.addListener((name, oldVal, newVal) -> {
+ if (log.isInfoEnabled()) {
+ log.info(format("Distributed property '%s' was changed from '%s' to '%s'.",
+ name, oldVal, newVal));
+ }
+
+ if (newVal != null && newVal)
+ log.warning("CDC was disabled.");
+ });
+
+ dispatcher.registerProperty(cdcDisabled);
+ });
}
serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer);
@@ -2115,8 +2139,14 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl
Files.move(dstTmpFile.toPath(), dstFile.toPath());
- if (walCdcDir != null)
- Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath());
+ if (walCdcDir != null) {
+ if (!cdcDisabled.getOrDefault(false))
+ Files.createLink(walCdcDir.toPath().resolve(dstFile.getName()), dstFile.toPath());
+ else {
+ log.warning("Creation of segment CDC link skipped. " +
+ "'" + CDC_DISABLED + "' distributed property is 'true'.");
+ }
+ }
if (mode != WALMode.NONE) {
try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) {
diff --git a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
index eda1082d0d9..369a7663aa1 100644
--- a/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/cdc/CdcSelfTest.java
@@ -49,8 +49,10 @@ import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
+import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.reader.IgniteWalIteratorFactory.IteratorParametersBuilder;
+import org.apache.ignite.internal.processors.configuration.distributed.DistributedChangeableProperty;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
@@ -691,6 +693,39 @@ public class CdcSelfTest extends AbstractCdcTest {
assertTrue(cnsmr.stopped());
}
+ /** Checks that CDC directory stops growing while {@code CDC_DISABLED} property is {@code true} and grows again after reset. */
+ @Test
+ public void testDisable() throws Exception {
+ IgniteEx ign = startGrid(0);
+
+ ign.cluster().state(ACTIVE);
+
+ IgniteCache<Integer, User> cache = ign.getOrCreateCache(DEFAULT_CACHE_NAME);
+
+ addData(cache, 0, 1);
+
+ File walCdcDir = U.field(ign.context().cache().context().wal(true), "walCdcDir");
+
+ assertTrue(waitForCondition(() -> 1 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT));
+
+ DistributedChangeableProperty<Serializable> disabled = ign.context().distributedConfiguration()
+ .property(FileWriteAheadLogManager.CDC_DISABLED);
+
+ disabled.propagate(true); // Turn the 'cdc.disabled' property on.
+
+ addData(cache, 0, 1);
+
+ Thread.sleep(2 * WAL_ARCHIVE_TIMEOUT); // Same timeout as the positive checks wait for.
+
+ assertEquals(1, walCdcDir.list().length); // Entry count must be unchanged.
+
+ disabled.propagate(false); // Turn the 'cdc.disabled' property off again.
+
+ addData(cache, 0, 1);
+
+ assertTrue(waitForCondition(() -> 2 == walCdcDir.list().length, 2 * WAL_ARCHIVE_TIMEOUT));
+ }
+
/** */
public static void addData(IgniteCache<Integer, User> cache, int from, int to) {
for (int i = from; i < to; i++)