You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/03/05 15:18:30 UTC
[ignite-extensions] branch master updated: IGNITE-16664 IgniteToIgniteCdcStreamer named parameters added (#100)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 3288f2d IGNITE-16664 IgniteToIgniteCdcStreamer named parameters added (#100)
3288f2d is described below
commit 3288f2d39c3bca056e3a33a922e6adc6d9426da0
Author: Nikolay <ni...@apache.org>
AuthorDate: Sat Mar 5 18:18:23 2022 +0300
IGNITE-16664 IgniteToIgniteCdcStreamer named parameters added (#100)
---
.../org/apache/ignite/cdc/CdcEventsApplier.java | 2 +-
.../ignite/cdc/IgniteToIgniteCdcStreamer.java | 91 +++++++++++++++++-----
.../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 10 +--
.../cdc/CdcIgniteToIgniteReplicationTest.java | 6 +-
4 files changed, 82 insertions(+), 27 deletions(-)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 09225fc..1751ed6 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -40,7 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
*/
public abstract class CdcEventsApplier {
/** Maximum batch size. */
- private final int maxBatchSize;
+ protected int maxBatchSize;
/** Caches. */
private final Map<Integer, IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new HashMap<>();
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
index f53c66a..81cd892 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -32,9 +32,14 @@ import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteExperimental;
import org.apache.ignite.resources.LoggerResource;
+import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
+
/**
* Change Data Consumer that streams all data changes to provided {@link #dest} Ignite cluster.
* Consumer will just fail in case of any error during write. Failure of the consumer will lead to the failure of the {@code ignite-cdc} application.
@@ -49,6 +54,7 @@ import org.apache.ignite.resources.LoggerResource;
* @see CdcMain
* @see CacheVersionConflictResolverImpl
*/
+@IgniteExperimental
public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
/** */
public static final String EVTS_CNT = "EventsCount";
@@ -63,14 +69,20 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";
/** Destination cluster client configuration. */
- private final IgniteConfiguration destIgniteCfg;
+ private IgniteConfiguration destIgniteCfg;
/** Handle only primary entry flag. */
- private final boolean onlyPrimary;
+ private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
/** Destination Ignite cluster client */
private IgniteEx dest;
+ /** Cache names. */
+ private Set<String> caches;
+
+ /** Cache IDs. */
+ private Set<Integer> cachesIds;
+
/** Timestamp of last sent message. */
private AtomicLongMetric lastEvtTs;
@@ -81,31 +93,23 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
@LoggerResource
private IgniteLogger log;
- /** Cache IDs. */
- private final Set<Integer> cachesIds;
+ /** Creates a streamer that uses {@code DFLT_MAX_BATCH_SIZE} as the maximum batch size; all other parameters are supplied through the setters. */
+ public IgniteToIgniteCdcStreamer() {
+ super(DFLT_MAX_BATCH_SIZE);
+ }
- /**
- * @param destIgniteCfg Configuration of the destination Ignite node.
- * @param onlyPrimary Only primary flag.
- * @param caches Cache names.
- * @param maxBatchSize Maximum batch size.
- */
- public IgniteToIgniteCdcStreamer(IgniteConfiguration destIgniteCfg, boolean onlyPrimary, Set<String> caches, int maxBatchSize) {
- super(maxBatchSize);
+ /** {@inheritDoc} */
+ @Override public void start(MetricRegistry mreg) {
+ A.notNull(destIgniteCfg, "Destination ignite configuration");
+ A.notEmpty(caches, "caches");
- this.destIgniteCfg = destIgniteCfg;
- this.onlyPrimary = onlyPrimary;
+ if (log.isInfoEnabled())
+ log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
cachesIds = caches.stream()
.mapToInt(CU::cacheId)
.boxed()
.collect(Collectors.toSet());
- }
-
- /** {@inheritDoc} */
- @Override public void start(MetricRegistry mreg) {
- if (log.isInfoEnabled())
- log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
dest = (IgniteEx)Ignition.start(destIgniteCfg);
@@ -153,4 +157,51 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
@Override protected IgniteLogger log() {
return log;
}
+
+ /**
+ * Sets Ignite client node configuration that will connect to destination cluster.
+ * The configuration is mandatory: {@code start(MetricRegistry)} checks it with {@code A.notNull}
+ * and then passes it to {@code Ignition.start} to create the destination client.
+ *
+ * @param destIgniteCfg Ignite client node configuration that will connect to destination cluster.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToIgniteCdcStreamer setDestinationIgniteConfiguration(IgniteConfiguration destIgniteCfg) {
+ this.destIgniteCfg = destIgniteCfg;
+
+ return this;
+ }
+
+ /**
+ * Sets whether entries only from primary nodes should be handled.
+ * When this setter is not called the flag keeps its initial value, {@code DFLT_IS_ONLY_PRIMARY}.
+ *
+ * @param onlyPrimary Whether entries only from primary nodes should be handled.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToIgniteCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
+ this.onlyPrimary = onlyPrimary;
+
+ return this;
+ }
+
+ /**
+ * Sets cache names that participate in CDC.
+ * The set is stored by reference (no copy is made). {@code start(MetricRegistry)} checks it with
+ * {@code A.notEmpty} and converts the names to cache IDs via {@code CU.cacheId}.
+ *
+ * @param caches Cache names.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToIgniteCdcStreamer setCaches(Set<String> caches) {
+ this.caches = caches;
+
+ return this;
+ }
+
+ /**
+ * Sets maximum batch size that will be applied to destination cluster.
+ * Overrides the {@code DFLT_MAX_BATCH_SIZE} value passed to the superclass by the no-arg constructor.
+ * NOTE(review): the value is not validated in this setter - confirm whether non-positive sizes must be rejected.
+ *
+ * @param maxBatchSize Maximum batch size.
+ * @return {@code this} for chaining.
+ */
+ public IgniteToIgniteCdcStreamer setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+
+ return this;
+ }
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index c1cbbeb..be80532 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -105,7 +105,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
private Set<Integer> cachesIds;
/** Cache names. */
- private Collection<String> cacheNames;
+ private Collection<String> caches;
/** Max batch size. */
private int maxBatchSize = DFLT_MAX_BATCH_SIZE;
@@ -194,14 +194,14 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
@Override public void start(MetricRegistry mreg) {
A.notNull(kafkaProps, "Kafka properties");
A.notNull(topic, "Kafka topic");
- A.notEmpty(cacheNames, "caches");
+ A.notEmpty(caches, "caches");
A.ensure(kafkaParts > 0, "The number of Kafka partitions must be explicitly set to a value greater than zero.");
A.ensure(kafkaReqTimeout >= 0, "The Kafka request timeout cannot be negative.");
kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
- cachesIds = cacheNames.stream()
+ cachesIds = caches.stream()
.map(CU::cacheId)
.collect(Collectors.toSet());
@@ -228,7 +228,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/**
* Sets whether entries only from primary nodes should be handled.
*
- * @param onlyPrimary Kafka whether entries only from primary nodes should be handled.
+ * @param onlyPrimary Whether entries only from primary nodes should be handled.
* @return {@code this} for chaining.
*/
public IgniteToKafkaCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
@@ -268,7 +268,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
* @return {@code this} for chaining.
*/
public IgniteToKafkaCdcStreamer setCaches(Collection<String> caches) {
- this.cacheNames = caches;
+ this.caches = caches;
return this;
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 5a9b5b8..29f4253 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -69,7 +69,11 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
return runAsync(() -> {
CdcConfiguration cdcCfg = new CdcConfiguration();
- cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer(destCfg, false, Collections.singleton(cache), KEYS_CNT));
+ cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer()
+ .setMaxBatchSize(KEYS_CNT)
+ .setDestinationIgniteConfiguration(destCfg)
+ .setCaches(Collections.singleton(cache)));
+
cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);