You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by na...@apache.org on 2022/09/13 15:29:21 UTC

[ignite-extensions] branch master updated: IGNITE-17452 Implement Kafka to thin client CDC streamer (#176)

This is an automated email from the ASF dual-hosted git repository.

namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f533b4  IGNITE-17452 Implement Kafka to thin client CDC streamer (#176)
9f533b4 is described below

commit 9f533b4489ad8a8347c4ceba0669415dcb6cdc56
Author: Nikita Amelchev <ns...@gmail.com>
AuthorDate: Tue Sep 13 18:29:17 2022 +0300

    IGNITE-17452 Implement Kafka to thin client CDC streamer (#176)
---
 .../ignite/cdc/IgniteToIgniteCdcStreamer.java      |   2 +-
 ....java => AbstractKafkaToIgniteCdcStreamer.java} | 187 ++++++++----------
 .../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java |   1 +
 .../ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java | 219 +++------------------
 .../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java |  22 +--
 .../cdc/kafka/KafkaToIgniteClientCdcStreamer.java  | 130 ++++++++++++
 .../cdc/kafka/KafkaToIgniteCommandLineStartup.java |  24 ++-
 .../ignite/cdc/kafka/KafkaToIgniteLoader.java      |  33 +++-
 .../cdc/kafka/KafkaToIgniteMetadataUpdater.java    |  14 +-
 .../apache/ignite/cdc/AbstractReplicationTest.java |  59 ++++--
 .../cdc/CdcIgniteToIgniteReplicationTest.java      |  38 +++-
 .../org/apache/ignite/cdc/IgniteCdcTestSuite.java  |   4 +-
 .../cdc/kafka/CdcKafkaReplicationAppsTest.java     |  32 ++-
 .../ignite/cdc/kafka/CdcKafkaReplicationTest.java  |  16 +-
 .../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java  |  29 +++
 .../CdcIgniteToIgniteClientReplicationTest.java    | 108 ----------
 .../resources/loader/kafka-to-ignite-correct.xml   |   2 +-
 .../loader/kafka-to-ignite-double-ignite-cfg.xml   |   2 +-
 .../kafka-to-ignite-without-kafka-properties.xml   |   2 +-
 .../kafka-to-ignite-client-correct.xml}            |  17 +-
 .../kafka-to-ignite-client-double-client-cfg.xml}  |  19 +-
 .../kafka-to-ignite-client-with-ignite-cfg.xml}    |  10 +-
 ...-to-ignite-client-without-kafka-properties.xml} |  17 +-
 .../kafka-to-ignite-client.xml}                    |  34 ++--
 24 files changed, 492 insertions(+), 529 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
