You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2022/09/13 15:29:21 UTC
[ignite-extensions] branch master updated: IGNITE-17452 Implement Kafka to thin client CDC streamer (#176)
This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 9f533b4 IGNITE-17452 Implement Kafka to thin client CDC streamer (#176)
9f533b4 is described below
commit 9f533b4489ad8a8347c4ceba0669415dcb6cdc56
Author: Nikita Amelchev <ns...@gmail.com>
AuthorDate: Tue Sep 13 18:29:17 2022 +0300
IGNITE-17452 Implement Kafka to thin client CDC streamer (#176)
---
.../ignite/cdc/IgniteToIgniteCdcStreamer.java | 2 +-
....java => AbstractKafkaToIgniteCdcStreamer.java} | 187 ++++++++----------
.../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 1 +
.../ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java | 219 +++------------------
.../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java | 22 +--
.../cdc/kafka/KafkaToIgniteClientCdcStreamer.java | 130 ++++++++++++
.../cdc/kafka/KafkaToIgniteCommandLineStartup.java | 24 ++-
.../ignite/cdc/kafka/KafkaToIgniteLoader.java | 33 +++-
.../cdc/kafka/KafkaToIgniteMetadataUpdater.java | 14 +-
.../apache/ignite/cdc/AbstractReplicationTest.java | 59 ++++--
.../cdc/CdcIgniteToIgniteReplicationTest.java | 38 +++-
.../org/apache/ignite/cdc/IgniteCdcTestSuite.java | 4 +-
.../cdc/kafka/CdcKafkaReplicationAppsTest.java | 32 ++-
.../ignite/cdc/kafka/CdcKafkaReplicationTest.java | 16 +-
.../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java | 29 +++
.../CdcIgniteToIgniteClientReplicationTest.java | 108 ----------
.../resources/loader/kafka-to-ignite-correct.xml | 2 +-
.../loader/kafka-to-ignite-double-ignite-cfg.xml | 2 +-
.../kafka-to-ignite-without-kafka-properties.xml | 2 +-
.../kafka-to-ignite-client-correct.xml} | 17 +-
.../kafka-to-ignite-client-double-client-cfg.xml} | 19 +-
.../kafka-to-ignite-client-with-ignite-cfg.xml} | 10 +-
...-to-ignite-client-without-kafka-properties.xml} | 17 +-
.../kafka-to-ignite-client.xml} | 34 ++--
24 files changed, 492 insertions(+), 529 deletions(-)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
index 60e1dfa..9b7cc7d 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -58,7 +58,7 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer {
if (log.isInfoEnabled())
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
- A.notNull(destIgniteCfg, "Destination ignite configuration");
+ A.notNull(destIgniteCfg, "Destination Ignite configuration.");
dest = (IgniteEx)Ignition.start(destIgniteCfg);
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
similarity index 63%
copy from modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
copy to modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
index b0a5e4b..a8d28e1 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
@@ -18,27 +18,23 @@
package org.apache.ignite.cdc.kafka;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
-import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
import org.apache.ignite.cdc.CdcEvent;
-import org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl;
-import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
-import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridLoggerProxy;
-import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteExperimental;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -51,50 +47,28 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
/**
- * Main class of Kafka to Ignite application.
+ * Main abstract class of Kafka to Ignite application.
* This application is counterpart of {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
* Application runs several {@link KafkaToIgniteCdcStreamerApplier} thread to read Kafka topic partitions
- * and apply {@link CdcEvent} to Ignite.
- * <p>
+ * and apply {@link CdcEvent} to Ignite. There are two implementations:
+ * <ul>
+ * <li>{@link KafkaToIgniteCdcStreamer} to apply through client node.</li>
+ * <li>{@link KafkaToIgniteClientCdcStreamer} to apply through thin client.</li>
+ * </ul>
* Each applier receive even number of kafka topic partition to read.
- * <p>
- * In case of any error during read applier just fail. Fail of any applier will lead to the fail of whole application.
- * It expected that application will be configured for automatic restarts with the OS tool to failover temporary errors
- * such as Kafka or Ignite unavailability.
- * <p>
- * To resolve possible update conflicts (in case of concurrent update in source and destination Ignite clusters)
- * real-world deployments should use some conflict resolver, for example {@link CacheVersionConflictResolverImpl}.
- * Example of Ignite configuration with the conflict resolver:
- * <pre>
- * {@code
- * CacheVersionConflictResolverCachePluginProvider conflictPlugin = new CacheVersionConflictResolverCachePluginProvider();
- *
- * conflictPlugin.setClusterId(clusterId); // Cluster id.
- * conflictPlugin.setCaches(new HashSet<>(Arrays.asList("my-cache", "some-other-cache"))); // Caches to replicate.
- *
- * IgniteConfiguration cfg = ...;
- *
- * cfg.setPluginProviders(conflictPlugin);
- * }
- * </pre>
- * Please, see {@link CacheConflictResolutionManagerImpl} for additional information.
*
* @see CdcMain
* @see IgniteToKafkaCdcStreamer
* @see CdcEvent
- * @see KafkaToIgniteCdcStreamerApplier
- * @see CacheConflictResolutionManagerImpl
+ * @see KafkaToIgniteCdcStreamer
+ * @see KafkaToIgniteClientCdcStreamer
*/
-@IgniteExperimental
-public class KafkaToIgniteCdcStreamer implements Runnable {
- /** Ignite configuration. */
- private final IgniteConfiguration igniteCfg;
-
+abstract class AbstractKafkaToIgniteCdcStreamer implements Runnable {
/** Kafka consumer properties. */
private final Properties kafkaProps;
/** Streamer configuration. */
- private final KafkaToIgniteCdcStreamerConfiguration streamerCfg;
+ protected final KafkaToIgniteCdcStreamerConfiguration streamerCfg;
/** Runners to run {@link KafkaToIgniteCdcStreamerApplier} instances. */
private final List<Thread> runners;
@@ -102,16 +76,14 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
/** Appliers. */
private final List<AutoCloseable> appliers;
+ /** */
+ protected IgniteLogger log;
+
/**
- * @param igniteCfg Ignite configuration.
* @param kafkaProps Kafka properties.
* @param streamerCfg Streamer configuration.
*/
- public KafkaToIgniteCdcStreamer(
- IgniteConfiguration igniteCfg,
- Properties kafkaProps,
- KafkaToIgniteCdcStreamerConfiguration streamerCfg
- ) {
+ public AbstractKafkaToIgniteCdcStreamer(Properties kafkaProps, KafkaToIgniteCdcStreamerConfiguration streamerCfg) {
A.notNull(streamerCfg.getTopic(), "Kafka topic");
A.notNull(streamerCfg.getMetadataTopic(), "Kafka metadata topic");
A.ensure(
@@ -129,7 +101,6 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
"Threads count must be less or equals to the total Kafka partitions count.");
- this.igniteCfg = igniteCfg;
this.kafkaProps = kafkaProps;
this.streamerCfg = streamerCfg;
@@ -146,6 +117,10 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
/** {@inheritDoc} */
@Override public void run() {
try {
+ initLogger();
+
+ ackAsciiLogo(log);
+
runx();
}
catch (Exception e) {
@@ -154,74 +129,65 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
}
/** */
- private void runx() throws Exception {
- U.initWorkDir(igniteCfg);
+ protected void runAppliers() {
+ AtomicBoolean stopped = new AtomicBoolean();
+
+ Set<Integer> caches = null;
- IgniteLogger log = U.initLogger(igniteCfg, "kafka-ignite-streamer");
+ if (!F.isEmpty(streamerCfg.getCaches())) {
+ checkCaches(streamerCfg.getCaches());
- igniteCfg.setGridLogger(log);
+ caches = streamerCfg.getCaches().stream()
+ .map(CU::cacheId).collect(Collectors.toSet());
+ }
+
+ KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(
+ binaryContext(),
+ log,
+ kafkaProps,
+ streamerCfg
+ );
- ackAsciiLogo(log);
+ int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
+ int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
+ int threadCnt = streamerCfg.getThreadCount();
- try (IgniteEx ign = (IgniteEx)Ignition.start(igniteCfg)) {
- AtomicBoolean stopped = new AtomicBoolean();
+ int partPerApplier = kafkaParts / threadCnt;
- Set<Integer> caches = null;
+ for (int i = 0; i < threadCnt; i++) {
+ int from = i * partPerApplier;
+ int to = (i + 1) * partPerApplier;
- if (!F.isEmpty(streamerCfg.getCaches())) {
- caches = streamerCfg.getCaches().stream()
- .peek(cache -> Objects.requireNonNull(ign.cache(cache), cache + " not exists!"))
- .map(CU::cacheId).collect(Collectors.toSet());
- }
+ if (i == threadCnt - 1)
+ to = kafkaParts;
- KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(
- ign,
+ KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
+ () -> eventsApplier(),
log,
kafkaProps,
- streamerCfg
+ streamerCfg.getTopic(),
+ kafkaPartsFrom + from,
+ kafkaPartsFrom + to,
+ caches,
+ streamerCfg.getMaxBatchSize(),
+ streamerCfg.getKafkaRequestTimeout(),
+ metaUpdr,
+ stopped
);
- int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
- int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
- int threadCnt = streamerCfg.getThreadCount();
-
- int partPerApplier = kafkaParts / threadCnt;
-
- for (int i = 0; i < threadCnt; i++) {
- int from = i * partPerApplier;
- int to = (i + 1) * partPerApplier;
-
- if (i == threadCnt - 1)
- to = kafkaParts;
-
- KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
- ign,
- log,
- kafkaProps,
- streamerCfg.getTopic(),
- kafkaPartsFrom + from,
- kafkaPartsFrom + to,
- caches,
- streamerCfg.getMaxBatchSize(),
- streamerCfg.getKafkaRequestTimeout(),
- metaUpdr,
- stopped
- );
-
- addAndStart("applier-thread-" + i, applier);
- }
-
- try {
- for (int i = 0; i < threadCnt + 1; i++)
- runners.get(i).join();
- }
- catch (InterruptedException e) {
- stopped.set(true);
-
- appliers.forEach(U::closeQuiet);
-
- log.warning("Kafka to Ignite streamer interrupted", e);
- }
+ addAndStart("applier-thread-" + i, applier);
+ }
+
+ try {
+ for (int i = 0; i < threadCnt + 1; i++)
+ runners.get(i).join();
+ }
+ catch (InterruptedException e) {
+ stopped.set(true);
+
+ appliers.forEach(U::closeQuiet);
+
+ log.warning("Kafka to Ignite streamer interrupted", e);
}
}
@@ -236,6 +202,21 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
runners.add(thread);
}
+ /** Init logger. */
+ protected abstract void initLogger() throws Exception;
+
+ /** */
+ protected abstract void runx() throws Exception;
+
+ /** @return Binary context. */
+ protected abstract BinaryContext binaryContext();
+
+ /** @return Cdc events applier. */
+ protected abstract AbstractCdcEventsApplier eventsApplier();
+
+ /** Checks that configured caches exist in a destination cluster. */
+ protected abstract void checkCaches(Collection<String> caches);
+
/** */
private void ackAsciiLogo(IgniteLogger log) {
String ver = "ver. " + ACK_VER_STR;
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index f9ec85c..3ab2b93 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -81,6 +81,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_
*
* @see CdcMain
* @see KafkaToIgniteCdcStreamer
+ * @see KafkaToIgniteClientCdcStreamer
* @see CacheVersionConflictResolverImpl
*/
@IgniteExperimental
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
index b0a5e4b..3386a4c 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
@@ -17,46 +17,29 @@
package org.apache.ignite.cdc.kafka;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
import java.util.Objects;
import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.CdcEventsIgniteApplier;
import org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridLoggerProxy;
import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.cdc.CdcMain;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteExperimental;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-
-import static org.apache.ignite.internal.IgniteKernal.NL;
-import static org.apache.ignite.internal.IgniteKernal.SITE;
-import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
-import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
/**
* Main class of Kafka to Ignite application.
* This application is counterpart of {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
* Application runs several {@link KafkaToIgniteCdcStreamerApplier} thread to read Kafka topic partitions
- * and apply {@link CdcEvent} to Ignite.
- * <p>
- * Each applier receive even number of kafka topic partition to read.
+ * and apply {@link CdcEvent} to Ignite through client node.
* <p>
* In case of any error during read applier just fail. Fail of any applier will lead to the fail of whole application.
* It expected that application will be configured for automatic restarts with the OS tool to failover temporary errors
@@ -86,21 +69,12 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
* @see CacheConflictResolutionManagerImpl
*/
@IgniteExperimental
-public class KafkaToIgniteCdcStreamer implements Runnable {
+public class KafkaToIgniteCdcStreamer extends AbstractKafkaToIgniteCdcStreamer {
/** Ignite configuration. */
private final IgniteConfiguration igniteCfg;
- /** Kafka consumer properties. */
- private final Properties kafkaProps;
-
- /** Streamer configuration. */
- private final KafkaToIgniteCdcStreamerConfiguration streamerCfg;
-
- /** Runners to run {@link KafkaToIgniteCdcStreamerApplier} instances. */
- private final List<Thread> runners;
-
- /** Appliers. */
- private final List<AutoCloseable> appliers;
+ /** Ignite client node. */
+ private IgniteEx ign;
/**
* @param igniteCfg Ignite configuration.
@@ -112,178 +86,45 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
Properties kafkaProps,
KafkaToIgniteCdcStreamerConfiguration streamerCfg
) {
- A.notNull(streamerCfg.getTopic(), "Kafka topic");
- A.notNull(streamerCfg.getMetadataTopic(), "Kafka metadata topic");
- A.ensure(
- streamerCfg.getKafkaPartsFrom() >= 0,
- "The Kafka partitions lower bound must be explicitly set to a value greater than or equals to zero.");
- A.ensure(
- streamerCfg.getKafkaPartsTo() > 0,
- "The Kafka partitions upper bound must be explicitly set to a value greater than zero.");
- A.ensure(
- streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(),
- "The Kafka partitions upper bound must be greater than lower bound.");
- A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request timeout cannot be negative.");
- A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must me greater than zero.");
- A.ensure(
- streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
- "Threads count must be less or equals to the total Kafka partitions count.");
+ super(kafkaProps, streamerCfg);
- this.igniteCfg = igniteCfg;
- this.kafkaProps = kafkaProps;
- this.streamerCfg = streamerCfg;
-
- appliers = new ArrayList<>(streamerCfg.getThreadCount());
- runners = new ArrayList<>(streamerCfg.getThreadCount());
-
- if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
- throw new IllegalArgumentException("Kafka properties don't contains " + ConsumerConfig.GROUP_ID_CONFIG);
+ A.notNull(igniteCfg, "Destination Ignite configuration.");
- kafkaProps.put(KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
- kafkaProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+ this.igniteCfg = igniteCfg;
}
/** {@inheritDoc} */
- @Override public void run() {
- try {
- runx();
- }
- catch (Exception e) {
- throw new IgniteException(e);
- }
- }
-
- /** */
- private void runx() throws Exception {
+ @Override protected void initLogger() throws Exception {
U.initWorkDir(igniteCfg);
- IgniteLogger log = U.initLogger(igniteCfg, "kafka-ignite-streamer");
+ log = U.initLogger(igniteCfg, "kafka-ignite-streamer");
igniteCfg.setGridLogger(log);
+ }
- ackAsciiLogo(log);
-
+ /** {@inheritDoc} */
+ @Override protected void runx() throws Exception {
try (IgniteEx ign = (IgniteEx)Ignition.start(igniteCfg)) {
- AtomicBoolean stopped = new AtomicBoolean();
-
- Set<Integer> caches = null;
-
- if (!F.isEmpty(streamerCfg.getCaches())) {
- caches = streamerCfg.getCaches().stream()
- .peek(cache -> Objects.requireNonNull(ign.cache(cache), cache + " not exists!"))
- .map(CU::cacheId).collect(Collectors.toSet());
- }
-
- KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(
- ign,
- log,
- kafkaProps,
- streamerCfg
- );
-
- int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
- int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
- int threadCnt = streamerCfg.getThreadCount();
+ this.ign = ign;
- int partPerApplier = kafkaParts / threadCnt;
-
- for (int i = 0; i < threadCnt; i++) {
- int from = i * partPerApplier;
- int to = (i + 1) * partPerApplier;
-
- if (i == threadCnt - 1)
- to = kafkaParts;
-
- KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
- ign,
- log,
- kafkaProps,
- streamerCfg.getTopic(),
- kafkaPartsFrom + from,
- kafkaPartsFrom + to,
- caches,
- streamerCfg.getMaxBatchSize(),
- streamerCfg.getKafkaRequestTimeout(),
- metaUpdr,
- stopped
- );
-
- addAndStart("applier-thread-" + i, applier);
- }
-
- try {
- for (int i = 0; i < threadCnt + 1; i++)
- runners.get(i).join();
- }
- catch (InterruptedException e) {
- stopped.set(true);
-
- appliers.forEach(U::closeQuiet);
-
- log.warning("Kafka to Ignite streamer interrupted", e);
- }
+ runAppliers();
}
}
- /** Adds applier to {@link #appliers} and starts thread with it. */
- private <T extends AutoCloseable & Runnable> void addAndStart(String threadName, T applier) {
- appliers.add(applier);
-
- Thread thread = new Thread(applier, threadName);
-
- thread.start();
+ /** {@inheritDoc} */
+ @Override protected AbstractCdcEventsApplier eventsApplier() {
+ U.setCurrentIgniteName(ign.name());
- runners.add(thread);
+ return new CdcEventsIgniteApplier(ign, streamerCfg.getMaxBatchSize(), log);
}
- /** */
- private void ackAsciiLogo(IgniteLogger log) {
- String ver = "ver. " + ACK_VER_STR;
-
- if (log.isInfoEnabled()) {
- log.info(NL + NL +
- ">>> __ _____ ______ _____ __________ __________ ________________" + NL +
- ">>> / //_/ _ | / __/ //_/ _ | /_ __/ __ \\ / _/ ___/ |/ / _/_ __/ __/" + NL +
- ">>> / ,< / __ |/ _// ,< / __ | / / / /_/ / _/ // (_ / // / / / / _/ " + NL +
- ">>> /_/|_/_/ |_/_/ /_/|_/_/ |_| /_/ \\____/ /___/\\___/_/|_/___/ /_/ /___/ " + NL +
- ">>> " + NL +
- ">>> " + NL +
- ">>> " + ver + NL +
- ">>> " + COPYRIGHT + NL +
- ">>> " + NL +
- ">>> Ignite documentation: " + "http://" + SITE + NL +
- ">>> Kafka topic: " + streamerCfg.getTopic() + NL +
- ">>> Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo() + NL
- );
- }
-
- if (log.isQuiet()) {
- U.quiet(false,
- " __ _____ ______ _____ __________ __________ ________________",
- " / //_/ _ | / __/ //_/ _ | /_ __/ __ \\ / _/ ___/ |/ / _/_ __/ __/",
- " / ,< / __ |/ _// ,< / __ | / / / /_/ / _/ // (_ / // / / / / _/ ",
- "/_/|_/_/ |_/_/ /_/|_/_/ |_| /_/ \\____/ /___/\\___/_/|_/___/ /_/ /___/ ",
- "",
- ver,
- COPYRIGHT,
- "",
- "Ignite documentation: " + "http://" + SITE,
- "Kafka topic: " + streamerCfg.getTopic(),
- "Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo(),
- "",
- "Quiet mode.");
-
- String fileName = log.fileName();
-
- if (fileName != null)
- U.quiet(false, " ^-- Logging to file '" + fileName + '\'');
-
- if (log instanceof GridLoggerProxy)
- U.quiet(false, " ^-- Logging by '" + ((GridLoggerProxy)log).getLoggerInfo() + '\'');
+ /** {@inheritDoc} */
+ @Override protected BinaryContext binaryContext() {
+ return ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext();
+ }
- U.quiet(false,
- " ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or \"-v\" to kafka-to-ignite.{sh|bat}",
- "");
- }
+ /** {@inheritDoc} */
+ @Override protected void checkCaches(Collection<String> caches) {
+ caches.forEach(name -> Objects.requireNonNull(ign.cache(name), name + " not exists!"));
}
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index 7b7f97a..ceb688e 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -31,20 +31,18 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheEntryVersion;
import org.apache.ignite.cdc.AbstractCdcEventsApplier;
import org.apache.ignite.cdc.CdcEvent;
-import org.apache.ignite.cdc.CdcEventsIgniteApplier;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -85,9 +83,6 @@ import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.META_UPDATE_M
* @see CacheEntryVersion
*/
class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
- /** Ignite instance. */
- private final IgniteEx ign;
-
/** Log. */
private final IgniteLogger log;
@@ -121,11 +116,14 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
/** */
private final AtomicLong rcvdEvts = new AtomicLong();
- /** */
+ /** Cdc events applier supplier. */
+ private final Supplier<AbstractCdcEventsApplier> applierSupplier;
+
+ /** Cdc events applier. */
private AbstractCdcEventsApplier applier;
/**
- * @param ign Ignite instance.
+ * @param applierSupplier Cdc events applier supplier.
* @param log Logger.
* @param kafkaProps Kafka properties.
* @param topic Topic name.
@@ -138,7 +136,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
* @param stopped Stopped flag.
*/
public KafkaToIgniteCdcStreamerApplier(
- IgniteEx ign,
+ Supplier<AbstractCdcEventsApplier> applierSupplier,
IgniteLogger log,
Properties kafkaProps,
String topic,
@@ -150,7 +148,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
KafkaToIgniteMetadataUpdater metaUpdr,
AtomicBoolean stopped
) {
- this.ign = ign;
+ this.applierSupplier = applierSupplier;
this.kafkaProps = kafkaProps;
this.topic = topic;
this.kafkaPartFrom = kafkaPartFrom;
@@ -160,13 +158,11 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
this.metaUpdr = metaUpdr;
this.stopped = stopped;
this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
-
- applier = new CdcEventsIgniteApplier(ign, maxBatchSize, log);
}
/** {@inheritDoc} */
@Override public void run() {
- U.setCurrentIgniteName(ign.name());
+ applier = applierSupplier.get();
try {
for (int kafkaPart = kafkaPartFrom; kafkaPart < kafkaPartTo; kafkaPart++) {
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java
new file mode 100644
index 0000000..6d05aab
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.cdc.thin.CdcEventsIgniteClientApplier;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.client.thin.ClientBinary;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * Main class of Kafka to Ignite application.
+ * This application is counterpart of {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
+ * Application runs several {@link KafkaToIgniteCdcStreamerApplier} threads to read Kafka topic partitions
+ * and apply {@link CdcEvent} to Ignite through thin client.
+ * <p>
+ * In case of any error during read, the applier just fails. Failure of any applier will lead to the failure of the whole application.
+ * It is expected that the application will be configured for automatic restarts with an OS tool to recover from temporary errors
+ * such as Kafka or Ignite unavailability.
+ * <p>
+ * To resolve possible update conflicts (in case of concurrent update in source and destination Ignite clusters)
+ * real-world deployments should use some conflict resolver, for example {@link CacheVersionConflictResolverImpl}.
+ * Example of Ignite configuration with the conflict resolver:
+ * <pre>
+ * {@code
+ * CacheVersionConflictResolverCachePluginProvider conflictPlugin = new CacheVersionConflictResolverCachePluginProvider();
+ *
+ * conflictPlugin.setClusterId(clusterId); // Cluster id.
+ * conflictPlugin.setCaches(new HashSet<>(Arrays.asList("my-cache", "some-other-cache"))); // Caches to replicate.
+ *
+ * IgniteConfiguration cfg = ...;
+ *
+ * cfg.setPluginProviders(conflictPlugin);
+ * }
+ * </pre>
+ * Please, see {@link CacheConflictResolutionManagerImpl} for additional information.
+ *
+ * @see CdcMain
+ * @see IgniteToKafkaCdcStreamer
+ * @see CdcEvent
+ * @see IgniteClient
+ * @see KafkaToIgniteCdcStreamerApplier
+ * @see CacheConflictResolutionManagerImpl
+ */
+@IgniteExperimental
+public class KafkaToIgniteClientCdcStreamer extends AbstractKafkaToIgniteCdcStreamer {
+ /** Ignite thin client configuration. */
+ private final ClientConfiguration clientCfg;
+
+ /** Ignite thin client. */
+ private IgniteClient client;
+
+ /**
+ * @param clientCfg Ignite thin client configuration.
+ * @param kafkaProps Kafka properties.
+ * @param streamerCfg Streamer configuration.
+ */
+ public KafkaToIgniteClientCdcStreamer(
+ ClientConfiguration clientCfg,
+ Properties kafkaProps,
+ KafkaToIgniteCdcStreamerConfiguration streamerCfg
+ ) {
+ super(kafkaProps, streamerCfg);
+
+ A.notNull(clientCfg, "Destination thin client configuration.");
+
+ this.clientCfg = clientCfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void initLogger() throws Exception {
+ log = U.initLogger(null, "kafka-ignite-streamer", UUID.randomUUID(), U.defaultWorkDirectory());
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void runx() throws Exception {
+ try (IgniteClient client = Ignition.startClient(clientCfg)) {
+ this.client = client;
+
+ runAppliers();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected AbstractCdcEventsApplier eventsApplier() {
+ GridBinaryMarshaller.popContext(binaryContext());
+
+ return new CdcEventsIgniteClientApplier(client, streamerCfg.getMaxBatchSize(), log);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected BinaryContext binaryContext() {
+ return ((ClientBinary)client.binary()).binaryContext();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void checkCaches(Collection<String> caches) {
+ Collection<String> clusterCaches = client.cacheNames();
+
+ caches.forEach(name -> A.ensure(clusterCaches.contains(name), name + " not exists!"));
+ }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
index c425664..f14aad8 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
@@ -27,8 +27,10 @@ import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
import static org.apache.ignite.startup.cmdline.CommandLineStartup.isHelp;
/**
- * This class defines command-line {@link KafkaToIgniteCdcStreamer} startup. This startup can be used to start Ignite
- * {@link KafkaToIgniteCdcStreamer} application outside of any hosting environment from command line.
+ * This class defines command-line Kafka to Ignite startup. This startup can be used to start Ignite
+ * {@link KafkaToIgniteCdcStreamer} or {@link KafkaToIgniteClientCdcStreamer} application outside of any hosting
+ * environment from command line.
+ * <p>
* This startup is a Java application with {@link #main(String[])} method that accepts command line arguments.
* It accepts on parameter which is Ignite Spring XML configuration file path.
* You can run this class from command line without parameters to get help message.
@@ -61,7 +63,7 @@ public class KafkaToIgniteCommandLineStartup {
exit("Invalid arguments: " + args[0], true, -1);
try {
- KafkaToIgniteCdcStreamer streamer = KafkaToIgniteLoader.loadKafkaToIgniteStreamer(args[0]);
+ AbstractKafkaToIgniteCdcStreamer streamer = KafkaToIgniteLoader.loadKafkaToIgniteStreamer(args[0]);
streamer.run();
}
@@ -95,14 +97,16 @@ public class KafkaToIgniteCommandLineStartup {
" Where:",
" ?, /help, -help, - show this message.",
" -v - verbose mode (quiet by default).",
- " path - path to Spring XML configuration file.",
- " Path can be absolute or relative to IGNITE_HOME.",
+ " path - path to Spring XML configuration file.",
+ " Path can be absolute or relative to IGNITE_HOME.",
" ",
- "Spring file should contain bean definition of 'org.apache.ignite.configuration.IgniteConfiguration' " +
- "and 'org.apache.ignite.cdc.KafkaToIgniteCdcStreamerConfiguration' " +
- "and bean of class 'java.util.Properties' with '" + KAFKA_PROPERTIES + "' name " +
- "that contains properties to connect to Apache Kafka cluster. " +
- "Note that bean will be fetched by the type and its ID is not used.");
+ "Spring file should contain the following bean definition:",
+ "1. 'org.apache.ignite.configuration.IgniteConfiguration' or " +
+ "'org.apache.ignite.configuration.ClientConfiguration' to connect to Apache Ignite cluster.",
+ "2. 'org.apache.ignite.cdc.KafkaToIgniteCdcStreamerConfiguration' to configure streamer.",
+ "3. Bean of class 'java.util.Properties' with '" + KAFKA_PROPERTIES + "' name that contains " +
+ "properties to connect to Apache Kafka cluster. ",
+ "Note that bean will be fetched by the type and its ID is not used.");
}
System.exit(exitCode);
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
index ae5affb..2af82f2 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.Map;
import java.util.Properties;
import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -32,36 +33,45 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import static org.apache.ignite.internal.IgniteComponentType.SPRING;
/**
- * Utility class to load {@link KafkaToIgniteCdcStreamer} from Spring XML configuration.
+ * Utility class to load implementation of {@link AbstractKafkaToIgniteCdcStreamer} from Spring XML configuration.
*/
public class KafkaToIgniteLoader {
/** Kafka properties bean name. */
public static final String KAFKA_PROPERTIES = "kafkaProperties";
/**
- * Loads {@link KafkaToIgniteCdcStreamer} from XML configuration file.
+ * Loads {@link AbstractKafkaToIgniteCdcStreamer} from XML configuration file.
* If load fails then error message wouldn't be null.
*
* @param springXmlPath Path to XML configuration file.
+ * @param <T> Streamer type.
* @return {@code KafkaToIgniteCdcStreamer} instance.
* @throws IgniteCheckedException If failed.
*/
- public static KafkaToIgniteCdcStreamer loadKafkaToIgniteStreamer(String springXmlPath) throws IgniteCheckedException {
+ public static <T extends AbstractKafkaToIgniteCdcStreamer> T loadKafkaToIgniteStreamer(String springXmlPath)
+ throws IgniteCheckedException {
URL cfgUrl = U.resolveSpringUrl(springXmlPath);
IgniteSpringHelper spring = SPRING.create(false);
GridTuple3<Map<String, ?>, Map<Class<?>, Collection>, ? extends GridSpringResourceContext> cfgTuple =
spring.loadBeans(cfgUrl, F.asList(KAFKA_PROPERTIES),
- IgniteConfiguration.class, KafkaToIgniteCdcStreamerConfiguration.class);
+ IgniteConfiguration.class, ClientConfiguration.class, KafkaToIgniteCdcStreamerConfiguration.class);
Collection<IgniteConfiguration> ignCfg = cfgTuple.get2().get(IgniteConfiguration.class);
+ Collection<ClientConfiguration> clientCfg = cfgTuple.get2().get(ClientConfiguration.class);
- if (ignCfg.size() > 1) {
- throw new IgniteCheckedException(
- "Exact 1 IgniteConfiguration should be defined. Found " + ignCfg.size()
- );
- }
+ if (ignCfg.isEmpty() && clientCfg.isEmpty())
+ throw new IgniteCheckedException("IgniteConfiguration or ClientConfiguration should be defined.");
+
+ if (!ignCfg.isEmpty() && !clientCfg.isEmpty())
+ throw new IgniteCheckedException("Either IgniteConfiguration or ClientConfiguration should be defined.");
+
+ if (ignCfg.size() > 1)
+ throw new IgniteCheckedException("Exact 1 IgniteConfiguration should be defined. Found " + ignCfg.size());
+
+ if (clientCfg.size() > 1)
+ throw new IgniteCheckedException("Exact 1 ClientConfiguration should be defined. Found " + clientCfg.size());
Collection<KafkaToIgniteCdcStreamerConfiguration> k2iCfg =
cfgTuple.get2().get(KafkaToIgniteCdcStreamerConfiguration.class);
@@ -75,6 +85,9 @@ public class KafkaToIgniteLoader {
Properties kafkaProps = (Properties)cfgTuple.get1().get(KAFKA_PROPERTIES);
- return new KafkaToIgniteCdcStreamer(ignCfg.iterator().next(), kafkaProps, k2iCfg.iterator().next());
+ if (ignCfg.isEmpty())
+ return (T)new KafkaToIgniteClientCdcStreamer(clientCfg.iterator().next(), kafkaProps, k2iCfg.iterator().next());
+ else
+ return (T)new KafkaToIgniteCdcStreamer(ignCfg.iterator().next(), kafkaProps, k2iCfg.iterator().next());
}
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index f15e061..581e74c 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -23,10 +23,8 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cdc.TypeMapping;
-import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -43,8 +41,8 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
/** */
public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
- /** Ignite instance. */
- private final IgniteEx ign;
+ /** Binary context. */
+ private final BinaryContext ctx;
/** Log. */
private final IgniteLogger log;
@@ -59,18 +57,18 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
private final AtomicLong rcvdEvts = new AtomicLong();
/**
- * @param ign Ignite instance.
+ * @param ctx Binary context.
* @param log Logger.
* @param initProps Kafka properties.
* @param streamerCfg Streamer configuration.
*/
public KafkaToIgniteMetadataUpdater(
- IgniteEx ign,
+ BinaryContext ctx,
IgniteLogger log,
Properties initProps,
KafkaToIgniteCdcStreamerConfiguration streamerCfg
) {
- this.ign = ign;
+ this.ctx = ctx;
this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout();
this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class);
@@ -90,8 +88,6 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
/** Polls all available records from metadata topic and applies it to Ignite. */
public synchronized void updateMetadata() {
- BinaryContext ctx = ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext();
-
while (true) {
ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index d4517cd..144b8b8 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -49,8 +50,10 @@ import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -85,31 +88,37 @@ import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
/** */
@RunWith(Parameterized.class)
public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
- /** Cache atomicity mode. */
+ /** Client type to connect to a destination cluster. */
@Parameterized.Parameter
+ public ClientType clientType;
+
+ /** Cache atomicity mode. */
+ @Parameterized.Parameter(1)
public CacheAtomicityMode atomicity;
/** Cache replication mode. */
- @Parameterized.Parameter(1)
+ @Parameterized.Parameter(2)
public CacheMode mode;
/** */
- @Parameterized.Parameter(2)
+ @Parameterized.Parameter(3)
public int backups;
/** @return Test parameters. */
- @Parameterized.Parameters(name = "atomicity={0}, mode={1}, backupCnt={2}")
+ @Parameterized.Parameters(name = "clientType={0}, atomicity={1}, mode={2}, backupCnt={3}")
public static Collection<?> parameters() {
List<Object[]> params = new ArrayList<>();
- for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
- for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED)) {
- for (int backups = 0; backups < 2; backups++) {
- // backupCount ignored for REPLICATED caches.
- if (backups > 0 && mode == REPLICATED)
- continue;
+ for (ClientType clientType : ClientType.values()) {
+ for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
+ for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED)) {
+ for (int backups = 0; backups < 2; backups++) {
+ // backupCount ignored for REPLICATED caches.
+ if (backups > 0 && mode == REPLICATED)
+ continue;
- params.add(new Object[] {atomicity, mode, backups});
+ params.add(new Object[] {clientType, atomicity, mode, backups});
+ }
}
}
}
@@ -508,8 +517,10 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
/** */
private boolean checkFuts(boolean res, List<IgniteInternalFuture<?>> futs) {
- for (int i = 0; i < futs.size(); i++)
- assertFalse("Fut " + i, futs.get(i).isDone());
+ for (int i = 0; i < futs.size(); i++) {
+ assertFalse("Fut " + i + ", error: " + X.getFullStackTrace(futs.get(i).error()),
+ futs.get(i).isDone());
+ }
return res;
}
@@ -544,6 +555,19 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
return node.context().query().querySqlFields(new SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
}
+ /** @return Destination cluster host addresses. */
+ protected String[] hostAddresses(IgniteEx[] dest) {
+ String[] addrs = new String[dest.length];
+
+ for (int i = 0; i < dest.length; i++) {
+ ClusterNode node = dest[i].localNode();
+
+ addrs[i] = F.first(node.addresses()) + ":" + node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT);
+ }
+
+ return addrs;
+ }
+
/** */
protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache);
@@ -637,4 +661,13 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
this.orgId = orgId;
}
}
+
+ /** Client type to connect to a destination cluster. */
+ protected enum ClientType {
+ /** Client node. */
+ CLIENT_NODE,
+
+ /** Thin client. */
+ THIN_CLIENT;
+ }
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 5a0a221..8273fc9 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -21,7 +21,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
+import org.apache.ignite.cdc.thin.IgniteToIgniteClientCdcStreamer;
+import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
@@ -35,7 +38,7 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (int i = 0; i < srcCluster.length; i++)
- futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], cache));
+ futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache));
return futs;
}
@@ -45,35 +48,50 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
List<IgniteInternalFuture<?>> futs = new ArrayList<>();
for (int i = 0; i < srcCluster.length; i++)
- futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], ACTIVE_ACTIVE_CACHE));
+ futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE));
for (int i = 0; i < destCluster.length; i++)
- futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], ACTIVE_ACTIVE_CACHE));
+ futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE));
return futs;
}
/** {@inheritDoc} */
@Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
- assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
- assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
+ assertNotNull(longMetric.apply(AbstractIgniteCdcStreamer.LAST_EVT_TIME));
+ assertNotNull(longMetric.apply(AbstractIgniteCdcStreamer.EVTS_CNT));
}
/**
* @param srcCfg Ignite source node configuration.
* @param destCfg Ignite destination cluster configuration.
+ * @param dest Ignite destination cluster.
* @param cache Cache name to stream to kafka.
* @return Future for Change Data Capture application.
*/
- protected IgniteInternalFuture<?> igniteToIgnite(IgniteConfiguration srcCfg, IgniteConfiguration destCfg, String cache) {
+ protected IgniteInternalFuture<?> igniteToIgnite(
+ IgniteConfiguration srcCfg,
+ IgniteConfiguration destCfg,
+ IgniteEx[] dest,
+ String cache
+ ) {
return runAsync(() -> {
CdcConfiguration cdcCfg = new CdcConfiguration();
- cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer()
- .setDestinationIgniteConfiguration(destCfg)
- .setMaxBatchSize(KEYS_CNT)
- .setCaches(Collections.singleton(cache)));
+ AbstractIgniteCdcStreamer streamer;
+ if (clientType == ClientType.THIN_CLIENT) {
+ streamer = new IgniteToIgniteClientCdcStreamer()
+ .setDestinationClientConfiguration(new ClientConfiguration()
+ .setAddresses(hostAddresses(dest)));
+ }
+ else
+ streamer = new IgniteToIgniteCdcStreamer().setDestinationIgniteConfiguration(destCfg);
+
+ streamer.setMaxBatchSize(KEYS_CNT);
+ streamer.setCaches(Collections.singleton(cache));
+
+ cdcCfg.setConsumer(streamer);
cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
index f182e77..43e7db6 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
@@ -20,7 +20,6 @@ package org.apache.ignite.cdc;
import org.apache.ignite.cdc.kafka.CdcKafkaReplicationAppsTest;
import org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest;
import org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest;
-import org.apache.ignite.cdc.thin.CdcIgniteToIgniteClientReplicationTest;
import org.junit.runner.RunWith;
import org.junit.runners.Suite;
@@ -35,8 +34,7 @@ import org.junit.runners.Suite;
KafkaToIgniteLoaderTest.class,
CdcKafkaReplicationTest.class,
CdcKafkaReplicationAppsTest.class,
- ConflictResolverRestartTest.class,
- CdcIgniteToIgniteClientReplicationTest.class
+ ConflictResolverRestartTest.class
})
public class IgniteCdcTestSuite {
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index 1d88321..27d900d 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -22,9 +22,12 @@ import java.io.FileOutputStream;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
+import java.util.stream.Collectors;
import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
@@ -77,6 +80,9 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
/** */
public static final String PROPS_PATH = "PROPS_PATH";
+ /** */
+ public static final String HOST_ADDRESSES = "HOST_ADDRESSES";
+
/** */
private String kafkaPropsPath = null;
@@ -127,16 +133,32 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
String topic,
String metadataTopic,
IgniteConfiguration igniteCfg,
+ IgniteEx[] dest,
int partFrom,
int partTo
) {
Map<String, String> params = new HashMap<>();
- int discoPort = getFieldValue(igniteCfg.getDiscoverySpi(), "locPort");
+ String cfg;
+
+ if (clientType == ClientType.THIN_CLIENT) {
+ cfg = "/replication/kafka-to-ignite-client.xml";
+
+ String addresses = Arrays.stream(hostAddresses(dest)).map(addr -> "<value>" + addr + "</value>")
+ .collect(Collectors.joining());
+
+ params.put(HOST_ADDRESSES, addresses);
+ }
+ else {
+ cfg = "/replication/kafka-to-ignite.xml";
+
+ int discoPort = getFieldValue(igniteCfg.getDiscoverySpi(), "locPort");
+
+ params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
+ params.put(DISCO_PORT, Integer.toString(discoPort));
+ params.put(DISCO_PORT_RANGE, Integer.toString(discoPort + DFLT_PORT_RANGE));
+ }
- params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
- params.put(DISCO_PORT, Integer.toString(discoPort));
- params.put(DISCO_PORT_RANGE, Integer.toString(discoPort + DFLT_PORT_RANGE));
params.put(REPLICATED_CACHE, cacheName);
params.put(TOPIC, topic);
params.put(METADATA_TOPIC, metadataTopic);
@@ -147,7 +169,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT));
return runAsync(
- () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig("/replication/kafka-to-ignite.xml", params)})
+ () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig(cfg, params)})
);
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index fed0678..cbc2f94 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -25,6 +25,7 @@ import java.util.function.Function;
import org.apache.ignite.cdc.AbstractReplicationTest;
import org.apache.ignite.cdc.CdcConfiguration;
import org.apache.ignite.cdc.IgniteToIgniteCdcStreamer;
+import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
@@ -113,6 +114,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
cache,
SRC_DEST_META_TOPIC,
destClusterCliCfg[i],
+ destCluster,
i * (DFLT_PARTS / 2),
(i + 1) * (DFLT_PARTS / 2)
));
@@ -136,6 +138,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
SRC_DEST_TOPIC,
SRC_DEST_META_TOPIC,
destClusterCliCfg[0],
+ destCluster,
0,
DFLT_PARTS
));
@@ -145,6 +148,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
DEST_SRC_TOPIC,
DEST_SRC_META_TOPIC,
srcClusterCliCfg[0],
+ srcCluster,
0,
DFLT_PARTS
));
@@ -199,6 +203,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
/**
* @param cacheName Cache name.
* @param igniteCfg Ignite configuration.
+ * @param dest Destination Ignite cluster.
* @return Future for runed {@link KafkaToIgniteCdcStreamer}.
*/
protected IgniteInternalFuture<?> kafkaToIgnite(
@@ -206,6 +211,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
String topic,
String metadataTopic,
IgniteConfiguration igniteCfg,
+ IgniteEx[] dest,
int fromPart,
int toPart
) {
@@ -220,7 +226,15 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
cfg.setMetadataTopic(metadataTopic);
cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
- return runAsync(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProperties(), cfg));
+ if (clientType == ClientType.THIN_CLIENT) {
+ ClientConfiguration clientCfg = new ClientConfiguration();
+
+ clientCfg.setAddresses(hostAddresses(dest));
+
+ return runAsync(new KafkaToIgniteClientCdcStreamer(clientCfg, kafkaProperties(), cfg));
+ }
+ else
+ return runAsync(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProperties(), cfg));
}
/** */
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index 4fd818e..cb2d385 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -49,6 +49,35 @@ public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
assertNotNull(streamer);
}
+ /** */
+ @Test
+ public void testLoadIgniteClientConfig() throws Exception {
+ assertThrows(
+ null,
+ () -> loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-double-client-cfg.xml"),
+ IgniteCheckedException.class,
+ "Exact 1 ClientConfiguration should be defined. Found 2"
+ );
+
+ assertThrows(
+ null,
+ () -> loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-without-kafka-properties.xml"),
+ IgniteCheckedException.class,
+ "Spring bean with provided name doesn't exist"
+ );
+
+ assertThrows(
+ null,
+ () -> loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-with-ignite-cfg.xml"),
+ IgniteCheckedException.class,
+ "Either IgniteConfiguration or ClientConfiguration should be defined."
+ );
+
+ KafkaToIgniteClientCdcStreamer streamer = loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-correct.xml");
+
+ assertNotNull(streamer);
+ }
+
/** */
@Test
public void testInitSpringContextOnce() throws Exception {
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java
deleted file mode 100644
index b810e18..0000000
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cdc.thin;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Function;
-import org.apache.ignite.cdc.AbstractReplicationTest;
-import org.apache.ignite.cdc.CdcConfiguration;
-import org.apache.ignite.cdc.IgniteToIgniteCdcStreamer;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.ClientConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cdc.CdcMain;
-import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
-
-import static org.apache.ignite.testframework.GridTestUtils.runAsync;
-
-/**
- * {@link IgniteToIgniteClientCdcStreamer} test.
- */
-public class CdcIgniteToIgniteClientReplicationTest extends AbstractReplicationTest {
- /** {@inheritDoc} */
- @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
- List<IgniteInternalFuture<?>> futs = new ArrayList<>();
-
- for (int i = 0; i < srcCluster.length; i++)
- futs.add(igniteToIgniteClient(srcCluster[i].configuration(), destCluster, cache));
-
- return futs;
- }
-
- /** {@inheritDoc} */
- @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
- List<IgniteInternalFuture<?>> futs = new ArrayList<>();
-
- for (int i = 0; i < srcCluster.length; i++)
- futs.add(igniteToIgniteClient(srcCluster[i].configuration(), destCluster, ACTIVE_ACTIVE_CACHE));
-
- for (int i = 0; i < destCluster.length; i++)
- futs.add(igniteToIgniteClient(destCluster[i].configuration(), srcCluster, ACTIVE_ACTIVE_CACHE));
-
- return futs;
- }
-
- /** {@inheritDoc} */
- @Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
- assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
- assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
- }
-
- /**
- * @param srcCfg Ignite source node configuration.
- * @param dest Destination cluster.
- * @param cache Cache name to replicate.
- * @return Future for Change Data Capture application.
- */
- private IgniteInternalFuture<?> igniteToIgniteClient(IgniteConfiguration srcCfg, IgniteEx[] dest, String cache) {
- return runAsync(() -> {
- ClientConfiguration clientCfg = new ClientConfiguration();
-
- String[] addrs = new String[dest.length];
-
- for (int i = 0; i < dest.length; i++) {
- ClusterNode node = dest[i].localNode();
-
- addrs[i] = F.first(node.addresses()) + ":" + node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT);
- }
-
- clientCfg.setAddresses(addrs);
-
- CdcConfiguration cdcCfg = new CdcConfiguration();
-
- cdcCfg.setConsumer(new IgniteToIgniteClientCdcStreamer()
- .setDestinationClientConfiguration(clientCfg)
- .setCaches(Collections.singleton(cache))
- .setMaxBatchSize(KEYS_CNT));
-
- cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
-
- CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);
-
- cdcs.add(cdc);
-
- cdc.run();
- });
- }
-}
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
index 78a8809..b693424 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
@@ -44,4 +44,4 @@
</bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
index 545a338..f236f30 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
@@ -35,4 +35,4 @@
</bean>
</property>
</bean>
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
index 892b177..586de47 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
@@ -35,4 +35,4 @@
</bean>
<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
similarity index 73%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
index 78a8809..caa8948 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
@@ -23,16 +23,11 @@
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
- <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
- <property name="dataStorageConfiguration">
- <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
- <property name="defaultDataRegionConfiguration">
- <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
- <property name="cdcEnabled" value="true" />
- <property name="persistenceEnabled" value="true" />
- </bean>
- </property>
- </bean>
+ <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
</property>
</bean>
@@ -44,4 +39,4 @@
</bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-double-client-cfg.xml
similarity index 63%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-double-client-cfg.xml
index 545a338..dde7ca1 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-double-client-cfg.xml
@@ -21,18 +21,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
- <import resource="kafka-to-ignite-correct.xml" />
+ <import resource="kafka-to-ignite-client-correct.xml" />
- <bean id="grid.cfg2" class="org.apache.ignite.configuration.IgniteConfiguration">
- <property name="dataStorageConfiguration">
- <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
- <property name="defaultDataRegionConfiguration">
- <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
- <property name="cdcEnabled" value="true" />
- <property name="persistenceEnabled" value="true" />
- </bean>
- </property>
- </bean>
+ <bean id="client2.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
</property>
</bean>
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-ignite-cfg.xml
similarity index 90%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-ignite-cfg.xml
index 78a8809..fa11ab7 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-ignite-cfg.xml
@@ -23,6 +23,14 @@
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
+ </property>
+ </bean>
+
<bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="dataStorageConfiguration">
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
@@ -44,4 +52,4 @@
</bean>
<util:properties id="kafkaProperties" location="loader/kafka.properties" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-kafka-properties.xml
similarity index 67%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-kafka-properties.xml
index 892b177..85cdce5 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-kafka-properties.xml
@@ -21,18 +21,13 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
- <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
- <property name="dataStorageConfiguration">
- <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
- <property name="defaultDataRegionConfiguration">
- <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
- <property name="cdcEnabled" value="true" />
- <property name="persistenceEnabled" value="true" />
- </bean>
- </property>
- </bean>
+ <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>
+ <value>127.0.0.1:10800</value>
+ </list>
</property>
</bean>
<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite-client.xml
similarity index 61%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to modules/cdc-ext/src/test/resources/replication/kafka-to-ignite-client.xml
index 78a8809..a676ef8 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite-client.xml
@@ -23,25 +23,27 @@
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
- <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
- <property name="dataStorageConfiguration">
- <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
- <property name="defaultDataRegionConfiguration">
- <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
- <property name="cdcEnabled" value="true" />
- <property name="persistenceEnabled" value="true" />
- </bean>
- </property>
- </bean>
+ <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+ <property name="addresses">
+ <list>{HOST_ADDRESSES}</list>
</property>
</bean>
<bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
- <property name="topic" value="ignite" />
- <property name="metadataTopic" value="ignite-metadata" />
- <property name="kafkaPartsFrom" value="0" />
- <property name="kafkaPartsTo" value="16" />
+ <property name="caches">
+ <util:list>
+ <bean class="java.lang.String">
+ <constructor-arg type="String" value="{REPLICATED_CACHE}" />
+ </bean>
+ </util:list>
+ </property>
+ <property name="topic" value="{TOPIC}" />
+ <property name="metadataTopic" value="{METADATA_TOPIC}" />
+ <property name="kafkaPartsFrom" value="{PARTS_FROM}"/>
+ <property name="kafkaPartsTo" value="{PARTS_TO}"/>
+ <property name="threadCount" value="{THREAD_CNT}"/>
+ <property name="kafkaRequestTimeout" value="{KAFKA_REQ_TIMEOUT}"/>
</bean>
- <util:properties id="kafkaProperties" location="loader/kafka.properties" />
-</beans>
\ No newline at end of file
+ <util:properties id="kafkaProperties" location="{PROPS_PATH}" />
+</beans>