You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2022/03/05 15:18:30 UTC

[ignite-extensions] branch master updated: IGNITE-16664 IgniteToIgniteCdcStreamer named parameters added (#100)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 3288f2d  IGNITE-16664 IgniteToIgniteCdcStreamer named parameters added (#100)
3288f2d is described below

commit 3288f2d39c3bca056e3a33a922e6adc6d9426da0
Author: Nikolay <ni...@apache.org>
AuthorDate: Sat Mar 5 18:18:23 2022 +0300

    IGNITE-16664 IgniteToIgniteCdcStreamer named parameters added (#100)
---
 .../org/apache/ignite/cdc/CdcEventsApplier.java    |  2 +-
 .../ignite/cdc/IgniteToIgniteCdcStreamer.java      | 91 +++++++++++++++++-----
 .../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 10 +--
 .../cdc/CdcIgniteToIgniteReplicationTest.java      |  6 +-
 4 files changed, 82 insertions(+), 27 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 09225fc..1751ed6 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -40,7 +40,7 @@ import org.apache.ignite.internal.util.typedef.internal.CU;
  */
 public abstract class CdcEventsApplier {
     /** Maximum batch size. */
-    private final int maxBatchSize;
+    protected int maxBatchSize;
 
     /** Caches. */
     private final Map<Integer, IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new HashMap<>();
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
index f53c66a..81cd892 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -32,9 +32,14 @@ import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.ignite.resources.LoggerResource;
 
+import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.DFLT_IS_ONLY_PRIMARY;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_MAX_BATCH_SIZE;
+
 /**
  * Change Data Consumer that streams all data changes to provided {@link #dest} Ignite cluster.
  * Consumer will just fail in case of any error during write. Fail of consumer will lead to the fail of {@code ignite-cdc} application.
@@ -49,6 +54,7 @@ import org.apache.ignite.resources.LoggerResource;
  * @see CdcMain
  * @see CacheVersionConflictResolverImpl
  */
+@IgniteExperimental
 public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
     /** */
     public static final String EVTS_CNT = "EventsCount";
@@ -63,14 +69,20 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
     public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";
 
     /** Destination cluster client configuration. */
-    private final IgniteConfiguration destIgniteCfg;
+    private IgniteConfiguration destIgniteCfg;
 
     /** Handle only primary entry flag. */
-    private final boolean onlyPrimary;
+    private boolean onlyPrimary = DFLT_IS_ONLY_PRIMARY;
 
     /** Destination Ignite cluster client */
     private IgniteEx dest;
 
+    /** Cache names. */
+    private Set<String> caches;
+
+    /** Cache IDs. */
+    private Set<Integer> cachesIds;
+
     /** Timestamp of last sent message. */
     private AtomicLongMetric lastEvtTs;
 
@@ -81,31 +93,23 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
     @LoggerResource
     private IgniteLogger log;
 
-    /** Cache IDs. */
-    private final Set<Integer> cachesIds;
+    /** */
+    public IgniteToIgniteCdcStreamer() {
+        super(DFLT_MAX_BATCH_SIZE);
+    }
 
-    /**
-     * @param destIgniteCfg Configuration of the destination Ignite node.
-     * @param onlyPrimary Only primary flag.
-     * @param caches Cache names.
-     * @param maxBatchSize Maximum batch size.
-     */
-    public IgniteToIgniteCdcStreamer(IgniteConfiguration destIgniteCfg, boolean onlyPrimary, Set<String> caches, int maxBatchSize) {
-        super(maxBatchSize);
+    /** {@inheritDoc} */
+    @Override public void start(MetricRegistry mreg) {
+        A.notNull(destIgniteCfg, "Destination ignite configuration");
+        A.notEmpty(caches, "caches");
 
-        this.destIgniteCfg = destIgniteCfg;
-        this.onlyPrimary = onlyPrimary;
+        if (log.isInfoEnabled())
+            log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
 
         cachesIds = caches.stream()
             .mapToInt(CU::cacheId)
             .boxed()
             .collect(Collectors.toSet());
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start(MetricRegistry mreg) {
-        if (log.isInfoEnabled())
-            log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
 
         dest = (IgniteEx)Ignition.start(destIgniteCfg);
 
@@ -153,4 +157,51 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
     @Override protected IgniteLogger log() {
         return log;
     }
+
+    /**
+     * Sets Ignite client node configuration that will connect to destination cluster.
+     * @param destIgniteCfg Ignite client node configuration that will connect to destination cluster.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToIgniteCdcStreamer setDestinationIgniteConfiguration(IgniteConfiguration destIgniteCfg) {
+        this.destIgniteCfg = destIgniteCfg;
+
+        return this;
+    }
+
+    /**
+     * Sets whether entries only from primary nodes should be handled.
+     *
+     * @param onlyPrimary Whether entries only from primary nodes should be handled.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToIgniteCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
+        this.onlyPrimary = onlyPrimary;
+
+        return this;
+    }
+
+    /**
+     * Sets cache names that participate in CDC.
+     *
+     * @param caches Cache names.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToIgniteCdcStreamer setCaches(Set<String> caches) {
+        this.caches = caches;
+
+        return this;
+    }
+
+    /**
+     * Sets maximum batch size that will be applied to destination cluster.
+     *
+     * @param maxBatchSize Maximum batch size.
+     * @return {@code this} for chaining.
+     */
+    public IgniteToIgniteCdcStreamer setMaxBatchSize(int maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+
+        return this;
+    }
 }
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index c1cbbeb..be80532 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -105,7 +105,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
     private Set<Integer> cachesIds;
 
     /** Cache names. */
-    private Collection<String> cacheNames;
+    private Collection<String> caches;
 
     /** Max batch size. */
     private int maxBatchSize = DFLT_MAX_BATCH_SIZE;
@@ -194,14 +194,14 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
     @Override public void start(MetricRegistry mreg) {
         A.notNull(kafkaProps, "Kafka properties");
         A.notNull(topic, "Kafka topic");
-        A.notEmpty(cacheNames, "caches");
+        A.notEmpty(caches, "caches");
         A.ensure(kafkaParts > 0, "The number of Kafka partitions must be explicitly set to a value greater than zero.");
         A.ensure(kafkaReqTimeout >= 0, "The Kafka request timeout cannot be negative.");
 
         kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
         kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
 
-        cachesIds = cacheNames.stream()
+        cachesIds = caches.stream()
             .map(CU::cacheId)
             .collect(Collectors.toSet());
 
@@ -228,7 +228,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
     /**
      * Sets whether entries only from primary nodes should be handled.
      *
-     * @param onlyPrimary Kafka whether entries only from primary nodes should be handled.
+     * @param onlyPrimary Whether entries only from primary nodes should be handled.
      * @return {@code this} for chaining.
      */
     public IgniteToKafkaCdcStreamer setOnlyPrimary(boolean onlyPrimary) {
@@ -268,7 +268,7 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
      * @return {@code this} for chaining.
      */
     public IgniteToKafkaCdcStreamer setCaches(Collection<String> caches) {
-        this.cacheNames = caches;
+        this.caches = caches;
 
         return this;
     }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 5a9b5b8..29f4253 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -69,7 +69,11 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
         return runAsync(() -> {
             CdcConfiguration cdcCfg = new CdcConfiguration();
 
-            cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer(destCfg, false, Collections.singleton(cache), KEYS_CNT));
+            cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer()
+                .setMaxBatchSize(KEYS_CNT)
+                .setDestinationIgniteConfiguration(destCfg)
+                .setCaches(Collections.singleton(cache)));
+
             cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
 
             CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);