index 60e1dfa..9b7cc7d 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -58,7 +58,7 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer {
         if (log.isInfoEnabled())
             log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
 
-        A.notNull(destIgniteCfg, "Destination ignite configuration");
+        A.notNull(destIgniteCfg, "Destination Ignite configuration.");
 
         dest = (IgniteEx)Ignition.start(destIgniteCfg);
 
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
similarity index 63%
copy from modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
copy to modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
index b0a5e4b..a8d28e1 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/AbstractKafkaToIgniteCdcStreamer.java
@@ -18,27 +18,23 @@
 package org.apache.ignite.cdc.kafka;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
-import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
 import org.apache.ignite.cdc.CdcEvent;
-import org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl;
-import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
-import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridLoggerProxy;
-import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteExperimental;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
@@ -51,50 +47,28 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_
 import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 
 /**
- * Main class of Kafka to Ignite application.
+ * Main abstract class of Kafka to Ignite application.
  * This application is counterpart of {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
  * Application runs several {@link KafkaToIgniteCdcStreamerApplier} threads to read Kafka topic partitions
- * and apply {@link CdcEvent} to Ignite.
- * <p>
+ * and apply {@link CdcEvent} to Ignite. There are two implementations:
+ * <ul>
+ *     <li>{@link KafkaToIgniteCdcStreamer} to apply through a client node.</li>
+ *     <li>{@link KafkaToIgniteClientCdcStreamer} to apply through a thin client.</li>
+ * </ul>
  * Each applier receives an equal number of Kafka topic partitions to read (the last one also takes the remainder).
- * <p>
- * In case of any error during read applier just fail. Fail of any applier will lead to the fail of whole application.
- * It expected that application will be configured for automatic restarts with the OS tool to failover temporary errors
- * such as Kafka or Ignite unavailability.
- * <p>
- * To resolve possible update conflicts (in case of concurrent update in source and destination Ignite clusters)
- * real-world deployments should use some conflict resolver, for example {@link CacheVersionConflictResolverImpl}.
- * Example of Ignite configuration with the conflict resolver:
- * <pre>
- * {@code
- * CacheVersionConflictResolverCachePluginProvider conflictPlugin = new CacheVersionConflictResolverCachePluginProvider();
- *
- * conflictPlugin.setClusterId(clusterId); // Cluster id.
- * conflictPlugin.setCaches(new HashSet<>(Arrays.asList("my-cache", "some-other-cache"))); // Caches to replicate.
- *
- * IgniteConfiguration cfg = ...;
- *
- * cfg.setPluginProviders(conflictPlugin);
- * }
- * </pre>
- * Please, see {@link CacheConflictResolutionManagerImpl} for additional information.
  *
  * @see CdcMain
  * @see IgniteToKafkaCdcStreamer
  * @see CdcEvent
- * @see KafkaToIgniteCdcStreamerApplier
- * @see CacheConflictResolutionManagerImpl
+ * @see KafkaToIgniteCdcStreamer
+ * @see KafkaToIgniteClientCdcStreamer
  */
-@IgniteExperimental
-public class KafkaToIgniteCdcStreamer implements Runnable {
-    /** Ignite configuration. */
-    private final IgniteConfiguration igniteCfg;
-
+abstract class AbstractKafkaToIgniteCdcStreamer implements Runnable {
     /** Kafka consumer properties. */
     private final Properties kafkaProps;
 
     /** Streamer configuration. */
-    private final KafkaToIgniteCdcStreamerConfiguration streamerCfg;
+    protected final KafkaToIgniteCdcStreamerConfiguration streamerCfg;
 
     /** Runners to run {@link KafkaToIgniteCdcStreamerApplier} instances. */
     private final List<Thread> runners;
@@ -102,16 +76,14 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
     /** Appliers. */
     private final List<AutoCloseable> appliers;
 
+    /** */
+    protected IgniteLogger log;
+
     /**
-     * @param igniteCfg Ignite configuration.
      * @param kafkaProps Kafka properties.
      * @param streamerCfg Streamer configuration.
      */
-    public KafkaToIgniteCdcStreamer(
-        IgniteConfiguration igniteCfg,
-        Properties kafkaProps,
-        KafkaToIgniteCdcStreamerConfiguration streamerCfg
-    ) {
+    public AbstractKafkaToIgniteCdcStreamer(Properties kafkaProps, KafkaToIgniteCdcStreamerConfiguration streamerCfg) {
         A.notNull(streamerCfg.getTopic(), "Kafka topic");
         A.notNull(streamerCfg.getMetadataTopic(), "Kafka metadata topic");
         A.ensure(
@@ -129,7 +101,6 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
             streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
             "Threads count must be less or equals to the total Kafka partitions count.");
 
-        this.igniteCfg = igniteCfg;
         this.kafkaProps = kafkaProps;
         this.streamerCfg = streamerCfg;
 
@@ -146,6 +117,10 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
     /** {@inheritDoc} */
     @Override public void run() {
         try {
+            initLogger();
+
+            ackAsciiLogo(log);
+
             runx();
         }
         catch (Exception e) {
@@ -154,74 +129,65 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
     }
 
     /** */
-    private void runx() throws Exception {
-        U.initWorkDir(igniteCfg);
+    protected void runAppliers() {
+        AtomicBoolean stopped = new AtomicBoolean();
+
+        Set<Integer> caches = null;
 
-        IgniteLogger log = U.initLogger(igniteCfg, "kafka-ignite-streamer");
+        if (!F.isEmpty(streamerCfg.getCaches())) {
+            checkCaches(streamerCfg.getCaches());
 
-        igniteCfg.setGridLogger(log);
+            caches = streamerCfg.getCaches().stream()
+                .map(CU::cacheId).collect(Collectors.toSet());
+        }
+
+        KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(
+            binaryContext(),
+            log,
+            kafkaProps,
+            streamerCfg
+        );
 
-        ackAsciiLogo(log);
+        int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
+        int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
+        int threadCnt = streamerCfg.getThreadCount();
 
-        try (IgniteEx ign = (IgniteEx)Ignition.start(igniteCfg)) {
-            AtomicBoolean stopped = new AtomicBoolean();
+        int partPerApplier = kafkaParts / threadCnt;
 
-            Set<Integer> caches = null;
+        for (int i = 0; i < threadCnt; i++) {
+            int from = i * partPerApplier;
+            int to = (i + 1) * partPerApplier;
 
-            if (!F.isEmpty(streamerCfg.getCaches())) {
-                caches = streamerCfg.getCaches().stream()
-                    .peek(cache -> Objects.requireNonNull(ign.cache(cache), cache + " not exists!"))
-                    .map(CU::cacheId).collect(Collectors.toSet());
-            }
+            if (i == threadCnt - 1)
+                to = kafkaParts;
 
-            KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(
-                ign,
+            KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
+                () -> eventsApplier(),
                 log,
                 kafkaProps,
-                streamerCfg
+                streamerCfg.getTopic(),
+                kafkaPartsFrom + from,
+                kafkaPartsFrom + to,
+                caches,
+                streamerCfg.getMaxBatchSize(),
+                streamerCfg.getKafkaRequestTimeout(),
+                metaUpdr,
+                stopped
             );
 
-            int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
-            int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
-            int threadCnt = streamerCfg.getThreadCount();
-
-            int partPerApplier = kafkaParts / threadCnt;
-
-            for (int i = 0; i < threadCnt; i++) {
-                int from = i * partPerApplier;
-                int to = (i + 1) * partPerApplier;
-
-                if (i == threadCnt - 1)
-                    to = kafkaParts;
-
-                KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
-                    ign,
-                    log,
-                    kafkaProps,
-                    streamerCfg.getTopic(),
-                    kafkaPartsFrom + from,
-                    kafkaPartsFrom + to,
-                    caches,
-                    streamerCfg.getMaxBatchSize(),
-                    streamerCfg.getKafkaRequestTimeout(),
-                    metaUpdr,
-                    stopped
-                );
-
-                addAndStart("applier-thread-" + i, applier);
-            }
-
-            try {
-                for (int i = 0; i < threadCnt + 1; i++)
-                    runners.get(i).join();
-            }
-            catch (InterruptedException e) {
-                stopped.set(true);
-
-                appliers.forEach(U::closeQuiet);
-
-                log.warning("Kafka to Ignite streamer interrupted", e);
-            }
+            addAndStart("applier-thread-" + i, applier);
+        }
+
+        try {
+            for (int i = 0; i < threadCnt + 1; i++)
+                runners.get(i).join();
+        }
+        catch (InterruptedException e) {
+            stopped.set(true);
+
+            appliers.forEach(U::closeQuiet);
+
+            log.warning("Kafka to Ignite streamer interrupted", e);
         }
     }
 
@@ -236,6 +202,21 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
         runners.add(thread);
     }
 
+    /** Init logger. */
+    protected abstract void initLogger() throws Exception;
+
+    /** */
+    protected abstract void runx() throws Exception;
+
+    /** @return Binary context. */
+    protected abstract BinaryContext binaryContext();
+
+    /** @return Cdc events applier. */
+    protected abstract AbstractCdcEventsApplier eventsApplier();
+
+    /** Checks that configured caches exist in a destination cluster. */
+    protected abstract void checkCaches(Collection<String> caches);
+
     /** */
     private void ackAsciiLogo(IgniteLogger log) {
         String ver = "ver. " + ACK_VER_STR;
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index f9ec85c..3ab2b93 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -81,6 +81,7 @@ import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_
  *
  * @see CdcMain
  * @see KafkaToIgniteCdcStreamer
+ * @see KafkaToIgniteClientCdcStreamer
  * @see CacheVersionConflictResolverImpl
  */
 @IgniteExperimental
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
index b0a5e4b..3386a4c 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
@@ -17,46 +17,29 @@
 
 package org.apache.ignite.cdc.kafka;
 
-import java.util.ArrayList;
-import java.util.List;
+import java.util.Collection;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
 import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.CdcEventsIgniteApplier;
 import org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl;
 import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
 import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridLoggerProxy;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.cdc.CdcMain;
-import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.util.typedef.internal.A;
-import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteExperimental;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-
-import static org.apache.ignite.internal.IgniteKernal.NL;
-import static org.apache.ignite.internal.IgniteKernal.SITE;
-import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
-import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 
 /**
  * Main class of Kafka to Ignite application.
  * This application is counterpart of {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
  * Application runs several {@link KafkaToIgniteCdcStreamerApplier} threads to read Kafka topic partitions
- * and apply {@link CdcEvent} to Ignite.
- * <p>
- * Each applier receive even number of kafka topic partition to read.
+ * and apply {@link CdcEvent} to Ignite through a client node.
  * <p>
  * In case of any error during read, the applier just fails. Failure of any applier fails the whole application.
  * It is expected that the application is configured for automatic restarts with the OS tool to failover temporary errors
@@ -86,21 +69,12 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
  * @see CacheConflictResolutionManagerImpl
  */
 @IgniteExperimental
-public class KafkaToIgniteCdcStreamer implements Runnable {
+public class KafkaToIgniteCdcStreamer extends AbstractKafkaToIgniteCdcStreamer {
     /** Ignite configuration. */
     private final IgniteConfiguration igniteCfg;
 
-    /** Kafka consumer properties. */
-    private final Properties kafkaProps;
-
-    /** Streamer configuration. */
-    private final KafkaToIgniteCdcStreamerConfiguration streamerCfg;
-
-    /** Runners to run {@link KafkaToIgniteCdcStreamerApplier} instances. */
-    private final List<Thread> runners;
-
-    /** Appliers. */
-    private final List<AutoCloseable> appliers;
+    /** Ignite client node. */
+    private IgniteEx ign;
 
     /**
      * @param igniteCfg Ignite configuration.
@@ -112,178 +86,45 @@ public class KafkaToIgniteCdcStreamer implements Runnable {
         Properties kafkaProps,
         KafkaToIgniteCdcStreamerConfiguration streamerCfg
     ) {
-        A.notNull(streamerCfg.getTopic(), "Kafka topic");
-        A.notNull(streamerCfg.getMetadataTopic(), "Kafka metadata topic");
-        A.ensure(
-            streamerCfg.getKafkaPartsFrom() >= 0,
-            "The Kafka partitions lower bound must be explicitly set to a value greater than or equals to zero.");
-        A.ensure(
-            streamerCfg.getKafkaPartsTo() > 0,
-            "The Kafka partitions upper bound must be explicitly set to a value greater than zero.");
-        A.ensure(
-            streamerCfg.getKafkaPartsTo() > streamerCfg.getKafkaPartsFrom(),
-            "The Kafka partitions upper bound must be greater than lower bound.");
-        A.ensure(streamerCfg.getKafkaRequestTimeout() >= 0, "The Kafka request timeout cannot be negative.");
-        A.ensure(streamerCfg.getThreadCount() > 0, "Threads count value must me greater than zero.");
-        A.ensure(
-            streamerCfg.getKafkaPartsTo() - streamerCfg.getKafkaPartsFrom() >= streamerCfg.getThreadCount(),
-            "Threads count must be less or equals to the total Kafka partitions count.");
+        super(kafkaProps, streamerCfg);
 
-        this.igniteCfg = igniteCfg;
-        this.kafkaProps = kafkaProps;
-        this.streamerCfg = streamerCfg;
-
-        appliers = new ArrayList<>(streamerCfg.getThreadCount());
-        runners = new ArrayList<>(streamerCfg.getThreadCount());
-
-        if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
-            throw new IllegalArgumentException("Kafka properties don't contains " + ConsumerConfig.GROUP_ID_CONFIG);
+        A.notNull(igniteCfg, "Destination Ignite configuration.");
 
-        kafkaProps.put(KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
-        kafkaProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+        this.igniteCfg = igniteCfg;
     }
 
     /** {@inheritDoc} */
-    @Override public void run() {
-        try {
-            runx();
-        }
-        catch (Exception e) {
-            throw new IgniteException(e);
-        }
-    }
-
-    /** */
-    private void runx() throws Exception {
+    @Override protected void initLogger() throws Exception {
         U.initWorkDir(igniteCfg);
 
-        IgniteLogger log = U.initLogger(igniteCfg, "kafka-ignite-streamer");
+        log = U.initLogger(igniteCfg, "kafka-ignite-streamer");
 
         igniteCfg.setGridLogger(log);
+    }
 
-        ackAsciiLogo(log);
-
+    /** {@inheritDoc} */
+    @Override protected void runx() throws Exception {
         try (IgniteEx ign = (IgniteEx)Ignition.start(igniteCfg)) {
-            AtomicBoolean stopped = new AtomicBoolean();
-
-            Set<Integer> caches = null;
-
-            if (!F.isEmpty(streamerCfg.getCaches())) {
-                caches = streamerCfg.getCaches().stream()
-                    .peek(cache -> Objects.requireNonNull(ign.cache(cache), cache + " not exists!"))
-                    .map(CU::cacheId).collect(Collectors.toSet());
-            }
-
-            KafkaToIgniteMetadataUpdater metaUpdr = new KafkaToIgniteMetadataUpdater(
-                ign,
-                log,
-                kafkaProps,
-                streamerCfg
-            );
-
-            int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
-            int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
-            int threadCnt = streamerCfg.getThreadCount();
+            this.ign = ign;
 
-            int partPerApplier = kafkaParts / threadCnt;
-
-            for (int i = 0; i < threadCnt; i++) {
-                int from = i * partPerApplier;
-                int to = (i + 1) * partPerApplier;
-
-                if (i == threadCnt - 1)
-                    to = kafkaParts;
-
-                KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
-                    ign,
-                    log,
-                    kafkaProps,
-                    streamerCfg.getTopic(),
-                    kafkaPartsFrom + from,
-                    kafkaPartsFrom + to,
-                    caches,
-                    streamerCfg.getMaxBatchSize(),
-                    streamerCfg.getKafkaRequestTimeout(),
-                    metaUpdr,
-                    stopped
-                );
-
-                addAndStart("applier-thread-" + i, applier);
-            }
-
-            try {
-                for (int i = 0; i < threadCnt + 1; i++)
-                    runners.get(i).join();
-            }
-            catch (InterruptedException e) {
-                stopped.set(true);
-
-                appliers.forEach(U::closeQuiet);
-
-                log.warning("Kafka to Ignite streamer interrupted", e);
-            }
+            runAppliers();
         }
     }
 
-    /** Adds applier to {@link #appliers} and starts thread with it. */
-    private <T extends AutoCloseable & Runnable> void addAndStart(String threadName, T applier) {
-        appliers.add(applier);
-
-        Thread thread = new Thread(applier, threadName);
-
-        thread.start();
+    /** {@inheritDoc} */
+    @Override protected AbstractCdcEventsApplier eventsApplier() {
+        U.setCurrentIgniteName(ign.name());
 
-        runners.add(thread);
+        return new CdcEventsIgniteApplier(ign, streamerCfg.getMaxBatchSize(), log);
     }
 
-    /** */
-    private void ackAsciiLogo(IgniteLogger log) {
-        String ver = "ver. " + ACK_VER_STR;
-
-        if (log.isInfoEnabled()) {
-            log.info(NL + NL +
-                ">>>    __ _____   ______ _____     __________    __________  ________________" + NL +
-                ">>>   / //_/ _ | / __/ //_/ _ |   /_  __/ __ \\  /  _/ ___/ |/ /  _/_  __/ __/" + NL +
-                ">>>  / ,< / __ |/ _// ,< / __ |    / / / /_/ / _/ // (_ /    // /  / / / _/  " + NL +
-                ">>> /_/|_/_/ |_/_/ /_/|_/_/ |_|   /_/  \\____/ /___/\\___/_/|_/___/ /_/ /___/  " + NL +
-                ">>> " + NL +
-                ">>> " + NL +
-                ">>> " + ver + NL +
-                ">>> " + COPYRIGHT + NL +
-                ">>> " + NL +
-                ">>> Ignite documentation: " + "http://" + SITE + NL +
-                ">>> Kafka topic: " + streamerCfg.getTopic() + NL +
-                ">>> Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo() + NL
-            );
-        }
-
-        if (log.isQuiet()) {
-            U.quiet(false,
-                "   __ _____   ______ _____     __________    __________  ________________",
-                "  / //_/ _ | / __/ //_/ _ |   /_  __/ __ \\  /  _/ ___/ |/ /  _/_  __/ __/",
-                " / ,< / __ |/ _// ,< / __ |    / / / /_/ / _/ // (_ /    // /  / / / _/  ",
-                "/_/|_/_/ |_/_/ /_/|_/_/ |_|   /_/  \\____/ /___/\\___/_/|_/___/ /_/ /___/  ",
-                "",
-                ver,
-                COPYRIGHT,
-                "",
-                "Ignite documentation: " + "http://" + SITE,
-                "Kafka topic: " + streamerCfg.getTopic(),
-                "Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo(),
-                "",
-                "Quiet mode.");
-
-            String fileName = log.fileName();
-
-            if (fileName != null)
-                U.quiet(false, "  ^-- Logging to file '" + fileName + '\'');
-
-            if (log instanceof GridLoggerProxy)
-                U.quiet(false, "  ^-- Logging by '" + ((GridLoggerProxy)log).getLoggerInfo() + '\'');
+    /** {@inheritDoc} */
+    @Override protected BinaryContext binaryContext() {
+        return ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext();
+    }
 
-            U.quiet(false,
-                "  ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or \"-v\" to kafka-to-ignite.{sh|bat}",
-                "");
-        }
+    /** {@inheritDoc} */
+    @Override protected void checkCaches(Collection<String> caches) {
+        caches.forEach(name -> Objects.requireNonNull(ign.cache(name), name + " not exists!"));
     }
 }
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index 7b7f97a..ceb688e 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -31,20 +31,18 @@ import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Supplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cache.CacheEntryVersion;
 import org.apache.ignite.cdc.AbstractCdcEventsApplier;
 import org.apache.ignite.cdc.CdcEvent;
-import org.apache.ignite.cdc.CdcEventsIgniteApplier;
-import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
 import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -85,9 +83,6 @@ import static org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer.META_UPDATE_M
  * @see CacheEntryVersion
  */
 class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
-    /** Ignite instance. */
-    private final IgniteEx ign;
-
     /** Log. */
     private final IgniteLogger log;
 
@@ -121,11 +116,14 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
     /** */
     private final AtomicLong rcvdEvts = new AtomicLong();
 
-    /** */
+    /** Cdc events applier supplier. */
+    private final Supplier<AbstractCdcEventsApplier> applierSupplier;
+
+    /** Cdc events applier. */
     private AbstractCdcEventsApplier applier;
 
     /**
-     * @param ign Ignite instance.
+     * @param applierSupplier Cdc events applier supplier.
      * @param log Logger.
      * @param kafkaProps Kafka properties.
      * @param topic Topic name.
@@ -138,7 +136,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
      * @param stopped Stopped flag.
      */
     public KafkaToIgniteCdcStreamerApplier(
-        IgniteEx ign,
+        Supplier<AbstractCdcEventsApplier> applierSupplier,
         IgniteLogger log,
         Properties kafkaProps,
         String topic,
@@ -150,7 +148,7 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
         KafkaToIgniteMetadataUpdater metaUpdr,
         AtomicBoolean stopped
     ) {
-        this.ign = ign;
+        this.applierSupplier = applierSupplier;
         this.kafkaProps = kafkaProps;
         this.topic = topic;
         this.kafkaPartFrom = kafkaPartFrom;
@@ -160,13 +158,11 @@ class KafkaToIgniteCdcStreamerApplier implements Runnable, AutoCloseable {
         this.metaUpdr = metaUpdr;
         this.stopped = stopped;
         this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
-
-        applier = new CdcEventsIgniteApplier(ign, maxBatchSize, log);
     }
 
     /** {@inheritDoc} */
     @Override public void run() {
-        U.setCurrentIgniteName(ign.name());
+        applier = applierSupplier.get();
 
         try {
             for (int kafkaPart = kafkaPartFrom; kafkaPart < kafkaPartTo; kafkaPart++) {
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java
new file mode 100644
index 0000000..6d05aab
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteClientCdcStreamer.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.Collection;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.AbstractCdcEventsApplier;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.cdc.thin.CdcEventsIgniteClientApplier;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.configuration.ClientConfiguration;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.GridBinaryMarshaller;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.client.thin.ClientBinary;
+import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteExperimental;
+
+/**
+ * Main class of Kafka to Ignite application.
+ * This application is the counterpart of the {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
+ * The application runs several {@link KafkaToIgniteCdcStreamerApplier} threads to read Kafka topic partitions
+ * and apply {@link CdcEvent} to Ignite through a thin client.
+ * <p>
+ * In case of any error during read, the applier just fails. Failure of any applier leads to the failure of the whole application.
+ * It is expected that the application will be configured for automatic restarts with an OS tool to recover from temporary errors
+ * such as Kafka or Ignite unavailability.
+ * <p>
+ * To resolve possible update conflicts (in case of concurrent update in source and destination Ignite clusters)
+ * real-world deployments should use some conflict resolver, for example {@link CacheVersionConflictResolverImpl}.
+ * Example of Ignite configuration with the conflict resolver:
+ * <pre>
+ * {@code
+ * CacheVersionConflictResolverPluginProvider conflictPlugin = new CacheVersionConflictResolverPluginProvider();
+ *
+ * conflictPlugin.setClusterId(clusterId); // Cluster id.
+ * conflictPlugin.setCaches(new HashSet<>(Arrays.asList("my-cache", "some-other-cache"))); // Caches to replicate.
+ *
+ * IgniteConfiguration cfg = ...;
+ *
+ * cfg.setPluginProviders(conflictPlugin);
+ * }
+ * </pre>
+ * Please, see {@link CacheConflictResolutionManagerImpl} for additional information.
+ *
+ * @see CdcMain
+ * @see IgniteToKafkaCdcStreamer
+ * @see CdcEvent
+ * @see IgniteClient
+ * @see KafkaToIgniteCdcStreamerApplier
+ * @see CacheConflictResolutionManagerImpl
+ */
+@IgniteExperimental
+public class KafkaToIgniteClientCdcStreamer extends AbstractKafkaToIgniteCdcStreamer {
+    /** Ignite thin client configuration. */
+    private final ClientConfiguration clientCfg;
+
+    /** Ignite thin client. */
+    private IgniteClient client;
+
+    /**
+     * @param clientCfg Ignite thin client configuration.
+     * @param kafkaProps Kafka properties.
+     * @param streamerCfg Streamer configuration.
+     */
+    public KafkaToIgniteClientCdcStreamer(
+        ClientConfiguration clientCfg,
+        Properties kafkaProps,
+        KafkaToIgniteCdcStreamerConfiguration streamerCfg
+    ) {
+        super(kafkaProps, streamerCfg);
+
+        A.notNull(clientCfg, "Destination thin client configuration.");
+
+        this.clientCfg = clientCfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void initLogger() throws Exception {
+        log = U.initLogger(null, "kafka-ignite-streamer", UUID.randomUUID(), U.defaultWorkDirectory());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void runx() throws Exception {
+        try (IgniteClient client = Ignition.startClient(clientCfg)) {
+            this.client = client;
+
+            runAppliers();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected AbstractCdcEventsApplier eventsApplier() {
+        GridBinaryMarshaller.popContext(binaryContext());
+
+        return new CdcEventsIgniteClientApplier(client, streamerCfg.getMaxBatchSize(), log);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected BinaryContext binaryContext() {
+        return ((ClientBinary)client.binary()).binaryContext();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void checkCaches(Collection<String> caches) {
+        Collection<String> clusterCaches = client.cacheNames();
+
+        caches.forEach(name -> A.ensure(clusterCaches.contains(name), name + " not exists!"));
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
index c425664..f14aad8 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
@@ -27,8 +27,10 @@ import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
 import static org.apache.ignite.startup.cmdline.CommandLineStartup.isHelp;
 
 /**
- * This class defines command-line {@link KafkaToIgniteCdcStreamer} startup. This startup can be used to start Ignite
- * {@link KafkaToIgniteCdcStreamer} application outside of any hosting environment from command line.
+ * This class defines command-line Kafka to Ignite startup. This startup can be used to start Ignite
+ * {@link KafkaToIgniteCdcStreamer} or {@link KafkaToIgniteClientCdcStreamer} application outside of any hosting
+ * environment from command line.
+ * <p>
  * This startup is a Java application with {@link #main(String[])} method that accepts command line arguments.
  * It accepts on parameter which is Ignite Spring XML configuration file path.
  * You can run this class from command line without parameters to get help message.
@@ -61,7 +63,7 @@ public class KafkaToIgniteCommandLineStartup {
             exit("Invalid arguments: " + args[0], true, -1);
 
         try {
-            KafkaToIgniteCdcStreamer streamer = KafkaToIgniteLoader.loadKafkaToIgniteStreamer(args[0]);
+            AbstractKafkaToIgniteCdcStreamer streamer = KafkaToIgniteLoader.loadKafkaToIgniteStreamer(args[0]);
 
             streamer.run();
         }
@@ -95,14 +97,16 @@ public class KafkaToIgniteCommandLineStartup {
                 "    Where:",
                 "    ?, /help, -help, - show this message.",
                 "    -v               - verbose mode (quiet by default).",
-                "    path            - path to Spring XML configuration file.",
-                "                      Path can be absolute or relative to IGNITE_HOME.",
+                "    path             - path to Spring XML configuration file.",
+                "                       Path can be absolute or relative to IGNITE_HOME.",
                 " ",
-                "Spring file should contain bean definition of 'org.apache.ignite.configuration.IgniteConfiguration' " +
-                    "and 'org.apache.ignite.cdc.KafkaToIgniteCdcStreamerConfiguration' " +
-                    "and bean of class 'java.util.Properties' with '" + KAFKA_PROPERTIES + "' name " +
-                    "that contains properties to connect to Apache Kafka cluster. " +
-                    "Note that bean will be fetched by the type and its ID is not used.");
+                "Spring file should contain the following bean definition:",
+                "1. 'org.apache.ignite.configuration.IgniteConfiguration' or " +
+                    "'org.apache.ignite.configuration.ClientConfiguration' to connect to Apache Ignite cluster.",
+                "2. 'org.apache.ignite.cdc.KafkaToIgniteCdcStreamerConfiguration' to configure streamer.",
+                "3. Bean of class 'java.util.Properties' with '" + KAFKA_PROPERTIES + "' name that contains " +
+                    "properties to connect to Apache Kafka cluster. ",
+                "Note that bean will be fetched by the type and its ID is not used.");
         }
 
         System.exit(exitCode);
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
index ae5affb..2af82f2 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
@@ -22,6 +22,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
 import org.apache.ignite.internal.util.lang.GridTuple3;
@@ -32,36 +33,45 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import static org.apache.ignite.internal.IgniteComponentType.SPRING;
 
 /**
- * Utility class to load {@link KafkaToIgniteCdcStreamer} from Spring XML configuration.
+ * Utility class to load implementation of {@link AbstractKafkaToIgniteCdcStreamer} from Spring XML configuration.
  */
 public class KafkaToIgniteLoader {
     /** Kafka properties bean name. */
     public static final String KAFKA_PROPERTIES = "kafkaProperties";
 
     /**
-     * Loads {@link KafkaToIgniteCdcStreamer} from XML configuration file.
+     * Loads {@link AbstractKafkaToIgniteCdcStreamer} from XML configuration file.
      * If load fails then error message wouldn't be null.
      *
      * @param springXmlPath Path to XML configuration file.
+     * @param <T> Streamer type.
      * @return {@code KafkaToIgniteCdcStreamer} instance.
      * @throws IgniteCheckedException If failed.
      */
-    public static KafkaToIgniteCdcStreamer loadKafkaToIgniteStreamer(String springXmlPath) throws IgniteCheckedException {
+    public static <T extends AbstractKafkaToIgniteCdcStreamer> T loadKafkaToIgniteStreamer(String springXmlPath)
+        throws IgniteCheckedException {
         URL cfgUrl = U.resolveSpringUrl(springXmlPath);
 
         IgniteSpringHelper spring = SPRING.create(false);
 
         GridTuple3<Map<String, ?>, Map<Class<?>, Collection>, ? extends GridSpringResourceContext> cfgTuple =
             spring.loadBeans(cfgUrl, F.asList(KAFKA_PROPERTIES),
-                IgniteConfiguration.class, KafkaToIgniteCdcStreamerConfiguration.class);
+                IgniteConfiguration.class, ClientConfiguration.class, KafkaToIgniteCdcStreamerConfiguration.class);
 
         Collection<IgniteConfiguration> ignCfg = cfgTuple.get2().get(IgniteConfiguration.class);
+        Collection<ClientConfiguration> clientCfg = cfgTuple.get2().get(ClientConfiguration.class);
 
-        if (ignCfg.size() > 1) {
-            throw new IgniteCheckedException(
-                "Exact 1 IgniteConfiguration should be defined. Found " + ignCfg.size()
-            );
-        }
+        if (ignCfg.isEmpty() && clientCfg.isEmpty())
+            throw new IgniteCheckedException("IgniteConfiguration or ClientConfiguration should be defined.");
+
+        if (!ignCfg.isEmpty() && !clientCfg.isEmpty())
+            throw new IgniteCheckedException("Either IgniteConfiguration or ClientConfiguration should be defined.");
+
+        if (ignCfg.size() > 1)
+            throw new IgniteCheckedException("Exact 1 IgniteConfiguration should be defined. Found " + ignCfg.size());
+
+        if (clientCfg.size() > 1)
+            throw new IgniteCheckedException("Exact 1 ClientConfiguration should be defined. Found " + clientCfg.size());
 
         Collection<KafkaToIgniteCdcStreamerConfiguration> k2iCfg =
             cfgTuple.get2().get(KafkaToIgniteCdcStreamerConfiguration.class);
@@ -75,6 +85,9 @@ public class KafkaToIgniteLoader {
 
         Properties kafkaProps = (Properties)cfgTuple.get1().get(KAFKA_PROPERTIES);
 
-        return new KafkaToIgniteCdcStreamer(ignCfg.iterator().next(), kafkaProps, k2iCfg.iterator().next());
+        if (ignCfg.isEmpty())
+            return (T)new KafkaToIgniteClientCdcStreamer(clientCfg.iterator().next(), kafkaProps, k2iCfg.iterator().next());
+        else
+            return (T)new KafkaToIgniteCdcStreamer(ignCfg.iterator().next(), kafkaProps, k2iCfg.iterator().next());
     }
 }
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index f15e061..581e74c 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -23,10 +23,8 @@ import java.util.Properties;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cdc.TypeMapping;
-import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -43,8 +41,8 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZE
 
 /** */
 public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
-    /** Ignite instance. */
-    private final IgniteEx ign;
+    /** Binary context. */
+    private final BinaryContext ctx;
 
     /** Log. */
     private final IgniteLogger log;
@@ -59,18 +57,18 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
     private final AtomicLong rcvdEvts = new AtomicLong();
 
     /**
-     * @param ign Ignite instance.
+     * @param ctx Binary context.
      * @param log Logger.
      * @param initProps Kafka properties.
      * @param streamerCfg Streamer configuration.
      */
     public KafkaToIgniteMetadataUpdater(
-        IgniteEx ign,
+        BinaryContext ctx,
         IgniteLogger log,
         Properties initProps,
         KafkaToIgniteCdcStreamerConfiguration streamerCfg
     ) {
-        this.ign = ign;
+        this.ctx = ctx;
         this.kafkaReqTimeout = streamerCfg.getKafkaRequestTimeout();
         this.log = log.getLogger(KafkaToIgniteMetadataUpdater.class);
 
@@ -90,8 +88,6 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
 
     /** Polls all available records from metadata topic and applies it to Ignite. */
     public synchronized void updateMetadata() {
-        BinaryContext ctx = ((CacheObjectBinaryProcessorImpl)ign.context().cacheObjects()).binaryContext();
-
         while (true) {
             ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
 
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index d4517cd..144b8b8 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -40,6 +40,7 @@ import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
 import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.DataRegionConfiguration;
 import org.apache.ignite.configuration.DataStorageConfiguration;
@@ -49,8 +50,10 @@ import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
@@ -85,31 +88,37 @@ import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 /** */
 @RunWith(Parameterized.class)
 public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
-    /** Cache atomicity mode. */
+    /** Client type to connect to a destination cluster. */
     @Parameterized.Parameter
+    public ClientType clientType;
+
+    /** Cache atomicity mode. */
+    @Parameterized.Parameter(1)
     public CacheAtomicityMode atomicity;
 
     /** Cache replication mode. */
-    @Parameterized.Parameter(1)
+    @Parameterized.Parameter(2)
     public CacheMode mode;
 
     /** */
-    @Parameterized.Parameter(2)
+    @Parameterized.Parameter(3)
     public int backups;
 
     /** @return Test parameters. */
-    @Parameterized.Parameters(name = "atomicity={0}, mode={1}, backupCnt={2}")
+    @Parameterized.Parameters(name = "clientType={0}, atomicity={1}, mode={2}, backupCnt={3}")
     public static Collection<?> parameters() {
         List<Object[]> params = new ArrayList<>();
 
-        for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
-            for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED)) {
-                for (int backups = 0; backups < 2; backups++) {
-                    // backupCount ignored for REPLICATED caches.
-                    if (backups > 0 && mode == REPLICATED)
-                        continue;
+        for (ClientType clientType : ClientType.values()) {
+            for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
+                for (CacheMode mode : EnumSet.of(PARTITIONED, REPLICATED)) {
+                    for (int backups = 0; backups < 2; backups++) {
+                        // backupCount ignored for REPLICATED caches.
+                        if (backups > 0 && mode == REPLICATED)
+                            continue;
 
-                    params.add(new Object[] {atomicity, mode, backups});
+                        params.add(new Object[] {clientType, atomicity, mode, backups});
+                    }
                 }
             }
         }
@@ -508,8 +517,10 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
 
     /** */
     private boolean checkFuts(boolean res, List<IgniteInternalFuture<?>> futs) {
-        for (int i = 0; i < futs.size(); i++)
-            assertFalse("Fut " + i, futs.get(i).isDone());
+        for (int i = 0; i < futs.size(); i++) {
+            assertFalse("Fut " + i + ", error: " + X.getFullStackTrace(futs.get(i).error()),
+                futs.get(i).isDone());
+        }
 
         return res;
     }
@@ -544,6 +555,19 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
         return node.context().query().querySqlFields(new SqlFieldsQuery(sqlText).setArgs(args), true).getAll();
     }
 
+    /** @return Destination cluster host addresses. */
+    protected String[] hostAddresses(IgniteEx[] dest) {
+        String[] addrs = new String[dest.length];
+
+        for (int i = 0; i < dest.length; i++) {
+            ClusterNode node = dest[i].localNode();
+
+            addrs[i] = F.first(node.addresses()) + ":" + node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT);
+        }
+
+        return addrs;
+    }
+
     /** */
     protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache);
 
@@ -637,4 +661,13 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
             this.orgId = orgId;
         }
     }
+
+    /** Client type to connect to a destination cluster. */
+    protected enum ClientType {
+        /** Client node. */
+        CLIENT_NODE,
+
+        /** Thin client. */
+        THIN_CLIENT;
+    }
 }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 5a0a221..8273fc9 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -21,7 +21,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.function.Function;
+import org.apache.ignite.cdc.thin.IgniteToIgniteClientCdcStreamer;
+import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
@@ -35,7 +38,7 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
         for (int i = 0; i < srcCluster.length; i++)
-            futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], cache));
+            futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, cache));
 
         return futs;
     }
@@ -45,35 +48,50 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
         List<IgniteInternalFuture<?>> futs = new ArrayList<>();
 
         for (int i = 0; i < srcCluster.length; i++)
-            futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], ACTIVE_ACTIVE_CACHE));
+            futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], destCluster, ACTIVE_ACTIVE_CACHE));
 
         for (int i = 0; i < destCluster.length; i++)
-            futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], ACTIVE_ACTIVE_CACHE));
+            futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], srcCluster, ACTIVE_ACTIVE_CACHE));
 
         return futs;
     }
 
     /** {@inheritDoc} */
     @Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
-        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
-        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
+        assertNotNull(longMetric.apply(AbstractIgniteCdcStreamer.LAST_EVT_TIME));
+        assertNotNull(longMetric.apply(AbstractIgniteCdcStreamer.EVTS_CNT));
     }
 
     /**
      * @param srcCfg Ignite source node configuration.
      * @param destCfg Ignite destination cluster configuration.
+     * @param dest Ignite destination cluster.
      * @param cache Cache name to stream to kafka.
      * @return Future for Change Data Capture application.
      */
-    protected IgniteInternalFuture<?> igniteToIgnite(IgniteConfiguration srcCfg, IgniteConfiguration destCfg, String cache) {
+    protected IgniteInternalFuture<?> igniteToIgnite(
+        IgniteConfiguration srcCfg,
+        IgniteConfiguration destCfg,
+        IgniteEx[] dest,
+        String cache
+    ) {
         return runAsync(() -> {
             CdcConfiguration cdcCfg = new CdcConfiguration();
 
-            cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer()
-                .setDestinationIgniteConfiguration(destCfg)
-                .setMaxBatchSize(KEYS_CNT)
-                .setCaches(Collections.singleton(cache)));
+            AbstractIgniteCdcStreamer streamer;
 
+            if (clientType == ClientType.THIN_CLIENT) {
+                streamer = new IgniteToIgniteClientCdcStreamer()
+                    .setDestinationClientConfiguration(new ClientConfiguration()
+                        .setAddresses(hostAddresses(dest)));
+            }
+            else
+                streamer = new IgniteToIgniteCdcStreamer().setDestinationIgniteConfiguration(destCfg);
+
+            streamer.setMaxBatchSize(KEYS_CNT);
+            streamer.setCaches(Collections.singleton(cache));
+
+            cdcCfg.setConsumer(streamer);
             cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
 
             CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
index f182e77..43e7db6 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
@@ -20,7 +20,6 @@ package org.apache.ignite.cdc;
 import org.apache.ignite.cdc.kafka.CdcKafkaReplicationAppsTest;
 import org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest;
 import org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest;
-import org.apache.ignite.cdc.thin.CdcIgniteToIgniteClientReplicationTest;
 import org.junit.runner.RunWith;
 import org.junit.runners.Suite;
 
@@ -35,8 +34,7 @@ import org.junit.runners.Suite;
     KafkaToIgniteLoaderTest.class,
     CdcKafkaReplicationTest.class,
     CdcKafkaReplicationAppsTest.class,
-    ConflictResolverRestartTest.class,
-    CdcIgniteToIgniteClientReplicationTest.class
+    ConflictResolverRestartTest.class
 })
 public class IgniteCdcTestSuite {
 }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index 1d88321..27d900d 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -22,9 +22,12 @@ import java.io.FileOutputStream;
 import java.io.PrintWriter;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.stream.Collectors;
 import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
 
@@ -77,6 +80,9 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
     /** */
     public static final String PROPS_PATH = "PROPS_PATH";
 
+    /** */
+    public static final String HOST_ADDRESSES = "HOST_ADDRESSES";
+
     /** */
     private String kafkaPropsPath = null;
 
@@ -127,16 +133,32 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
         String topic,
         String metadataTopic,
         IgniteConfiguration igniteCfg,
+        IgniteEx[] dest,
         int partFrom,
         int partTo
     ) {
         Map<String, String> params = new HashMap<>();
 
-        int discoPort = getFieldValue(igniteCfg.getDiscoverySpi(), "locPort");
+        String cfg;
+
+        if (clientType == ClientType.THIN_CLIENT) {
+            cfg = "/replication/kafka-to-ignite-client.xml";
+
+            String addresses = Arrays.stream(hostAddresses(dest)).map(addr -> "<value>" + addr + "</value>")
+                .collect(Collectors.joining());
+
+            params.put(HOST_ADDRESSES, addresses);
+        }
+        else {
+            cfg = "/replication/kafka-to-ignite.xml";
+
+            int discoPort = getFieldValue(igniteCfg.getDiscoverySpi(), "locPort");
+
+            params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
+            params.put(DISCO_PORT, Integer.toString(discoPort));
+            params.put(DISCO_PORT_RANGE, Integer.toString(discoPort + DFLT_PORT_RANGE));
+        }
 
-        params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
-        params.put(DISCO_PORT, Integer.toString(discoPort));
-        params.put(DISCO_PORT_RANGE, Integer.toString(discoPort + DFLT_PORT_RANGE));
         params.put(REPLICATED_CACHE, cacheName);
         params.put(TOPIC, topic);
         params.put(METADATA_TOPIC, metadataTopic);
@@ -147,7 +169,7 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
         params.put(KAFKA_REQ_TIMEOUT, Long.toString(DFLT_KAFKA_REQ_TIMEOUT));
 
         return runAsync(
-            () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig("/replication/kafka-to-ignite.xml", params)})
+            () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig(cfg, params)})
         );
     }
 
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index fed0678..cbc2f94 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -25,6 +25,7 @@ import java.util.function.Function;
 import org.apache.ignite.cdc.AbstractReplicationTest;
 import org.apache.ignite.cdc.CdcConfiguration;
 import org.apache.ignite.cdc.IgniteToIgniteCdcStreamer;
+import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
@@ -113,6 +114,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
                 cache,
                 SRC_DEST_META_TOPIC,
                 destClusterCliCfg[i],
+                destCluster,
                 i * (DFLT_PARTS / 2),
                 (i + 1) * (DFLT_PARTS / 2)
             ));
@@ -136,6 +138,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
             SRC_DEST_TOPIC,
             SRC_DEST_META_TOPIC,
             destClusterCliCfg[0],
+            destCluster,
             0,
             DFLT_PARTS
         ));
@@ -145,6 +148,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
             DEST_SRC_TOPIC,
             DEST_SRC_META_TOPIC,
             srcClusterCliCfg[0],
+            srcCluster,
             0,
             DFLT_PARTS
         ));
@@ -199,6 +203,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
     /**
      * @param cacheName Cache name.
      * @param igniteCfg Ignite configuration.
+     * @param dest Destination Ignite cluster.
      * @return Future for runed {@link KafkaToIgniteCdcStreamer}.
      */
     protected IgniteInternalFuture<?> kafkaToIgnite(
@@ -206,6 +211,7 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
         String topic,
         String metadataTopic,
         IgniteConfiguration igniteCfg,
+        IgniteEx[] dest,
         int fromPart,
         int toPart
     ) {
@@ -220,7 +226,15 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
         cfg.setMetadataTopic(metadataTopic);
         cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
 
-        return runAsync(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProperties(), cfg));
+        if (clientType == ClientType.THIN_CLIENT) {
+            ClientConfiguration clientCfg = new ClientConfiguration();
+
+            clientCfg.setAddresses(hostAddresses(dest));
+
+            return runAsync(new KafkaToIgniteClientCdcStreamer(clientCfg, kafkaProperties(), cfg));
+        }
+        else
+            return runAsync(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProperties(), cfg));
     }
 
     /** */
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
index 4fd818e..cb2d385 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -49,6 +49,35 @@ public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
         assertNotNull(streamer);
     }
 
+    /** Invalid thin client configs must fail with {@link IgniteCheckedException}; the correct one must load a streamer. */
+    @Test
+    public void testLoadIgniteClientConfig() throws Exception {
+        assertThrows(
+            null,
+            () -> loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-double-client-cfg.xml"),
+            IgniteCheckedException.class,
+            "Exact 1 ClientConfiguration should be defined. Found 2"
+        );
+
+        assertThrows(
+            null,
+            () -> loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-without-kafka-properties.xml"),
+            IgniteCheckedException.class,
+            "Spring bean with provided name doesn't exist"
+        );
+
+        assertThrows(
+            null,
+            () -> loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-with-ignite-cfg.xml"),
+            IgniteCheckedException.class,
+            "Either IgniteConfiguration or ClientConfiguration should be defined."
+        );
+
+        KafkaToIgniteClientCdcStreamer streamer = loadKafkaToIgniteStreamer("loader/thin/kafka-to-ignite-client-correct.xml");
+
+        assertNotNull(streamer);
+    }
+
     /** */
     @Test
     public void testInitSpringContextOnce() throws Exception {
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java
deleted file mode 100644
index b810e18..0000000
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/thin/CdcIgniteToIgniteClientReplicationTest.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.cdc.thin;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.function.Function;
-import org.apache.ignite.cdc.AbstractReplicationTest;
-import org.apache.ignite.cdc.CdcConfiguration;
-import org.apache.ignite.cdc.IgniteToIgniteCdcStreamer;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.configuration.ClientConfiguration;
-import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.cdc.CdcMain;
-import org.apache.ignite.internal.processors.odbc.ClientListenerProcessor;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
-
-import static org.apache.ignite.testframework.GridTestUtils.runAsync;
-
-/**
- * {@link IgniteToIgniteClientCdcStreamer} test.
- */
-public class CdcIgniteToIgniteClientReplicationTest extends AbstractReplicationTest {
-    /** {@inheritDoc} */
-    @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc(String cache) {
-        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
-
-        for (int i = 0; i < srcCluster.length; i++)
-            futs.add(igniteToIgniteClient(srcCluster[i].configuration(), destCluster, cache));
-
-        return futs;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
-        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
-
-        for (int i = 0; i < srcCluster.length; i++)
-            futs.add(igniteToIgniteClient(srcCluster[i].configuration(), destCluster, ACTIVE_ACTIVE_CACHE));
-
-        for (int i = 0; i < destCluster.length; i++)
-            futs.add(igniteToIgniteClient(destCluster[i].configuration(), srcCluster, ACTIVE_ACTIVE_CACHE));
-
-        return futs;
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
-        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
-        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
-    }
-
-    /**
-     * @param srcCfg Ignite source node configuration.
-     * @param dest Destination cluster.
-     * @param cache Cache name to replicate.
-     * @return Future for Change Data Capture application.
-     */
-    private IgniteInternalFuture<?> igniteToIgniteClient(IgniteConfiguration srcCfg, IgniteEx[] dest, String cache) {
-        return runAsync(() -> {
-            ClientConfiguration clientCfg = new ClientConfiguration();
-
-            String[] addrs = new String[dest.length];
-
-            for (int i = 0; i < dest.length; i++) {
-                ClusterNode node = dest[i].localNode();
-
-                addrs[i] = F.first(node.addresses()) + ":" + node.attribute(ClientListenerProcessor.CLIENT_LISTENER_PORT);
-            }
-
-            clientCfg.setAddresses(addrs);
-
-            CdcConfiguration cdcCfg = new CdcConfiguration();
-
-            cdcCfg.setConsumer(new IgniteToIgniteClientCdcStreamer()
-                .setDestinationClientConfiguration(clientCfg)
-                .setCaches(Collections.singleton(cache))
-                .setMaxBatchSize(KEYS_CNT));
-
-            cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
-
-            CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);
-
-            cdcs.add(cdc);
-
-            cdc.run();
-        });
-    }
-}
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
index 78a8809..b693424 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
@@ -44,4 +44,4 @@
     </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
index 545a338..f236f30 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
@@ -35,4 +35,4 @@
             </bean>
         </property>
     </bean>
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
index 892b177..586de47 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
@@ -35,4 +35,4 @@
     </bean>
 
     <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
similarity index 73%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
index 78a8809..caa8948 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-correct.xml
@@ -23,16 +23,11 @@
        xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
-    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
-        <property name="dataStorageConfiguration">
-            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
-                <property name="defaultDataRegionConfiguration">
-                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
-                        <property name="cdcEnabled" value="true" />
-                        <property name="persistenceEnabled" value="true" />
-                    </bean>
-                </property>
-            </bean>
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
         </property>
     </bean>
 
@@ -44,4 +39,4 @@
     </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-double-client-cfg.xml
similarity index 63%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-double-client-cfg.xml
index 545a338..dde7ca1 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-double-client-cfg.xml
@@ -21,18 +21,13 @@
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
-    <import resource="kafka-to-ignite-correct.xml" />
+    <import resource="kafka-to-ignite-client-correct.xml" />
 
-    <bean id="grid.cfg2" class="org.apache.ignite.configuration.IgniteConfiguration">
-        <property name="dataStorageConfiguration">
-            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
-                <property name="defaultDataRegionConfiguration">
-                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
-                        <property name="cdcEnabled" value="true" />
-                        <property name="persistenceEnabled" value="true" />
-                    </bean>
-                </property>
-            </bean>
+    <bean id="client2.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
         </property>
     </bean>
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-ignite-cfg.xml
similarity index 90%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-ignite-cfg.xml
index 78a8809..fa11ab7 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-with-ignite-cfg.xml
@@ -23,6 +23,14 @@
        xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
+        </property>
+    </bean>
+
     <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
         <property name="dataStorageConfiguration">
             <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
@@ -44,4 +52,4 @@
     </bean>
 
     <util:properties id="kafkaProperties" location="loader/kafka.properties" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-kafka-properties.xml
similarity index 67%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
copy to modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-kafka-properties.xml
index 892b177..85cdce5 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
+++ b/modules/cdc-ext/src/test/resources/loader/thin/kafka-to-ignite-client-without-kafka-properties.xml
@@ -21,18 +21,13 @@
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
-    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
-        <property name="dataStorageConfiguration">
-            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
-                <property name="defaultDataRegionConfiguration">
-                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
-                        <property name="cdcEnabled" value="true" />
-                        <property name="persistenceEnabled" value="true" />
-                    </bean>
-                </property>
-            </bean>
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>
+                <value>127.0.0.1:10800</value>
+            </list>
         </property>
     </bean>
 
     <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
-</beans>
\ No newline at end of file
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite-client.xml
similarity index 61%
copy from modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
copy to modules/cdc-ext/src/test/resources/replication/kafka-to-ignite-client.xml
index 78a8809..a676ef8 100644
--- a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
+++ b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite-client.xml
@@ -23,25 +23,27 @@
        xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
-    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
-        <property name="dataStorageConfiguration">
-            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
-                <property name="defaultDataRegionConfiguration">
-                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
-                        <property name="cdcEnabled" value="true" />
-                        <property name="persistenceEnabled" value="true" />
-                    </bean>
-                </property>
-            </bean>
+    <bean id="client.cfg" class="org.apache.ignite.configuration.ClientConfiguration">
+        <property name="addresses">
+            <list>{HOST_ADDRESSES}</list>
         </property>
     </bean>
 
     <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
-        <property name="topic" value="ignite" />
-        <property name="metadataTopic" value="ignite-metadata" />
-        <property name="kafkaPartsFrom" value="0" />
-        <property name="kafkaPartsTo" value="16" />
+        <property name="caches">
+            <util:list>
+                <bean class="java.lang.String">
+                    <constructor-arg type="String" value="{REPLICATED_CACHE}" />
+                </bean>
+            </util:list>
+        </property>
+        <property name="topic" value="{TOPIC}" />
+        <property name="metadataTopic" value="{METADATA_TOPIC}" />
+        <property name="kafkaPartsFrom" value="{PARTS_FROM}"/>
+        <property name="kafkaPartsTo" value="{PARTS_TO}"/>
+        <property name="threadCount" value="{THREAD_CNT}"/>
+        <property name="kafkaRequestTimeout" value="{KAFKA_REQ_TIMEOUT}"/>
     </bean>
 
-    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
-</beans>
\ No newline at end of file
+    <util:properties id="kafkaProperties" location="{PROPS_PATH}" />
+</beans>