You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/06/30 16:13:32 UTC

[ignite-extensions] branch master updated: IGNITE-14362: CDC Ignite to Kafka extension (#49)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new c478ce1  IGNITE-14362: CDC Ignite to Kafka extension (#49)
c478ce1 is described below

commit c478ce1c80d0a24f052a2ce26c407987cc046519
Author: Nikolay <ni...@apache.org>
AuthorDate: Wed Jun 30 19:13:26 2021 +0300

    IGNITE-14362: CDC Ignite to Kafka extension (#49)
---
 .gitignore                                         |   1 +
 modules/cdc-ext/assembly/cdc-ext.xml               |  49 ++++
 modules/cdc-ext/bin/kafka-to-ignite.sh             |  27 ++
 .../modules/core/src/test/config/log4j-test.xml    |  57 ++++
 .../modules/core/src/test/config/tests.properties  |  16 ++
 modules/cdc-ext/pom.xml                            | 157 ++++++++++
 .../org/apache/ignite/cdc/CdcEventsApplier.java    | 169 +++++++++++
 .../ignite/cdc/IgniteToIgniteCdcStreamer.java      | 128 +++++++++
 .../CacheConflictResolutionManagerImpl.java        |  89 ++++++
 ...VersionConflictResolverCachePluginProvider.java |  97 +++++++
 .../CacheVersionConflictResolverImpl.java          | 152 ++++++++++
 ...CacheVersionConflictResolverPluginProvider.java | 160 +++++++++++
 .../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 205 +++++++++++++
 .../ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java | 256 +++++++++++++++++
 .../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java | 251 ++++++++++++++++
 .../KafkaToIgniteCdcStreamerConfiguration.java     | 124 ++++++++
 .../cdc/kafka/KafkaToIgniteCommandLineStartup.java | 110 +++++++
 .../ignite/cdc/kafka/KafkaToIgniteLoader.java      |  75 +++++
 .../apache/ignite/cdc/AbstractReplicationTest.java | 319 +++++++++++++++++++++
 .../ignite/cdc/CacheConflictOperationsTest.java    | 317 ++++++++++++++++++++
 .../cdc/CacheConflictOperationsWithFieldTest.java  |  26 ++
 .../cdc/CdcIgniteToIgniteReplicationTest.java      |  69 +++++
 .../ignite/cdc/ConflictResolvableTestData.java     |  69 +++++
 .../org/apache/ignite/cdc/IgniteCdcTestSuite.java  |  39 +++
 .../cdc/kafka/CdcKafkaReplicationAppsTest.java     | 164 +++++++++++
 .../ignite/cdc/kafka/CdcKafkaReplicationTest.java  | 174 +++++++++++
 .../ignite/cdc/kafka/KafkaToIgniteLoaderTest.java  |  50 ++++
 .../resources/loader/kafka-to-ignite-correct.xml   |  42 +++
 .../loader/kafka-to-ignite-double-ignite-cfg.xml   |  38 +++
 .../kafka-to-ignite-without-kafka-properties.xml   |  38 +++
 .../src/test/resources/loader/kafka.properties     |   4 +
 .../test/resources/replication/ignite-to-kafka.xml |  64 +++++
 .../test/resources/replication/kafka-to-ignite.xml |  61 ++++
 modules/performance-statistics-ext/pom.xml         |   1 -
 parent/pom.xml                                     |   5 +
 pom.xml                                            |   1 +
 36 files changed, 3603 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index 00dcb00..59df72e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -25,6 +25,7 @@ buildNumber.properties
 .metadata
 bin/
 !/modules/performance-statistics-ext/bin
+!/modules/cdc-ext/bin
 tmp/
 *.tmp
 *.bak
diff --git a/modules/cdc-ext/assembly/cdc-ext.xml b/modules/cdc-ext/assembly/cdc-ext.xml
new file mode 100644
index 0000000..baaee78
--- /dev/null
+++ b/modules/cdc-ext/assembly/cdc-ext.xml
@@ -0,0 +1,49 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2
+          http://maven.apache.org/xsd/assembly-1.1.2.xsd">
+    <id>cdc-ext</id>
+
+    <includeBaseDirectory>false</includeBaseDirectory>
+
+    <formats>
+        <format>zip</format>
+    </formats>
+
+    <fileSets>
+        <fileSet>
+            <directory>${project.build.directory}</directory>
+            <outputDirectory>/libs/optional/ignite-cdc-ext</outputDirectory>
+            <includes>
+                <include>${project.build.finalName}.${project.packaging}</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.build.directory}/libs</directory>
+            <outputDirectory>/libs/optional/ignite-cdc-ext</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>${basedir}/bin</directory>
+            <outputDirectory>/bin</outputDirectory>
+        </fileSet>
+    </fileSets>
+</assembly>
diff --git a/modules/cdc-ext/bin/kafka-to-ignite.sh b/modules/cdc-ext/bin/kafka-to-ignite.sh
new file mode 100755
index 0000000..068a229
--- /dev/null
+++ b/modules/cdc-ext/bin/kafka-to-ignite.sh
@@ -0,0 +1,27 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+export MAIN_CLASS="org.apache.ignite.cdc.kafka.KafkaToIgniteCommandLineStartup"
+# Use IGNITE_HOME when it is set; otherwise fall back to the parent of the directory containing this script.
+if [ "${IGNITE_HOME:-}" = "" ];
+    then IGNITE_HOME_TMP="$(dirname "$(cd "$(dirname "$0")"; "pwd")")";
+    else IGNITE_HOME_TMP=${IGNITE_HOME};
+fi
+# The launcher path is quoted so that an installation directory containing spaces is not word-split.
+"${IGNITE_HOME_TMP}/bin/ignite.sh" "$@"
diff --git a/modules/cdc-ext/modules/core/src/test/config/log4j-test.xml b/modules/cdc-ext/modules/core/src/test/config/log4j-test.xml
new file mode 100644
index 0000000..cc2146e
--- /dev/null
+++ b/modules/cdc-ext/modules/core/src/test/config/log4j-test.xml
@@ -0,0 +1,57 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!DOCTYPE log4j:configuration PUBLIC "-//APACHE//DTD LOG4J 1.2//EN"
+    "http://logging.apache.org/log4j/1.2/apidocs/org/apache/log4j/xml/doc-files/log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <param name="Threshold" value="DEBUG"/>
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+
+        <filter class="org.apache.log4j.varia.LevelRangeFilter">
+            <param name="levelMin" value="DEBUG"/>
+            <param name="levelMax" value="INFO"/>
+        </filter>
+    </appender>
+
+    <appender name="CONSOLE_ERR" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.err"/>
+
+        <param name="Threshold" value="WARN"/>
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="[%d{ISO8601}][%-5p][%t][%c{1}] %m%n"/>
+        </layout>
+    </appender>
+
+    <category name="org">
+        <level value="INFO"/>
+    </category>
+
+    <root>
+        <level value="INFO"/>
+
+        <appender-ref ref="CONSOLE"/>
+        <appender-ref ref="CONSOLE_ERR"/>
+    </root>
+</log4j:configuration>
diff --git a/modules/cdc-ext/modules/core/src/test/config/tests.properties b/modules/cdc-ext/modules/core/src/test/config/tests.properties
new file mode 100644
index 0000000..228ed16
--- /dev/null
+++ b/modules/cdc-ext/modules/core/src/test/config/tests.properties
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
\ No newline at end of file
diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml
new file mode 100644
index 0000000..f85b1b8
--- /dev/null
+++ b/modules/cdc-ext/pom.xml
@@ -0,0 +1,157 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!--
+    POM file of the ignite-cdc-ext module (CDC streamers: Ignite to Ignite, Ignite to Kafka, Kafka to Ignite).
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.ignite</groupId>
+        <artifactId>ignite-extensions-parent</artifactId>
+        <version>1</version>
+        <relativePath>../../parent</relativePath>
+    </parent>
+
+    <properties>
+        <kafka.version>2.7.0</kafka.version>
+        <test.containers.version>1.15.1</test.containers.version>
+        <slf4j.version>1.7.30</slf4j.version>
+    </properties>
+
+    <artifactId>ignite-cdc-ext</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <url>http://ignite.apache.org</url>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${ignite.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-log4j</artifactId>
+            <version>${ignite.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-spring</artifactId>
+            <version>${ignite.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <version>${ignite.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.12</artifactId>
+            <version>${kafka.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.12</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+            <version>${kafka.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-streams</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${kafka.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <version>${slf4j.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-dependency-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>copy-dependencies</goal>
+                        </goals>
+                        <configuration>
+                            <outputDirectory>${project.build.directory}/libs</outputDirectory>
+                            <includeScope>compile</includeScope>
+                            <excludeArtifactIds>ignite-core,ignite-spring,ignite-shmem</excludeArtifactIds>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <id>cdc-ext</id>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                        <phase>package</phase>
+                        <configuration>
+                            <appendAssemblyId>false</appendAssemblyId>
+                            <descriptors>
+                                <descriptor>assembly/cdc-ext.xml</descriptor>
+                            </descriptors>
+                            <finalName>ignite-cdc-ext</finalName>
+                            <attach>false</attach>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
new file mode 100644
index 0000000..a8fe3ff
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BooleanSupplier;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+
+/**
+ * Contains logic to process {@link CdcEvent}s and apply them to the cluster provided by {@link #ignite()}.
+ */
+public abstract class CdcEventsApplier {
+    /** Maximum number of entries in a batch; a full batch is flushed before the next entry is added. */
+    private final int maxBatchSize;
+
+    /** Destination caches by cache ID, resolved on the first event seen for the cache. */
+    private final Map<Integer, IgniteInternalCache<BinaryObject, BinaryObject>> ignCaches = new HashMap<>();
+
+    /** Pending updates of the current cache: key to value with its version. */
+    private final Map<KeyCacheObject, GridCacheDrInfo> updBatch = new HashMap<>();
+
+    /** Pending removes of the current cache: key to version of the remove. */
+    private final Map<KeyCacheObject, GridCacheVersion> rmvBatch = new HashMap<>();
+
+    /** Returns {@code true} when {@link #updBatch} is not empty. */
+    private final BooleanSupplier hasUpdates = () -> !F.isEmpty(updBatch);
+
+    /** Returns {@code true} when {@link #rmvBatch} is not empty. */
+    private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
+
+    /** Count of events added to the batches by {@link #apply(Iterable)}. */
+    protected final AtomicLong evtsApplied = new AtomicLong();
+
+    /**
+     * @param maxBatchSize Maximum batch size.
+     */
+    public CdcEventsApplier(int maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    /**
+     * @param evts Events to batch per cache and apply; batches are flushed on cache change and at the end.
+     * @throws IgniteCheckedException If applying a batch to the cache failed.
+     */
+    protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
+        IgniteInternalCache<BinaryObject, BinaryObject> currCache = null;
+
+        for (CdcEvent evt : evts) {
+            if (log().isDebugEnabled())
+                log().debug("Event received [key=" + evt.key() + ']');
+
+            IgniteInternalCache<BinaryObject, BinaryObject> cache = ignCaches.computeIfAbsent(evt.cacheId(), cacheId -> {
+                for (String cacheName : ignite().cacheNames()) {
+                    if (CU.cacheId(cacheName) == cacheId) {
+                        // IgniteEx#cachex(String) will return null if cache not initialized with regular Ignite#cache(String) call.
+                        ignite().cache(cacheName);
+
+                        return ignite().cachex(cacheName);
+                    }
+                }
+
+                throw new IllegalStateException("Cache with id not found [cacheId=" + cacheId + ']');
+            });
+
+            if (cache != currCache) {
+                applyIf(currCache, hasUpdates, hasRemoves);
+
+                currCache = cache;
+            }
+
+            CacheEntryVersion order = evt.version();
+
+            KeyCacheObject key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
+
+            if (evt.value() != null) {
+                applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
+
+                CacheObject val = new CacheObjectImpl(evt.value(), null);
+
+                updBatch.put(key, new GridCacheDrInfo(val,
+                    new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
+            }
+            else {
+                applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
+
+                rmvBatch.put(key,
+                    new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()));
+            }
+
+            evtsApplied.incrementAndGet();
+        }
+
+        if (currCache != null)
+            applyIf(currCache, hasUpdates, hasRemoves);
+    }
+
+    /**
+     * Applies data from {@link #updBatch} and/or {@link #rmvBatch} to Ignite if required and clears the applied batch.
+     *
+     * @param cache Current cache; used only when at least one of the suppliers returns {@code true}.
+     * @param applyUpd Apply update batch flag supplier.
+     * @param applyRmv Apply remove batch flag supplier.
+     * @throws IgniteCheckedException In case of error.
+     */
+    private void applyIf(
+        IgniteInternalCache<BinaryObject, BinaryObject> cache,
+        BooleanSupplier applyUpd,
+        BooleanSupplier applyRmv
+    ) throws IgniteCheckedException {
+        if (applyUpd.getAsBoolean()) {
+            if (log().isDebugEnabled())
+                log().debug("Applying put batch [cache=" + cache.name() + ']');
+
+            cache.putAllConflict(updBatch);
+
+            updBatch.clear();
+        }
+
+        if (applyRmv.getAsBoolean()) {
+            if (log().isDebugEnabled())
+                log().debug("Applying remove batch [cache=" + cache.name() + ']');
+
+            cache.removeAllConflict(rmvBatch);
+
+            rmvBatch.clear();
+        }
+    }
+
+    /** @return {@code True} if the given batch reached the maximum size or already contains the key. */
+    private boolean isApplyBatch(Map<KeyCacheObject, ?> map, KeyCacheObject key) {
+        return map.size() >= maxBatchSize || map.containsKey(key);
+    }
+
+    /** @return Ignite instance the events are applied to. */
+    protected abstract IgniteEx ignite();
+
+    /** @return Logger. */
+    protected abstract IgniteLogger log();
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
new file mode 100644
index 0000000..b8840bb
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Change Data Consumer that streams all data changes to the provided {@link #dest} Ignite cluster.
+ * Consumer will just fail in case of any error during write. Failure of the consumer will lead to the failure of {@code ignite-cdc}.
+ * It is expected that {@code ignite-cdc} will be configured for automatic restarts with the OS tool to recover from temporary errors
+ * such as destination cluster unavailability or network issues.
+ *
+ * If you plan to apply the changes to the other Ignite cluster in an active-active manner,
+ * i.e. concurrent updates of the same entry in the other cluster are possible,
+ * please be aware of the {@link CacheVersionConflictResolverImpl} conflict resolver.
+ * Configuration of {@link CacheVersionConflictResolverImpl} can be found in {@link KafkaToIgniteCdcStreamer} documentation.
+ *
+ * @see CdcMain
+ * @see CacheVersionConflictResolverImpl
+ */
+public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
+    /** Configuration used to start the {@link #dest} node in {@link #start()}. */
+    private final IgniteConfiguration destIgniteCfg;
+
+    /** If {@code true}, only events with {@link CdcEvent#primary()} set are applied. */
+    private final boolean onlyPrimary;
+
+    /** Destination Ignite cluster client; started in {@link #start()} and closed in {@link #stop()}. */
+    private IgniteEx dest;
+
+    /** Logger. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** IDs of the caches to stream; when empty, events are not filtered by cache. */
+    private final Set<Integer> cachesIds;
+
+    /**
+     * @param destIgniteCfg Configuration of the destination Ignite node.
+     * @param onlyPrimary Only primary flag.
+     * @param caches Cache names; converted to cache IDs with {@link CU#cacheId(String)}.
+     * @param maxBatchSize Maximum batch size.
+     */
+    public IgniteToIgniteCdcStreamer(IgniteConfiguration destIgniteCfg, boolean onlyPrimary, Set<String> caches, int maxBatchSize) {
+        super(maxBatchSize);
+
+        this.destIgniteCfg = destIgniteCfg;
+        this.onlyPrimary = onlyPrimary;
+
+        cachesIds = caches.stream()
+            .mapToInt(CU::cacheId)
+            .boxed()
+            .collect(Collectors.toSet());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        if (log.isInfoEnabled())
+            log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
+
+        dest = (IgniteEx)Ignition.start(destIgniteCfg);
+    }
+
+    /** {@inheritDoc} Applies events passing the primary, cache ID and {@code otherClusterVersion() == null} filters. */
+    @Override public boolean onEvents(Iterator<CdcEvent> evts) {
+        try {
+            apply(() -> F.iterator(
+                evts,
+                F.identity(),
+                true,
+                evt -> !onlyPrimary || evt.primary(),
+                evt -> F.isEmpty(cachesIds) || cachesIds.contains(evt.cacheId()),
+                evt -> evt.version().otherClusterVersion() == null));
+
+            if (log.isInfoEnabled())
+                log.info("Events applied [evtsApplied=" + evtsApplied.get() + ']');
+
+            return true;
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** {@inheritDoc} Closes the destination cluster client. */
+    @Override public void stop() {
+        dest.close();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteEx ignite() {
+        return dest;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteLogger log() {
+        return log;
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
new file mode 100644
index 0000000..5227c56
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheConflictResolutionManagerImpl.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.lang.IgniteFuture;
+
+/**
+ * Intermediate component that creates a {@link CacheVersionConflictResolverImpl} for a specific cache.
+ *
+ * @see CacheVersionConflictResolverImpl
+ * @see CacheVersionConflictResolver
+ */
+public class CacheConflictResolutionManagerImpl<K, V> implements CacheConflictResolutionManager<K, V> {
+    /**
+     * Name of the field used for conflict resolution.
+     * The value of this field will be used to compare two entries in case of conflicting changes.
+     * Note, values of this field must implement {@link Comparable}.
+     *
+     * @see CacheVersionConflictResolverImpl
+     */
+    private final String conflictResolveField;
+
+    /** Cache context; assigned in {@link #start(GridCacheContext)}. */
+    private GridCacheContext<K, V> cctx;
+
+    /**
+     * @param conflictResolveField Name of the field used to resolve conflicts.
+     */
+    public CacheConflictResolutionManagerImpl(String conflictResolveField) {
+        this.conflictResolveField = conflictResolveField;
+    }
+
+    /** {@inheritDoc} Creates a new resolver from the data center ID, the conflict resolve field and a logger of the cache context. */
+    @Override public CacheVersionConflictResolver conflictResolver() {
+        return new CacheVersionConflictResolverImpl(
+            cctx.versions().dataCenterId(),
+            conflictResolveField,
+            cctx.logger(CacheVersionConflictResolverImpl.class)
+        );
+    }
+
+    /** {@inheritDoc} Stores the cache context for later use by {@link #conflictResolver()}. */
+    @Override public void start(GridCacheContext<K, V> cctx) {
+        this.cctx = cctx;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel, boolean destroy) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDisconnected(IgniteFuture reconnectFut) {
+        // No-op.
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
new file mode 100644
index 0000000..5d152f9
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverCachePluginProvider.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import javax.cache.Cache;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.plugin.CachePluginConfiguration;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache plugin provider that supplies a {@link CacheConflictResolutionManagerImpl} for a specific cache.
+ *
+ * @see CacheConflictResolutionManagerImpl
+ * @see CacheVersionConflictResolverImpl
+ * @see CacheVersionConflictResolver
+ */
+public class CacheVersionConflictResolverCachePluginProvider<K, V, C extends CachePluginConfiguration<K, V>>
+    implements CachePluginProvider<C> {
+    /**
+     * Name of the field used for conflict resolution.
+     * The value of this field is used to compare two entries in case of conflicting changes.
+     * Note, values of this field must implement {@link Comparable}.
+     *
+     * @see CacheVersionConflictResolverImpl
+     */
+    private final String conflictResolveField;
+
+    /**
+     * @param conflictResolveField Field to resolve conflicts; passed to each created {@link CacheConflictResolutionManagerImpl}.
+     */
+    public CacheVersionConflictResolverCachePluginProvider(String conflictResolveField) {
+        this.conflictResolveField = conflictResolveField;
+    }
+
+    /** {@inheritDoc} Returns a new manager for {@link CacheConflictResolutionManager}, {@code null} for any other class. */
+    @SuppressWarnings("unchecked")
+    @Nullable @Override public <T> T createComponent(Class<T> cls) {
+        if (cls.equals(CacheConflictResolutionManager.class))
+            return (T)new CacheConflictResolutionManagerImpl<>(conflictResolveField);
+
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStart() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validate() {
+        // No-op: no configuration checks are performed.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateRemote(CacheConfiguration locCfg, CacheConfiguration rmtCfg, ClusterNode rmtNode) {
+        // No-op: local and remote configurations are not compared.
+    }
+
+    /** {@inheritDoc} Always returns {@code null}. */
+    @Nullable public <T, K2, V2> T unwrapCacheEntry(Cache.Entry<K2, V2> entry, Class<T> cls) {
+        return null;
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
new file mode 100644
index 0000000..a6551ea
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverImpl.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
+import org.apache.ignite.internal.util.typedef.internal.U;
+
+/**
+ * This class implements a simple conflict resolution algorithm.
+ * The algorithm decides which version of the entry should be used: "new" or "old".
+ * The following checks are performed in order:
+ * <ul>
+ *     <li>If the change was made in this cluster then the new version is used - {@link GridCacheVersionedEntryEx#dataCenterId()}.</li>
+ *     <li>If the entry is freshly created then the new version is used - {@link GridCacheVersionedEntryEx#isStartVersion()}.</li>
+ *     <li>If the cluster of the new entry is equal to the cluster of the old entry
+ *     then the new version is used only if its {@code version()} is greater than the old one.</li>
+ *     <li>If {@link #conflictResolveField} is provided and the field of the new entry is greater then the new version is used.</li>
+ *     <li>If {@link #conflictResolveField} is provided and the field of the old entry is greater or equal then the old version is used.</li>
+ *     <li>Otherwise the conflict can't be resolved. The update is ignored and the old version is used.</li>
+ * </ul>
+ */
+public class CacheVersionConflictResolverImpl implements CacheVersionConflictResolver {
+    /**
+     * Cluster id; an update whose data center id equals this value always wins.
+     */
+    private final byte clusterId;
+
+    /**
+     * Field for conflict resolve.
+     * Value of this field will be used to compare two entries in case of conflicting changes.
+     * Values of this field must implement the {@link Comparable} interface.
+     * <p><i>Note, the value of this field is used to resolve conflicts for external updates only.</i>
+     *
+     * @see CacheVersionConflictResolverImpl
+     */
+    private final String conflictResolveField;
+
+    /** Logger. */
+    private final IgniteLogger log;
+
+    /** If {@code true} then conflict resolving with the value field is enabled, i.e. {@link #conflictResolveField} is not null. */
+    private final boolean conflictResolveFieldEnabled;
+
+    /**
+     * @param clusterId Data center id.
+     * @param conflictResolveField Field to resolve conflicts; {@code null} disables the comparison by field.
+     * @param log Logger.
+     */
+    public CacheVersionConflictResolverImpl(byte clusterId, String conflictResolveField, IgniteLogger log) {
+        this.clusterId = clusterId;
+        this.conflictResolveField = conflictResolveField;
+        this.log = log;
+
+        conflictResolveFieldEnabled = conflictResolveField != null;
+    }
+
+    /** {@inheritDoc} Marks the returned context with "use new" or "use old"; {@code atomicVerComparator} is not used. */
+    @Override public <K, V> GridCacheVersionConflictContext<K, V> resolve(
+        CacheObjectValueContext ctx,
+        GridCacheVersionedEntryEx<K, V> oldEntry,
+        GridCacheVersionedEntryEx<K, V> newEntry,
+        boolean atomicVerComparator
+    ) {
+        GridCacheVersionConflictContext<K, V> res = new GridCacheVersionConflictContext<>(ctx, oldEntry, newEntry);
+
+        if (isUseNew(ctx, oldEntry, newEntry))
+            res.useNew();
+        else
+            res.useOld();
+
+        return res;
+    }
+
+    /**
+     * @param ctx Context.
+     * @param oldEntry Old entry.
+     * @param newEntry New entry.
+     * @param <K> Key type.
+     * @param <V> Value type.
+     * @return {@code True} if the new entry should be used.
+     */
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private <K, V> boolean isUseNew(
+        CacheObjectValueContext ctx,
+        GridCacheVersionedEntryEx<K, V> oldEntry,
+        GridCacheVersionedEntryEx<K, V> newEntry
+    ) {
+        if (newEntry.dataCenterId() == clusterId) // An update made on the local cluster always wins.
+            return true;
+
+        if (oldEntry.isStartVersion()) // Entry absent (new entry).
+            return true;
+
+        if (oldEntry.dataCenterId() == newEntry.dataCenterId())
+            return newEntry.version().compareTo(oldEntry.version()) > 0; // New version from the same cluster.
+
+        if (conflictResolveFieldEnabled) {
+            Object oldVal = oldEntry.value(ctx);
+            Object newVal = newEntry.value(ctx);
+
+            if (oldVal != null && newVal != null) {
+                Comparable oldResolveField;
+                Comparable newResolveField;
+
+                try {
+                    if (oldVal instanceof BinaryObject) {
+                        oldResolveField = ((BinaryObject)oldVal).field(conflictResolveField);
+                        newResolveField = ((BinaryObject)newVal).field(conflictResolveField); // A failed cast is caught below.
+                    }
+                    else {
+                        oldResolveField = U.field(oldVal, conflictResolveField);
+                        newResolveField = U.field(newVal, conflictResolveField);
+                    }
+
+                    return oldResolveField.compareTo(newResolveField) < 0; // New wins only when strictly greater.
+                }
+                catch (Exception e) {
+                    log.error(
+                        "Error while resolving replication conflict. [field=" + conflictResolveField + ", key=" + newEntry.key() + ']',
+                        e
+                    );
+                } // Execution falls through: the update is reported as unresolved and ignored.
+            }
+        }
+
+        log.error("Conflict can't be resolved, update ignored [key=" + newEntry.key() + ", fromCluster=" + newEntry.dataCenterId()
+            + ", toCluster=" + oldEntry.dataCenterId() + ']');
+
+        // Ignoring update.
+        return false;
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
new file mode 100644
index 0000000..bf702e1
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/conflictresolve/CacheVersionConflictResolverPluginProvider.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.conflictresolve;
+
+import java.io.Serializable;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.plugin.CachePluginContext;
+import org.apache.ignite.plugin.CachePluginProvider;
+import org.apache.ignite.plugin.ExtensionRegistry;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginConfiguration;
+import org.apache.ignite.plugin.PluginContext;
+import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.plugin.PluginValidationException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Plugin to enable {@link CacheVersionConflictResolverImpl} for the provided caches.
+ *
+ * @see CacheVersionConflictResolverImpl
+ * @see CacheVersionConflictResolver
+ */
+public class CacheVersionConflictResolverPluginProvider<C extends PluginConfiguration> implements PluginProvider<C> {
+    /** Plugin context. Assigned in {@link #initExtensions}. */
+    private PluginContext ctx;
+
+    /** Cluster id. Applied as the data center id in {@link #onIgniteStart()}. */
+    private byte clusterId;
+
+    /** Names of the caches the cache plugin provider is returned for, see {@link #createCacheProvider}. */
+    private Set<String> caches;
+
+    /**
+     * Name of the field used for conflict resolution.
+     * The value of this field is used to compare two entries in case of conflicting changes.
+     * Note, values of this field must implement {@link Comparable}.
+     *
+     * @see CacheVersionConflictResolverImpl
+     */
+    private String conflictResolveField;
+
+    /** Cache plugin provider. A single instance created in {@link #initExtensions} and shared by all configured caches. */
+    private CachePluginProvider<?> provider;
+
+    /** Default constructor; the provider is configured through its setters. */
+    public CacheVersionConflictResolverPluginProvider() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public String name() {
+        return "Cache version conflict resolver";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String version() {
+        return "1.0.0-SNAPSHOT";
+    }
+
+    /** {@inheritDoc} */
+    @Override public String copyright() {
+        return "Apache Software Foundation";
+    }
+
+    /** {@inheritDoc} Stores the context and creates the cache plugin provider from the current {@code conflictResolveField}. */
+    @Override public void initExtensions(PluginContext ctx, ExtensionRegistry registry) {
+        this.ctx = ctx;
+
+        this.provider = new CacheVersionConflictResolverCachePluginProvider<>(conflictResolveField);
+    }
+
+    /** {@inheritDoc} Returns the shared provider for configured caches, {@code null} for any other cache. */
+    @Override public CachePluginProvider createCacheProvider(CachePluginContext ctx) {
+        if (caches.contains(ctx.igniteCacheConfiguration().getName())) // NOTE(review): NPE if setCaches() was never called.
+            return provider;
+
+        return null;
+    }
+
+    /** {@inheritDoc} Passes the configured cluster id to {@code versions().dataCenterId(...)} of the shared cache context. */
+    @Override public void onIgniteStart() {
+        IgniteEx ign = (IgniteEx)ctx.grid();
+
+        ign.context().cache().context().versions().dataCenterId(clusterId);
+    }
+
+    /** {@inheritDoc} Returns a new empty plugin instance on every call. */
+    @Override public IgnitePlugin plugin() {
+        return new IgnitePlugin() { /* No-op. */ };
+    }
+
+    /** @param clusterId Cluster id, applied as the data center id in {@link #onIgniteStart()}. */
+    public void setClusterId(byte clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    /** @param caches Names of the caches to replicate; checked in {@link #createCacheProvider}. */
+    public void setCaches(Set<String> caches) {
+        this.caches = caches;
+    }
+
+    /** @param conflictResolveField Field to resolve conflicts; read when {@link #initExtensions} is called. */
+    public void setConflictResolveField(String conflictResolveField) {
+        this.conflictResolveField = conflictResolveField;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start(PluginContext ctx) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onIgniteStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} Always returns {@code null}: no discovery data is provided. */
+    @Override public @Nullable Serializable provideDiscoveryData(UUID nodeId) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void receiveDiscoveryData(UUID nodeId, Serializable data) {
+        // No-op: received discovery data is ignored.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void validateNewNode(ClusterNode node) throws PluginValidationException {
+        // No-op: joining nodes are not validated.
+    }
+
+    /** {@inheritDoc} Always returns {@code null}. */
+    @Nullable public <T> T createComponent(PluginContext ctx, Class<T> cls) {
+        return null;
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
new file mode 100644
index 0000000..3f97ee8
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.resources.LoggerResource;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+/**
+ * Change Data Consumer that streams all data changes to Kafka topic.
+ * {@link CdcEvent}s are spread across Kafka topic partitions with the {@code {ignite_partition} % {kafka_partitions_count}} formula.
+ * The consumer just fails in case of any error during write. Failure of the consumer leads to the failure of the {@code ignite-cdc} application.
+ * It is expected that {@code ignite-cdc} will be configured for automatic restarts with an OS tool to recover from temporary errors
+ * such as Kafka unavailability or network issues.
+ *
+ * If you plan to apply the written messages to another Ignite cluster in an active-active manner,
+ * i.e. concurrent updates of the same entry in the other cluster are possible,
+ * please, be aware of the {@link CacheVersionConflictResolverImpl} conflict resolver.
+ * Configuration of {@link CacheVersionConflictResolverImpl} can be found in {@link KafkaToIgniteCdcStreamer} documentation.
+ *
+ * @see CdcMain
+ * @see KafkaToIgniteCdcStreamer
+ * @see CacheVersionConflictResolverImpl
+ */
+public class IgniteToKafkaCdcStreamer implements CdcConsumer {
+    /** Default kafka request timeout in seconds. Used as the wait limit for each send future in {@link #onEvents(Iterator)}. */
+    public static final int DFLT_REQ_TIMEOUT = 5;
+
+    /** Log. */
+    @LoggerResource
+    private IgniteLogger log;
+
+    /** Kafka producer to stream events. Created in {@link #start()}. */
+    private KafkaProducer<Integer, byte[]> producer;
+
+    /** Handle only primary entry flag. */
+    private final boolean onlyPrimary;
+
+    /** Topic name. */
+    private final String topic;
+
+    /** Kafka topic partitions count. */
+    private final int kafkaParts;
+
+    /** Kafka properties. */
+    private final Properties kafkaProps;
+
+    /** Cache IDs. Computed from the cache names with {@code CU.cacheId}. */
+    private final Set<Integer> cachesIds;
+
+    /** Max batch size: upper bound of records sent within one {@link #onEvents(Iterator)} call. */
+    private final int maxBatchSize;
+
+    /** Count of sent messages. Accumulated over all {@link #onEvents(Iterator)} calls. */
+    private long msgCnt;
+
+    /**
+     * @param topic Topic name.
+     * @param kafkaParts Kafka partitions count.
+     * @param caches Cache names; asserted to be non-null and non-empty.
+     * @param maxBatchSize Maximum number of records sent to Kafka within one {@link #onEvents(Iterator)} call.
+     * @param onlyPrimary If {@code true} then stream only events from primaries.
+     * @param kafkaProps Kafka properties; the key and value serializer settings are overwritten by this constructor.
+     */
+    public IgniteToKafkaCdcStreamer(
+        String topic,
+        int kafkaParts,
+        Set<String> caches,
+        int maxBatchSize,
+        boolean onlyPrimary,
+        Properties kafkaProps
+    ) {
+        assert caches != null && !caches.isEmpty();
+
+        this.topic = topic;
+        this.kafkaParts = kafkaParts;
+        this.onlyPrimary = onlyPrimary;
+        this.kafkaProps = kafkaProps;
+        this.maxBatchSize = maxBatchSize;
+
+        cachesIds = caches.stream()
+            .mapToInt(CU::cacheId)
+            .boxed()
+            .collect(Collectors.toSet());
+
+        kafkaProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
+        kafkaProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+    }
+
+    /** {@inheritDoc} Sends at most {@code maxBatchSize} events, waits for every send to complete and returns {@code true}. */
+    @Override public boolean onEvents(Iterator<CdcEvent> evts) {
+        List<Future<RecordMetadata>> futs = new ArrayList<>();
+
+        while (evts.hasNext() && futs.size() < maxBatchSize) { // Skipped events do not count towards the batch size.
+            CdcEvent evt = evts.next();
+
+            if (log.isDebugEnabled())
+                log.debug("Event received [evt=" + evt + ']');
+
+            if (onlyPrimary && !evt.primary()) {
+                if (log.isDebugEnabled())
+                    log.debug("Event skipped because of primary flag [evt=" + evt + ']');
+
+                continue;
+            }
+
+            if (evt.version().otherClusterVersion() != null) { // Presumably the change came from another cluster - TODO confirm.
+                if (log.isDebugEnabled()) {
+                    log.debug("Event skipped because of version [evt=" + evt +
+                        ", otherClusterVersion=" + evt.version().otherClusterVersion() + ']');
+                }
+
+                continue;
+            }
+
+            if (!cachesIds.isEmpty() && !cachesIds.contains(evt.cacheId())) {
+                if (log.isDebugEnabled())
+                    log.debug("Event skipped because of cacheId [evt=" + evt + ']');
+
+                continue;
+            }
+
+            msgCnt++;
+
+            futs.add(producer.send(new ProducerRecord<>(
+                topic,
+                evt.partition() % kafkaParts,
+                evt.cacheId(),
+                IgniteUtils.toBytes(evt)
+            )));
+
+            if (log.isDebugEnabled())
+                log.debug("Event sent asynchronously [evt=" + evt + ']');
+        }
+
+        try {
+            for (Future<RecordMetadata> fut : futs)
+                fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS); // The timeout applies to each future separately.
+        }
+        catch (InterruptedException | ExecutionException | TimeoutException e) { // NOTE(review): interrupt flag is not restored.
+            throw new RuntimeException(e);
+        }
+
+        if (log.isInfoEnabled())
+            log.info("Events processed [sentMessagesCount=" + msgCnt + ']');
+
+        return true;
+    }
+
+    /** {@inheritDoc} Creates the Kafka producer; any failure is rethrown as a {@link RuntimeException}. */
+    @Override public void start() {
+        try {
+            producer = new KafkaProducer<>(kafkaProps);
+
+            if (log.isInfoEnabled())
+                log.info("CDC Ignite To Kafka started [topic=" + topic + ", onlyPrimary=" + onlyPrimary + ", cacheIds=" + cachesIds + ']');
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** {@inheritDoc} Closes the producer. NOTE(review): {@code producer} is null unless {@link #start()} succeeded. */
+    @Override public void stop() {
+        producer.close();
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
new file mode 100644
index 0000000..e62a6f4
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamer.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.Ignition;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.conflictresolve.CacheConflictResolutionManagerImpl;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridLoggerProxy;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+
+import static org.apache.ignite.internal.IgniteKernal.NL;
+import static org.apache.ignite.internal.IgniteKernal.SITE;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Main class of Kafka to Ignite application.
+ * This application is the counterpart of the {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
+ * The application runs several {@link KafkaToIgniteCdcStreamerApplier} threads to read Kafka topic partitions
+ * and apply {@link CdcEvent} to Ignite.
+ * <p>
+ * Each applier receives an equal share of Kafka topic partitions to read; the last applier also takes the remainder.
+ * <p>
+ * In case of any error during read the applier just fails. Failure of any applier leads to the failure of the whole application.
+ * It is expected that the application will be configured for automatic restarts with an OS tool to recover from temporary errors
+ * such as Kafka or Ignite unavailability.
+ * <p>
+ * To resolve possible update conflicts (in case of concurrent update in source and destination Ignite clusters)
+ * real-world deployments should use some conflict resolver, for example {@link CacheVersionConflictResolverImpl}.
+ * Example of Ignite configuration with the conflict resolver:
+ * <pre>
+ * {@code
+ * CacheVersionConflictResolverPluginProvider conflictPlugin = new CacheVersionConflictResolverPluginProvider();
+ *
+ * conflictPlugin.setClusterId(clusterId); // Cluster id.
+ * conflictPlugin.setCaches(new HashSet<>(Arrays.asList("my-cache", "some-other-cache"))); // Caches to replicate.
+ *
+ * IgniteConfiguration cfg = ...;
+ *
+ * cfg.setPluginProviders(conflictPlugin);
+ * }
+ * </pre>
+ * Please, see {@link CacheConflictResolutionManagerImpl} for additional information.
+ *
+ * @see CdcMain
+ * @see IgniteToKafkaCdcStreamer
+ * @see CdcEvent
+ * @see KafkaToIgniteCdcStreamerApplier
+ * @see CacheConflictResolutionManagerImpl
+ */
+public class KafkaToIgniteCdcStreamer implements Runnable {
+    /** Ignite configuration. */
+    private final IgniteConfiguration igniteCfg;
+
+    /** Kafka consumer properties. */
+    private final Properties kafkaProps;
+
+    /** Streamer configuration. */
+    private final KafkaToIgniteCdcStreamerConfiguration streamerCfg;
+
+    /** Runners to run {@link KafkaToIgniteCdcStreamerApplier} instances. One thread per applier. */
+    private final Thread[] runners;
+
+    /** Appliers. */
+    private final List<KafkaToIgniteCdcStreamerApplier> appliers;
+
+    /**
+     * @param igniteCfg Ignite configuration.
+     * @param kafkaProps Kafka properties; must contain {@link ConsumerConfig#GROUP_ID_CONFIG}, deserializer settings are overwritten.
+     * @param streamerCfg Streamer configuration.
+     */
+    public KafkaToIgniteCdcStreamer(
+        IgniteConfiguration igniteCfg,
+        Properties kafkaProps,
+        KafkaToIgniteCdcStreamerConfiguration streamerCfg
+    ) {
+        this.igniteCfg = igniteCfg;
+        this.kafkaProps = kafkaProps;
+        this.streamerCfg = streamerCfg;
+
+        appliers = new ArrayList<>(streamerCfg.getThreadCount());
+        runners = new Thread[streamerCfg.getThreadCount()];
+
+        if (!kafkaProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG))
+            throw new IllegalArgumentException("Kafka properties don't contains " + ConsumerConfig.GROUP_ID_CONFIG);
+
+        kafkaProps.put(KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class.getName());
+        kafkaProps.put(VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
+    }
+
+    /** {@inheritDoc} Delegates to {@code runx()} and wraps any exception into {@link IgniteException}. */
+    @Override public void run() {
+        try {
+            runx();
+        }
+        catch (Exception e) {
+            throw new IgniteException(e);
+        }
+    }
+
+    /** Starts Ignite, creates and starts the applier threads and waits for all of them to finish. */
+    private void runx() throws Exception {
+        U.initWorkDir(igniteCfg);
+
+        IgniteLogger log = U.initLogger(igniteCfg, "kafka-ignite-streamer");
+
+        igniteCfg.setGridLogger(log);
+
+        ackAsciiLogo(log);
+
+        try (IgniteEx ign = (IgniteEx)Ignition.start(igniteCfg)) {
+            AtomicBoolean stopped = new AtomicBoolean();
+
+            Set<Integer> caches = null; // Stays null when no caches are configured.
+
+            if (!F.isEmpty(streamerCfg.getCaches())) {
+                caches = streamerCfg.getCaches().stream()
+                    .peek(cache -> Objects.requireNonNull(ign.cache(cache), cache + " not exists!"))
+                    .map(CU::cacheId).collect(Collectors.toSet());
+            }
+
+            int kafkaPartsFrom = streamerCfg.getKafkaPartsFrom();
+            int kafkaParts = streamerCfg.getKafkaPartsTo() - kafkaPartsFrom;
+            int threadCnt = streamerCfg.getThreadCount();
+
+            assert kafkaParts >= threadCnt
+                : "Threads count bigger then kafka partitions count [kafkaParts=" + kafkaParts + ",threadCount=" + threadCnt + ']';
+
+            int partPerApplier = kafkaParts / threadCnt;
+
+            for (int i = 0; i < threadCnt; i++) {
+                int from = i * partPerApplier;
+                int to = (i + 1) * partPerApplier;
+
+                if (i == threadCnt - 1) // The last applier also takes the remainder of the division.
+                    to = kafkaParts;
+
+                KafkaToIgniteCdcStreamerApplier applier = new KafkaToIgniteCdcStreamerApplier(
+                    ign,
+                    log,
+                    kafkaProps,
+                    streamerCfg.getTopic(),
+                    kafkaPartsFrom + from,
+                    kafkaPartsFrom + to,
+                    caches,
+                    streamerCfg.getMaxBatchSize(),
+                    stopped
+                );
+
+                appliers.add(applier);
+
+                runners[i] = new Thread(applier, "applier-thread-" + i);
+
+                runners[i].start();
+            }
+
+            try {
+                for (int i = 0; i < threadCnt; i ++)
+                    runners[i].join();
+            }
+            catch (InterruptedException e) { // NOTE(review): the interrupt flag is not restored here.
+                stopped.set(true);
+
+                appliers.forEach(U::closeQuiet);
+
+                log.warning("Kafka to Ignite streamer interrupted", e);
+            }
+        }
+    }
+
+    /** Prints the application logo, version, Kafka topic and partitions range to the log and, in quiet mode, to the console. */
+    private void ackAsciiLogo(IgniteLogger log) {
+        String ver = "ver. " + ACK_VER_STR;
+
+        if (log.isInfoEnabled()) {
+            log.info(NL + NL +
+                ">>>    __ _____   ______ _____     __________    __________  ________________" + NL +
+                ">>>   / //_/ _ | / __/ //_/ _ |   /_  __/ __ \\  /  _/ ___/ |/ /  _/_  __/ __/" + NL +
+                ">>>  / ,< / __ |/ _// ,< / __ |    / / / /_/ / _/ // (_ /    // /  / / / _/  " + NL +
+                ">>> /_/|_/_/ |_/_/ /_/|_/_/ |_|   /_/  \\____/ /___/\\___/_/|_/___/ /_/ /___/  " + NL +
+                ">>> " + NL +
+                ">>> " + NL +
+                ">>> " + ver + NL +
+                ">>> " + COPYRIGHT + NL +
+                ">>> " + NL +
+                ">>> Ignite documentation: " + "http://" + SITE + NL +
+                ">>> Kafka topic: " + streamerCfg.getTopic() + NL +
+                ">>> Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo() + NL
+            );
+        }
+
+        if (log.isQuiet()) {
+            U.quiet(false,
+                "   __ _____   ______ _____     __________    __________  ________________",
+                "  / //_/ _ | / __/ //_/ _ |   /_  __/ __ \\  /  _/ ___/ |/ /  _/_  __/ __/",
+                " / ,< / __ |/ _// ,< / __ |    / / / /_/ / _/ // (_ /    // /  / / / _/  ",
+                "/_/|_/_/ |_/_/ /_/|_/_/ |_|   /_/  \\____/ /___/\\___/_/|_/___/ /_/ /___/  ",
+                "",
+                ver,
+                COPYRIGHT,
+                "",
+                "Ignite documentation: " + "http://" + SITE,
+                "Kafka topic: " + streamerCfg.getTopic(),
+                "Kafka partitions: " + streamerCfg.getKafkaPartsFrom() + "-" + streamerCfg.getKafkaPartsTo(),
+                "",
+                "Quiet mode.");
+
+            String fileName = log.fileName();
+
+            if (fileName != null)
+                U.quiet(false, "  ^-- Logging to file '" + fileName + '\'');
+
+            if (log instanceof GridLoggerProxy)
+                U.quiet(false, "  ^-- Logging by '" + ((GridLoggerProxy)log).getLoggerInfo() + '\'');
+
+            U.quiet(false,
+                "  ^-- To see **FULL** console log here add -DIGNITE_QUIET=false or \"-v\" to kafka-to-ignite.{sh|bat}",
+                "");
+        }
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
new file mode 100644
index 0000000..fbac6e0
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.CdcEventsApplier;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+
+/**
+ * Thread that polls messages from the Kafka topic partitions and applies those messages to the Ignite caches.
+ * It is expected that messages were written to Kafka by the {@link IgniteToKafkaCdcStreamer} Change Data Capture consumer.
+ * <p>
+ * Each {@code Applier} receives a set of Kafka topic partitions to read and caches to process.
+ * Applier creates a consumer per partition because a Kafka consumer does not read fairly:
+ * a consumer keeps reading messages from a specific partition while there are new messages in that partition.
+ * See <a href=
+ * "https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer">KIP-387</a>
+ * and <a href="https://issues.apache.org/jira/browse/KAFKA-3932">KAFKA-3932</a> for further information.
+ * All consumers should belong to the same consumer group to ensure consistent reading.
+ * Applier polls messages from each consumer in round-robin fashion.
+ * <p>
+ * Messages are applied to Ignite using {@link IgniteInternalCache#putAllConflict(Map)} and {@link IgniteInternalCache#removeAllConflict(Map)}.
+ * These methods allow passing the {@link GridCacheVersion} of an entry to Ignite, so that update conflicts can be resolved
+ * by the {@link CacheVersionConflictResolver}.
+ * <p>
+ * In case of any error during read the applier just fails.
+ * Failure of any applier will lead to the failure of the {@link KafkaToIgniteCdcStreamer} application.
+ * It is expected that the application will be configured for automatic restarts with an OS tool to recover from temporary errors
+ * such as Kafka or Ignite unavailability.
+ *
+ * @see KafkaToIgniteCdcStreamer
+ * @see IgniteToKafkaCdcStreamer
+ * @see IgniteInternalCache#putAllConflict(Map)
+ * @see IgniteInternalCache#removeAllConflict(Map)
+ * @see CacheVersionConflictResolver
+ * @see GridCacheVersion
+ * @see CdcEvent
+ * @see CacheEntryVersion
+ */
+class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnable, AutoCloseable {
+    /** Timeout, in seconds, passed to the consumer {@code poll}, {@code commitSync} and {@code close} calls of this class. */
+    public static final int DFLT_REQ_TIMEOUT = 3;
+
+    /** Ignite instance. */
+    private final IgniteEx ign;
+
+    /** Log. */
+    private final IgniteLogger log;
+
+    /** Closed flag. Shared between all appliers. */
+    private final AtomicBoolean stopped;
+
+    /** Kafka properties. Used to create each {@link KafkaConsumer}. */
+    private final Properties kafkaProps;
+
+    /** Topic to read. */
+    private final String topic;
+
+    /** Lower kafka partition (inclusive). */
+    private final int kafkaPartFrom;
+
+    /** Higher kafka partition (exclusive). */
+    private final int kafkaPartTo;
+
+    /** Caches ids to read. Records are matched by their key; an empty set disables the filtering. */
+    private final Set<Integer> caches;
+
+    /** Consumers, one per assigned partition. Filled in {@link #run()}. */
+    private final List<KafkaConsumer<Integer, byte[]>> cnsmrs = new ArrayList<>();
+
+    /** Counter of records polled from Kafka. Reported in the debug log message of the poll method. */
+    private final AtomicLong rcvdEvts = new AtomicLong();
+
+    /**
+     * Creates an applier that reads partitions {@code [kafkaPartFrom, kafkaPartTo)} of the given topic.
+     * Consumers are not created here; they are created when {@link #run()} starts.
+     *
+     * @param ign Ignite instance.
+     * @param log Logger. A logger for this class is derived from it.
+     * @param kafkaProps Kafka properties.
+     * @param topic Topic name.
+     * @param kafkaPartFrom Lower partition to read (inclusive).
+     * @param kafkaPartTo Higher partition to read (exclusive).
+     * @param caches Cache ids.
+     * @param maxBatchSize Maximum batch size.
+     * @param stopped Stopped flag.
+     */
+    public KafkaToIgniteCdcStreamerApplier(
+        IgniteEx ign,
+        IgniteLogger log,
+        Properties kafkaProps,
+        String topic,
+        int kafkaPartFrom,
+        int kafkaPartTo,
+        Set<Integer> caches,
+        int maxBatchSize,
+        AtomicBoolean stopped
+    ) {
+        super(maxBatchSize);
+
+        // Infrastructure.
+        this.log = log.getLogger(KafkaToIgniteCdcStreamerApplier.class);
+        this.ign = ign;
+        this.stopped = stopped;
+
+        // What to read.
+        this.kafkaProps = kafkaProps;
+        this.topic = topic;
+        this.caches = caches;
+
+        // Partition range.
+        this.kafkaPartFrom = kafkaPartFrom;
+        this.kafkaPartTo = kafkaPartTo;
+    }
+
+    /**
+     * Applier main loop: creates one consumer per partition in {@code [kafkaPartFrom, kafkaPartTo)}, then polls
+     * the consumers in round-robin order until the shared {@code stopped} flag is raised.
+     * <p>
+     * Any error except {@link WakeupException} raises the {@code stopped} flag.
+     * All created consumers are closed before the method returns.
+     */
+    @Override public void run() {
+        U.setCurrentIgniteName(ign.name());
+
+        try {
+            for (int kafkaPart = kafkaPartFrom; kafkaPart < kafkaPartTo; kafkaPart++) {
+                KafkaConsumer<Integer, byte[]> cnsmr = new KafkaConsumer<>(kafkaProps);
+
+                // Register the consumer before assigning the partition,
+                // so it is closed in the finally block even if the assignment fails.
+                cnsmrs.add(cnsmr);
+
+                cnsmr.assign(Collections.singleton(new TopicPartition(topic, kafkaPart)));
+            }
+
+            Iterator<KafkaConsumer<Integer, byte[]>> cnsmrIter = Collections.emptyIterator();
+
+            while (!stopped.get()) {
+                // Start the next round when the previous one is exhausted.
+                if (!cnsmrIter.hasNext())
+                    cnsmrIter = cnsmrs.iterator();
+
+                poll(cnsmrIter.next());
+            }
+        }
+        catch (WakeupException e) {
+            // A wakeup is only reported as an error when the applier was not asked to stop.
+            if (!stopped.get())
+                log.error("Applier wakeup error!", e);
+        }
+        catch (Throwable e) {
+            log.error("Applier error!", e);
+
+            stopped.set(true);
+        }
+        finally {
+            for (KafkaConsumer<Integer, byte[]> consumer : cnsmrs) {
+                try {
+                    consumer.close(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+                }
+                catch (Exception e) {
+                    log.warning("Close error!", e);
+                }
+            }
+
+            cnsmrs.clear();
+        }
+
+        if (log.isInfoEnabled())
+            log.info(Thread.currentThread().getName() + " - stopped!");
+    }
+
+    /**
+     * Polls data from the specific consumer, applies it to the Ignite and commits the consumer offsets.
+     * Only records accepted by the cache filter are deserialized and applied: a record is accepted
+     * when {@code caches} is empty or contains the record key.
+     *
+     * @param cnsmr Data consumer.
+     * @throws IgniteCheckedException If apply failed.
+     */
+    private void poll(KafkaConsumer<Integer, byte[]> cnsmr) throws IgniteCheckedException {
+        ConsumerRecords<Integer, byte[]> recs = cnsmr.poll(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+
+        // Count received records regardless of the log level, so the counter does not depend on debug logging.
+        long rcvd = rcvdEvts.addAndGet(recs.count());
+
+        if (log.isDebugEnabled())
+            log.debug("Polled from consumer [assignments=" + cnsmr.assignment() + ",rcvdEvts=" + rcvd + ']');
+
+        apply(F.iterator(recs, this::deserialize, true, rec -> F.isEmpty(caches) || caches.contains(rec.key())));
+
+        // Offsets are committed only after the polled records were applied.
+        cnsmr.commitSync(Duration.ofSeconds(DFLT_REQ_TIMEOUT));
+    }
+
+    /**
+     * Restores a CDC event from the Kafka record value using Java serialization.
+     * <p>
+     * NOTE(review): this is Java native deserialization of bytes read from Kafka. The topic must be writable
+     * by trusted producers only; consider an {@code ObjectInputFilter} if that can not be guaranteed.
+     *
+     * @param rec Kafka record.
+     * @return CDC event.
+     * @throws IgniteException If the record value can not be deserialized. The original error is kept as a cause.
+     */
+    private CdcEvent deserialize(ConsumerRecord<Integer, byte[]> rec) {
+        try (ObjectInputStream is = new ObjectInputStream(new ByteArrayInputStream(rec.value()))) {
+            return (CdcEvent)is.readObject();
+        }
+        catch (IOException | ClassNotFoundException e) {
+            // Report record coordinates, so the broken message can be found in the topic.
+            throw new IgniteException(
+                "Failed to deserialize CDC event [topic=" + rec.topic() + ", part=" + rec.partition() +
+                    ", offset=" + rec.offset() + ']',
+                e
+            );
+        }
+    }
+
+    /**
+     * Logs a warning and calls {@link KafkaConsumer#wakeup()} on every consumer currently registered in this applier.
+     * The consumers themselves are closed by {@link #run()}, not here.
+     * <p>
+     * NOTE(review): this method does not change the {@code stopped} flag; presumably the caller raises it
+     * before closing appliers - confirm against the streamer code.
+     */
+    @Override public void close() {
+        log.warning("Close applier!");
+
+        cnsmrs.forEach(KafkaConsumer::wakeup);
+    }
+
+    /** {@inheritDoc} Returns the Ignite instance passed to the constructor. */
+    @Override protected IgniteEx ignite() {
+        return ign;
+    }
+
+    /** {@inheritDoc} Returns the logger derived for this class in the constructor. */
+    @Override protected IgniteLogger log() {
+        return log;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(KafkaToIgniteCdcStreamerApplier.class, this);
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
new file mode 100644
index 0000000..1e9026c
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerConfiguration.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+
+/**
+ * Configuration of {@link KafkaToIgniteCdcStreamer} application.
+ * Plain bean: every property has a default value and a getter/setter pair, no validation is performed here.
+ *
+ * @see KafkaToIgniteCdcStreamer
+ * @see KafkaToIgniteLoader
+ */
+public class KafkaToIgniteCdcStreamerConfiguration {
+    /** Default {@link #kafkaPartsTo} value. Also used as the default {@link #threadCnt} value. */
+    public static final int DFLT_PARTS = 16;
+
+    /** Default {@link #topic} value. */
+    public static final String DFLT_TOPIC = "ignite";
+
+    /** Default {@link #maxBatchSize} value. */
+    public static final int DFLT_MAX_BATCH_SIZE = 1024;
+
+    /** {@link KafkaToIgniteCdcStreamerApplier} thread count. */
+    private int threadCnt = DFLT_PARTS;
+
+    /** Topic name. */
+    private String topic = DFLT_TOPIC;
+
+    /** Kafka partitions lower bound (inclusive). */
+    private int kafkaPartsFrom = 0;
+
+    /** Kafka partitions higher bound (exclusive). */
+    private int kafkaPartsTo = DFLT_PARTS;
+
+    /**
+     * Maximum batch size to apply to Ignite.
+     *
+     * @see IgniteInternalCache#putAllConflict(Map)
+     * @see IgniteInternalCache#removeAllConflict(Map)
+     */
+    private int maxBatchSize = DFLT_MAX_BATCH_SIZE;
+
+    /**
+     * Cache names to process.
+     */
+    private Collection<String> caches;
+
+    /** @return {@link KafkaToIgniteCdcStreamerApplier} thread count. */
+    public int getThreadCount() {
+        return threadCnt;
+    }
+
+    /** @param threadCnt {@link KafkaToIgniteCdcStreamerApplier} thread count. */
+    public void setThreadCount(int threadCnt) {
+        this.threadCnt = threadCnt;
+    }
+
+    /** @return Topic name. */
+    public String getTopic() {
+        return topic;
+    }
+
+    /** @param topic Topic name. */
+    public void setTopic(String topic) {
+        this.topic = topic;
+    }
+
+    /** @return Kafka partitions lower bound (inclusive). */
+    public int getKafkaPartsFrom() {
+        return kafkaPartsFrom;
+    }
+
+    /** @param kafkaPartsFrom Kafka partitions lower bound (inclusive). */
+    public void setKafkaPartsFrom(int kafkaPartsFrom) {
+        this.kafkaPartsFrom = kafkaPartsFrom;
+    }
+
+    /** @return Kafka partitions higher bound (exclusive). */
+    public int getKafkaPartsTo() {
+        return kafkaPartsTo;
+    }
+
+    /** @param kafkaPartsTo Kafka partitions higher bound (exclusive). */
+    public void setKafkaPartsTo(int kafkaPartsTo) {
+        this.kafkaPartsTo = kafkaPartsTo;
+    }
+
+    /** @return Maximum batch size to apply to Ignite. */
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    /** @param maxBatchSize Maximum batch size to apply to Ignite. */
+    public void setMaxBatchSize(int maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    /** @return Cache names to process. No default is assigned, so {@code null} is returned unless set. */
+    public Collection<String> getCaches() {
+        return caches;
+    }
+
+    /** @param caches Cache names to process. */
+    public void setCaches(Collection<String> caches) {
+        this.caches = caches;
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
new file mode 100644
index 0000000..c425664
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCommandLineStartup.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.util.typedef.X;
+import org.jetbrains.annotations.Nullable;
+
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteLoader.KAFKA_PROPERTIES;
+import static org.apache.ignite.internal.IgniteVersionUtils.ACK_VER_STR;
+import static org.apache.ignite.internal.IgniteVersionUtils.COPYRIGHT;
+import static org.apache.ignite.startup.cmdline.CommandLineStartup.isHelp;
+
+/**
+ * This class defines command-line {@link KafkaToIgniteCdcStreamer} startup. This startup can be used to start Ignite
+ * {@link KafkaToIgniteCdcStreamer} application outside of any hosting environment from command line.
+ * This startup is a Java application with {@link #main(String[])} method that accepts command line arguments.
+ * It accepts one parameter which is the path to the Spring XML configuration file.
+ * Running this class from command line without parameters prints the usage message.
+ */
+public class KafkaToIgniteCommandLineStartup {
+    /** Quiet log flag. */
+    private static final boolean QUIET = IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_QUIET);
+
+    /**
+     * Main entry point. Validates the arguments, loads the streamer from the given Spring XML file and runs it.
+     * The process is terminated with a non-zero exit code on invalid arguments or when the streamer fails.
+     *
+     * @param args Command line arguments.
+     */
+    public static void main(String[] args) {
+        if (!QUIET) {
+            X.println("Kafka To Ignite Command Line Startup, ver. " + ACK_VER_STR);
+            X.println(COPYRIGHT);
+            X.println();
+        }
+
+        // Configuration path is mandatory: show usage instead of failing on args[0] below.
+        if (args.length == 0)
+            exit("Missing path to Spring XML configuration file.", true, -1);
+
+        if (args.length > 1)
+            exit("Too many arguments.", true, -1);
+
+        if (isHelp(args[0]))
+            exit(null, true, 0);
+
+        if (args[0].isEmpty())
+            exit("Empty argument.", true, 1);
+
+        if (args[0].charAt(0) == '-')
+            exit("Invalid arguments: " + args[0], true, -1);
+
+        try {
+            KafkaToIgniteCdcStreamer streamer = KafkaToIgniteLoader.loadKafkaToIgniteStreamer(args[0]);
+
+            streamer.run();
+        }
+        catch (Throwable e) {
+            e.printStackTrace();
+
+            String note = "";
+
+            if (X.hasCause(e, ClassNotFoundException.class))
+                note = "\nNote! You may use 'USER_LIBS' environment variable to specify your classpath.";
+
+            exit("Failed to run app: " + e.getMessage() + note, false, -1);
+        }
+    }
+
+    /**
+     * Exits with optional error message, usage show and exit code.
+     *
+     * @param errMsg Optional error message.
+     * @param showUsage Whether or not to show usage information.
+     * @param exitCode Exit code.
+     */
+    private static void exit(@Nullable String errMsg, boolean showUsage, int exitCode) {
+        if (errMsg != null)
+            X.error(errMsg);
+
+        if (showUsage) {
+            X.error(
+                "Usage:",
+                "    kafka-to-ignite.{sh|bat} [?]|[path]",
+                "    Where:",
+                "    ?, /help, -help, - show this message.",
+                "    -v               - verbose mode (quiet by default).",
+                "    path            - path to Spring XML configuration file.",
+                "                      Path can be absolute or relative to IGNITE_HOME.",
+                " ",
+                "Spring file should contain bean definition of 'org.apache.ignite.configuration.IgniteConfiguration' " +
+                    "and '" + KafkaToIgniteCdcStreamerConfiguration.class.getName() + "' " +
+                    "and bean of class 'java.util.Properties' with '" + KAFKA_PROPERTIES + "' name " +
+                    "that contains properties to connect to Apache Kafka cluster. " +
+                    "Note that bean will be fetched by the type and its ID is not used.");
+        }
+
+        System.exit(exitCode);
+    }
+}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
new file mode 100644
index 0000000..adfc113
--- /dev/null
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoader.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.net.URL;
+import java.util.Collection;
+import java.util.Properties;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.processors.resource.GridSpringResourceContext;
+import org.apache.ignite.internal.util.spring.IgniteSpringHelper;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+
+import static org.apache.ignite.internal.IgniteComponentType.SPRING;
+
+/**
+ * Utility class to load {@link KafkaToIgniteCdcStreamer} from Spring XML configuration.
+ */
+public class KafkaToIgniteLoader {
+    /** Kafka properties bean name. */
+    public static final String KAFKA_PROPERTIES = "kafkaProperties";
+
+    /**
+     * Loads {@link KafkaToIgniteCdcStreamer} from XML configuration file.
+     * <p>
+     * The file is expected to define exactly one {@link IgniteConfiguration}, exactly one
+     * {@link KafkaToIgniteCdcStreamerConfiguration} and a {@link Properties} bean named {@link #KAFKA_PROPERTIES}.
+     *
+     * @param springXmlPath Path to XML configuration file.
+     * @return {@code KafkaToIgniteCdcStreamer} instance.
+     * @throws IgniteCheckedException If the file can not be loaded or the number of configurations found is not one.
+     */
+    public static KafkaToIgniteCdcStreamer loadKafkaToIgniteStreamer(String springXmlPath) throws IgniteCheckedException {
+        URL cfgUrl = U.resolveSpringUrl(springXmlPath);
+
+        IgniteSpringHelper spring = SPRING.create(false);
+
+        IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> cfgTuple =
+            spring.loadConfigurations(cfgUrl);
+
+        // Exactly one is required: an empty collection would otherwise fail later on iterator().next().
+        if (cfgTuple.get1().size() != 1) {
+            throw new IgniteCheckedException(
+                "Exact 1 IgniteConfiguration should be defined. Found " + cfgTuple.get1().size()
+            );
+        }
+
+        IgniteBiTuple<Collection<KafkaToIgniteCdcStreamerConfiguration>, ? extends GridSpringResourceContext> k2iCfg =
+            spring.loadConfigurations(cfgUrl, KafkaToIgniteCdcStreamerConfiguration.class);
+
+        if (k2iCfg.get1().size() != 1) {
+            throw new IgniteCheckedException(
+                "Exact 1 KafkaToIgniteCdcStreamerConfiguration configuration should be defined. " +
+                    "Found " + k2iCfg.get1().size()
+            );
+        }
+
+        Properties kafkaProps = spring.loadBean(cfgUrl, KAFKA_PROPERTIES);
+
+        return new KafkaToIgniteCdcStreamer(cfgTuple.get1().iterator().next(), kafkaProps, k2iCfg.get1().iterator().next());
+    }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
new file mode 100644
index 0000000..fb980ed
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.IntStream;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/** */
+@RunWith(Parameterized.class)
+public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
+    /** Cache atomicity mode. First test parameter, see {@link #parameters()}. */
+    @Parameterized.Parameter
+    public CacheAtomicityMode cacheMode;
+
+    /** Backup count. Second test parameter (index 1), see {@link #parameters()}. */
+    @Parameterized.Parameter(1)
+    public int backupCnt;
+
+    /** @return Test parameters: {@code ATOMIC} and {@code TRANSACTIONAL} modes, each combined with 0 and 1 backups. */
+    @Parameterized.Parameters(name = "cacheMode={0},backupCnt={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> res = new ArrayList<>();
+
+        for (CacheAtomicityMode atomicity : EnumSet.of(ATOMIC, TRANSACTIONAL)) {
+            for (int backups = 0; backups <= 1; backups++)
+                res.add(new Object[] {atomicity, backups});
+        }
+
+        return res;
+    }
+
+    /** Name of the cache used by the active-passive replication test. */
+    public static final String ACTIVE_PASSIVE_CACHE = "active-passive-cache";
+
+    /** Name of the cache used by the active-active replication test. Registered in the conflict resolver plugin. */
+    public static final String ACTIVE_ACTIVE_CACHE = "active-active-cache";
+
+    /** Name of the cache whose updates are expected not to reach the destination cluster. */
+    public static final String IGNORED_CACHE = "ignored-cache";
+
+    /** Cluster id used for the source cluster nodes. */
+    public static final byte SRC_CLUSTER_ID = 1;
+
+    /** Cluster id used for the destination cluster nodes. */
+    public static final byte DEST_CLUSTER_ID = 2;
+
+    /** State of the keys {@link #waitForSameData} waits for. */
+    private enum WaitDataMode {
+        /** Keys exist in both caches with equal values. */
+        EXISTS,
+
+        /** Keys are absent in both caches. */
+        REMOVED
+    }
+
+    /** Number of keys generated and checked by the tests. */
+    public static final int KEYS_CNT = 1000;
+
+    /** Source cluster server nodes. */
+    protected static IgniteEx[] srcCluster;
+
+    /** Destination cluster server nodes. */
+    protected static IgniteEx[] destCluster;
+
+    /** Client node configurations for the source cluster. */
+    protected static IgniteConfiguration[] srcClusterCliCfg;
+
+    /** Client node configurations for the destination cluster. */
+    protected static IgniteConfiguration[] destClusterCliCfg;
+
+    /** Discovery port for the nodes configured next. Shifted in {@link #beforeTest()} before the destination cluster starts. */
+    private int discoPort = TcpDiscoverySpi.DFLT_PORT;
+
+    /** Cluster id for the nodes configured next. Switched in {@link #beforeTest()} before the destination cluster starts. */
+    private byte clusterId = SRC_CLUSTER_ID;
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Discovery is bound to the current {@code discoPort} range. Server nodes additionally get the conflict resolver
+     * plugin with the current {@code clusterId}, persistence with CDC enabled and a consistent id equal to the instance name.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
+            .setDiscoverySpi(new TcpDiscoverySpi()
+                .setLocalPort(discoPort)
+                .setIpFinder(new TcpDiscoveryVmIpFinder() {{
+                    setAddresses(Collections.singleton("127.0.0.1:" + discoPort + ".." + (discoPort + DFLT_PORT_RANGE)));
+                }}));
+
+        // Client nodes get the discovery settings only.
+        if (!cfg.isClientMode()) {
+            CacheVersionConflictResolverPluginProvider<?> cfgPlugin = new CacheVersionConflictResolverPluginProvider<>();
+
+            cfgPlugin.setClusterId(clusterId);
+            cfgPlugin.setCaches(new HashSet<>(Collections.singletonList(ACTIVE_ACTIVE_CACHE)));
+            cfgPlugin.setConflictResolveField("reqId");
+
+            cfg.setPluginProviders(cfgPlugin);
+
+            cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new DataRegionConfiguration()
+                    .setPersistenceEnabled(true)));
+
+            cfg.getDataStorageConfiguration()
+                .setWalForceArchiveTimeout(5_000)
+                .setCdcEnabled(true);
+
+            cfg.setConsistentId(igniteInstanceName);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Starts the source cluster, then switches {@code discoPort} and {@code clusterId} and starts the destination cluster.
+     * The order matters: {@link #getConfiguration(String)} reads both fields at node start.
+     */
+    @Override protected void beforeTest() throws Exception {
+        cleanPersistenceDir();
+
+        IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> cluster = setupCluster("source", "src-cluster-client", 0);
+
+        srcCluster = cluster.get1();
+        srcClusterCliCfg = cluster.get2();
+
+        // Move to a port range that does not overlap with the source cluster one.
+        discoPort += DFLT_PORT_RANGE + 1;
+        clusterId = DEST_CLUSTER_ID;
+
+        cluster = setupCluster("destination", "dest-cluster-client", 2);
+
+        destCluster = cluster.get1();
+        destClusterCliCfg = cluster.get2();
+
+        String srcTag = srcCluster[0].cluster().tag();
+        String destTag = destCluster[0].cluster().tag();
+
+        // Clusters must be distinguishable by their tags.
+        assertNotNull(srcTag);
+        assertNotNull(destTag);
+        assertFalse(srcTag.equals(destTag));
+    }
+
+    /**
+     * Starts two server nodes with indexes {@code idx + 1} and {@code idx + 2}, prepares two client configurations,
+     * then activates the cluster and sets its tag.
+     *
+     * @param clusterTag Tag to set for the cluster.
+     * @param clientPrefix Prefix of the client instance names; the client index is appended to it.
+     * @param idx Index offset of the server nodes.
+     * @return Tuple of started server nodes and client configurations.
+     * @throws Exception If failed.
+     */
+    private IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> setupCluster(String clusterTag, String clientPrefix, int idx) throws Exception {
+        IgniteEx[] cluster = new IgniteEx[] {
+            startGrid(idx + 1),
+            startGrid(idx + 2)
+        };
+
+        IgniteConfiguration[] clusterCliCfg = new IgniteConfiguration[2];
+
+        // Client nodes are only configured here, they are not started.
+        for (int i = 0; i < 2; i++)
+            clusterCliCfg[i] = optimize(getConfiguration(clientPrefix + i).setClientMode(true));
+
+        cluster[0].cluster().state(ACTIVE);
+        cluster[0].cluster().tag(clusterTag);
+
+        return F.t(cluster, clusterCliCfg);
+    }
+
+    /** {@inheritDoc} Stops all nodes and removes the persistence files, so every test starts from scratch. */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /**
+     * Active/Passive mode means changes made only in one cluster.
+     * <p>
+     * Data is generated in the source cluster only; the test waits until the destination cache contains the same data,
+     * then removes the keys in the source cluster and waits until they disappear from the destination cache as well.
+     */
+    @Test
+    public void testActivePassiveReplication() throws Exception {
+        List<IgniteInternalFuture<?>> futs = startActivePassiveCdc();
+
+        try {
+            IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].createCache(ACTIVE_PASSIVE_CACHE);
+
+            // Local put/remove in the destination cache before any replicated data arrives.
+            destCache.put(1, ConflictResolvableTestData.create());
+            destCache.remove(1);
+
+            // Updates for "ignored-cache" should be ignored because of CDC consume configuration.
+            runAsync(generateData(IGNORED_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
+            runAsync(generateData(ACTIVE_PASSIVE_CACHE, srcCluster[srcCluster.length - 1], IntStream.range(0, KEYS_CNT)));
+
+            IgniteCache<Integer, ConflictResolvableTestData> srcCache =
+                srcCluster[srcCluster.length - 1].getOrCreateCache(ACTIVE_PASSIVE_CACHE);
+
+            waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
+
+            IntStream.range(0, KEYS_CNT).forEach(srcCache::remove);
+
+            waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs);
+
+            assertFalse(destCluster[0].cacheNames().contains(IGNORED_CACHE));
+        }
+        finally {
+            // Stop CDC regardless of the test result.
+            for (IgniteInternalFuture<?> fut : futs)
+                fut.cancel();
+        }
+    }
+
+    /**
+     * Active/Active mode means changes made in both clusters.
+     * <p>
+     * Even keys are written to the source cluster and odd keys to the destination cluster; the test waits until
+     * both caches contain the same data, then removes the keys the same way and waits until both caches are empty.
+     */
+    @Test
+    public void testActiveActiveReplication() throws Exception {
+        IgniteCache<Integer, ConflictResolvableTestData> srcCache = srcCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
+        IgniteCache<Integer, ConflictResolvableTestData> destCache = destCluster[0].getOrCreateCache(ACTIVE_ACTIVE_CACHE);
+
+        // Even keys goes to src cluster.
+        runAsync(generateData(ACTIVE_ACTIVE_CACHE, srcCluster[srcCluster.length - 1],
+            IntStream.range(0, KEYS_CNT).filter(i -> i % 2 == 0)));
+
+        // Odd keys goes to dest cluster.
+        runAsync(generateData(ACTIVE_ACTIVE_CACHE, destCluster[destCluster.length - 1],
+            IntStream.range(0, KEYS_CNT).filter(i -> i % 2 != 0)));
+
+        // CDC is started after the data generation was submitted.
+        List<IgniteInternalFuture<?>> futs = startActiveActiveCdc();
+
+        try {
+            waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
+
+            runAsync(() -> IntStream.range(0, KEYS_CNT).filter(j -> j % 2 == 0).forEach(srcCache::remove));
+            runAsync(() -> IntStream.range(0, KEYS_CNT).filter(j -> j % 2 != 0).forEach(destCache::remove));
+
+            waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs);
+        }
+        finally {
+            // Stop CDC regardless of the test result.
+            for (IgniteInternalFuture<?> fut : futs)
+                fut.cancel();
+        }
+    }
+
+    /**
+     * Creates a task that puts a freshly created {@link ConflictResolvableTestData} for each of the given keys.
+     * The cache is obtained (or created) only when the task runs.
+     *
+     * @param cacheName Cache name.
+     * @param ign Node to perform puts from.
+     * @param keys Keys to put.
+     * @return Task that generates the data.
+     */
+    public static Runnable generateData(String cacheName, IgniteEx ign, IntStream keys) {
+        return () -> {
+            IgniteCache<Integer, ConflictResolvableTestData> target = ign.getOrCreateCache(cacheName);
+
+            keys.forEach(key -> target.put(key, ConflictResolvableTestData.create()));
+        };
+    }
+
+    /**
+     * Waits, up to the test timeout, until keys {@code [0, keysCnt)} are in the expected state in both caches:
+     * present with equal values for {@code EXISTS}, absent for {@code REMOVED}.
+     * Every check also asserts that none of the given futures is done.
+     *
+     * @param src Source cache.
+     * @param dest Destination cache.
+     * @param keysCnt Number of keys to check.
+     * @param mode Expected state of the keys.
+     * @param futs Futures that must stay not done while waiting.
+     * @throws IgniteInterruptedCheckedException If waiting was interrupted.
+     */
+    public void waitForSameData(
+        IgniteCache<Integer, ConflictResolvableTestData> src,
+        IgniteCache<Integer, ConflictResolvableTestData> dest,
+        int keysCnt,
+        WaitDataMode mode,
+        List<IgniteInternalFuture<?>> futs
+    ) throws IgniteInterruptedCheckedException {
+        assertTrue(waitForCondition(() -> {
+            for (int key = 0; key < keysCnt; key++) {
+                if (mode != WaitDataMode.EXISTS && mode != WaitDataMode.REMOVED)
+                    throw new IllegalArgumentException(mode + " not supported.");
+
+                if (mode == WaitDataMode.REMOVED) {
+                    // Key must be gone from both caches.
+                    if (src.containsKey(key) || dest.containsKey(key))
+                        return checkFuts(false, futs);
+                }
+                else {
+                    // Key must be present in both caches...
+                    if (!(src.containsKey(key) && dest.containsKey(key)))
+                        return checkFuts(false, futs);
+
+                    // ...and hold equal values.
+                    ConflictResolvableTestData destVal = dest.get(key);
+
+                    if (!destVal.equals(src.get(key)))
+                        return checkFuts(false, futs);
+                }
+            }
+
+            return checkFuts(true, futs);
+        }, getTestTimeout()));
+    }
+
+    /**
+     * Asserts that none of the given futures is done and passes the given result through.
+     *
+     * @param res Result to return.
+     * @param futs Futures to check.
+     * @return {@code res}.
+     */
+    private boolean checkFuts(boolean res, List<IgniteInternalFuture<?>> futs) {
+        int idx = 0;
+
+        while (idx < futs.size()) {
+            assertFalse("Fut " + idx, futs.get(idx).isDone());
+
+            idx++;
+        }
+
+        return res;
+    }
+
+    /**
+     * Starts CDC for the active-passive test.
+     *
+     * @return Futures of the started CDC processes. The tests expect them to stay not done and cancel them at the end.
+     */
+    protected abstract List<IgniteInternalFuture<?>> startActivePassiveCdc();
+
+    /**
+     * Starts CDC for the active-active test.
+     *
+     * @return Futures of the started CDC processes. The tests expect them to stay not done and cancel them at the end.
+     */
+    protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdc();
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
new file mode 100644
index 0000000..e6f199b
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsTest.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryVersion;
+import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverPluginProvider;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.cache.CacheObject;
+import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+
+/**
+ * Cache conflict operations test.
+ */
+@RunWith(Parameterized.class)
+public class CacheConflictOperationsTest extends GridCommonAbstractTest {
+    /** Cache atomicity mode under test. */
+    @Parameterized.Parameter
+    public CacheAtomicityMode cacheMode;
+
+    /** Id of the other cluster that the versions of conflict updates are created with. */
+    @Parameterized.Parameter(1)
+    public byte otherClusterId;
+
+    /** @return Test parameters: each of ATOMIC and TRANSACTIONAL combined with FIRST_CLUSTER_ID and THIRD_CLUSTER_ID. */
+    @Parameterized.Parameters(name = "cacheMode={0}, otherClusterId={1}")
+    public static Collection<?> parameters() {
+        List<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode mode : EnumSet.of(ATOMIC, TRANSACTIONAL))
+            for (byte otherClusterId : new byte[] {FIRST_CLUSTER_ID, THIRD_CLUSTER_ID})
+                params.add(new Object[] {mode, otherClusterId});
+
+        return params;
+    }
+
+    /** Public API view of the test cache, created on the client node. */
+    private static IgniteCache<String, ConflictResolvableTestData> cache;
+
+    /** Internal API view of the same cache, used for the {@code *AllConflict} operations. */
+    private static IgniteInternalCache<BinaryObject, BinaryObject> cachex;
+
+    /** Client node. */
+    private static IgniteEx client;
+
+    /** Id of another cluster, one of the {@link #otherClusterId} values. */
+    private static final byte FIRST_CLUSTER_ID = 1;
+
+    /** Cluster id set on the conflict resolver plugin of the test nodes. */
+    private static final byte SECOND_CLUSTER_ID = 2;
+
+    /** Id of another cluster, one of the {@link #otherClusterId} values. */
+    private static final byte THIRD_CLUSTER_ID = 3;
+
+    /** {@inheritDoc} Registers the conflict resolver plugin for the default cache with cluster id SECOND_CLUSTER_ID. */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        CacheVersionConflictResolverPluginProvider<?> pluginCfg = new CacheVersionConflictResolverPluginProvider<>();
+
+        pluginCfg.setClusterId(SECOND_CLUSTER_ID);
+        pluginCfg.setCaches(new HashSet<>(Collections.singleton(DEFAULT_CACHE_NAME)));
+        pluginCfg.setConflictResolveField(conflictResolveField());
+
+        return super.getConfiguration(igniteInstanceName).setPluginProviders(pluginCfg);
+    }
+
+    /** {@inheritDoc} Starts a server and a client node, creates the cache. NOTE(review): confirm this runs for each {@link #cacheMode}. */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(1);
+
+        client = startClientGrid(2);
+
+        cache = client.createCache(new CacheConfiguration<String, ConflictResolvableTestData>(DEFAULT_CACHE_NAME).setAtomicityMode(cacheMode));
+        cachex = client.cachex(DEFAULT_CACHE_NAME);
+    }
+
+    /** Tests that regular cache operations work with the conflict resolver when there are no update conflicts. */
+    @Test
+    public void testSimpleUpdates() {
+        String key = "UpdatesWithoutConflict";
+
+        put(key);
+        put(key);
+
+        remove(key);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations work with the conflict resolver
+     * when there are no update conflicts: every operation carries a higher order than the previous one.
+     */
+    @Test
+    public void testUpdatesFromOtherClusterWithoutConflict() throws Exception {
+        String key = key("UpdateFromOtherClusterWithoutConflict", otherClusterId);
+
+        putConflict(key, 1, true);
+
+        putConflict(key, 2, true);
+
+        removeConflict(key, 3, true);
+    }
+
+    /**
+     * Tests that {@code IgniteInternalCache#*AllConflict} cache operations work with the conflict resolver
+     * when there are update conflicts; checks the order, topVer and nodeOrder components of the version in turn.
+     */
+    @Test
+    public void testUpdatesReorderFromOtherCluster() throws Exception {
+        String key = key("UpdateClusterUpdateReorder", otherClusterId);
+
+        putConflict(key, 2, true);
+
+        // Update with the equal or lower order should be ignored.
+        putConflict(key, 2, false);
+        putConflict(key, 1, false);
+
+        // Remove with the equal or lower order should be ignored.
+        removeConflict(key, 2, false);
+        removeConflict(key, 1, false);
+
+        // Update with the higher order should succeed.
+        putConflict(key, 3, true);
+
+        key = key("UpdateClusterUpdateReorder2", otherClusterId);
+
+        int order = 1;
+
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), true);
+
+        // Update with the equal or lower topVer should be ignored.
+        putConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false);
+        putConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false);
+
+        // Remove with the equal or lower topVer should be ignored.
+        removeConflict(key, new GridCacheVersion(2, order, 1, otherClusterId), false);
+        removeConflict(key, new GridCacheVersion(1, order, 1, otherClusterId), false);
+
+        // Update with the higher topVer should succeed.
+        putConflict(key, new GridCacheVersion(3, order, 1, otherClusterId), true);
+
+        key = key("UpdateClusterUpdateReorder3", otherClusterId);
+
+        int topVer = 1;
+
+        putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), true);
+
+        // Update with the equal or lower nodeOrder should be ignored.
+        putConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
+        putConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);
+
+        // Remove with the equal or lower nodeOrder should be ignored.
+        removeConflict(key, new GridCacheVersion(topVer, order, 2, otherClusterId), false);
+        removeConflict(key, new GridCacheVersion(topVer, order, 1, otherClusterId), false);
+
+        // Update with the higher nodeOrder should succeed.
+        putConflict(key, new GridCacheVersion(topVer, order, 3, otherClusterId), true);
+    }
+
+    /** Tests local cache operations mixed with conflict operations attributed to another cluster on the same keys. */
+    @Test
+    public void testUpdatesConflict() throws Exception {
+        String key = key("UpdateThisClusterConflict0", otherClusterId);
+
+        putConflict(key, 1, true);
+
+        // Local remove for other cluster entry should succeed.
+        remove(key);
+
+        // Conflict replicated update should be ignored.
+        // Resolve by field value is not applicable because after the remove operation the "old" value doesn't exist.
+        putConflict(key, 2, false);
+
+        key = key("UpdateThisDCConflict1", otherClusterId);
+
+        putConflict(key, 3, true);
+
+        // Local update for other cluster entry should succeed.
+        put(key);
+
+        key = key("UpdateThisDCConflict2", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated remove should be ignored.
+        removeConflict(key, 4, false);
+
+        key = key("UpdateThisDCConflict3", otherClusterId);
+
+        put(key);
+
+        // Conflict replicated update succeeds only if resolved by field.
+        putConflict(key, 5, conflictResolveField() != null);
+    }
+
+    /** Puts a new value via the public API; checks it is stored without an other-cluster version and with a higher order. */
+    private void put(String key) {
+        ConflictResolvableTestData newVal = ConflictResolvableTestData.create();
+
+        CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(key);
+
+        cache.put(key, newVal);
+
+        CacheEntry<String, ConflictResolvableTestData> newEntry = cache.getEntry(key);
+
+        assertNull(((CacheEntryVersion)newEntry.version()).otherClusterVersion());
+        assertEquals(newVal, cache.get(key));
+
+        if (oldEntry != null)
+            assertTrue(((CacheEntryVersion)oldEntry.version()).order() < ((CacheEntryVersion)newEntry.version()).order());
+    }
+
+    /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)} with version {@code (1, order, 1, otherClusterId)}. */
+    private void putConflict(String k, long order, boolean success) throws IgniteCheckedException {
+        putConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success);
+    }
+
+    /** Puts entry via {@link IgniteInternalCache#putAllConflict(Map)}; checks it was applied or, if not, the old entry is intact. */
+    private void putConflict(String k, GridCacheVersion newVer, boolean success) throws IgniteCheckedException {
+        CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(k);
+        ConflictResolvableTestData newVal = ConflictResolvableTestData.create();
+
+        KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k));
+        CacheObject val = new CacheObjectImpl(client.binary().toBinary(newVal), null);
+
+        cachex.putAllConflict(singletonMap(key, new GridCacheDrInfo(val, newVer)));
+
+        if (success) {
+            assertEquals(newVer, ((CacheEntryVersion)cache.getEntry(k).version()).otherClusterVersion());
+            assertEquals(newVal, cache.get(k));
+        } else if (oldEntry != null) {
+            assertEquals(oldEntry.getValue(), cache.get(k));
+            assertEquals(oldEntry.version(), cache.getEntry(k).version());
+        }
+    }
+
+    /** Removes an existing key via the public API and checks that it is gone. */
+    private void remove(String key) {
+        assertTrue(cache.containsKey(key));
+
+        cache.remove(key);
+
+        assertFalse(cache.containsKey(key));
+    }
+
+    /** Removes entry via {@link IgniteInternalCache#removeAllConflict(Map)} with version {@code (1, order, 1, otherClusterId)}. */
+    private void removeConflict(String k, long order, boolean success) throws IgniteCheckedException {
+        removeConflict(k, new GridCacheVersion(1, order, 1, otherClusterId), success);
+    }
+
+    /** Removes an existing entry via {@link IgniteInternalCache#removeAllConflict(Map)}; checks it is gone or left intact. */
+    private void removeConflict(String k, GridCacheVersion ver, boolean success) throws IgniteCheckedException {
+        assertTrue(cache.containsKey(k));
+
+        CacheEntry<String, ConflictResolvableTestData> oldEntry = cache.getEntry(k);
+
+        KeyCacheObject key = new KeyCacheObjectImpl(k, null, cachex.context().affinity().partition(k));
+
+        cachex.removeAllConflict(singletonMap(key, ver));
+
+        if (success)
+            assertFalse(cache.containsKey(k));
+        else if (oldEntry != null) {
+            assertEquals(oldEntry.getValue(), cache.get(k));
+            assertEquals(oldEntry.version(), cache.getEntry(k).version());
+        }
+    }
+
+    /** @return Given key with the cluster id and the current {@link #cacheMode} appended. */
+    private String key(String key, byte otherClusterId) {
+        return key + otherClusterId + cacheMode;
+    }
+
+    /** @return Field name passed to the plugin's {@code setConflictResolveField()}; {@code null} in this class. */
+    protected String conflictResolveField() {
+        return null;
+    }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithFieldTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithFieldTest.java
new file mode 100644
index 0000000..2bf4d81
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CacheConflictOperationsWithFieldTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+/** Cache conflict operations test with the conflict resolve field set to {@code reqId}. */
+public class CacheConflictOperationsWithFieldTest extends CacheConflictOperationsTest {
+    /** {@inheritDoc} */
+    @Override protected String conflictResolveField() {
+        return "reqId";
+    }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
new file mode 100644
index 0000000..34082b5
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
+/** Replication tests that stream changes directly between Ignite clusters via {@link IgniteToIgniteCdcStreamer}. */
+public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
+    /** {@inheritDoc} Starts one CDC application per source node for the active-passive cache. */
+    @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < srcCluster.length; i++)
+            futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], ACTIVE_PASSIVE_CACHE));
+
+        return futs;
+    }
+
+    /** {@inheritDoc} Starts one CDC application per node of both clusters, each streaming to the opposite cluster. */
+    @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (int i = 0; i < srcCluster.length; i++)
+            futs.add(igniteToIgnite(srcCluster[i].configuration(), destClusterCliCfg[i], ACTIVE_ACTIVE_CACHE));
+
+        for (int i = 0; i < destCluster.length; i++)
+            futs.add(igniteToIgnite(destCluster[i].configuration(), srcClusterCliCfg[i], ACTIVE_ACTIVE_CACHE));
+
+        return futs;
+    }
+
+    /**
+     * @param srcCfg Ignite source node configuration.
+     * @param destCfg Ignite destination cluster configuration.
+     * @param cache Cache name to stream to the destination cluster.
+     * @return Future for Change Data Capture application.
+     */
+    protected IgniteInternalFuture<?> igniteToIgnite(IgniteConfiguration srcCfg, IgniteConfiguration destCfg, String cache) {
+        return runAsync(() -> {
+            CdcConfiguration cdcCfg = new CdcConfiguration();
+
+            cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer(destCfg, false, Collections.singleton(cache), KEYS_CNT));
+
+            new CdcMain(srcCfg, null, cdcCfg).run();
+        });
+    }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/ConflictResolvableTestData.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/ConflictResolvableTestData.java
new file mode 100644
index 0000000..d2006b3
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/ConflictResolvableTestData.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import java.util.Arrays;
+import java.util.Objects;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicLong;
+
+/** Test value object: a byte payload plus a request id; equality takes both into account. */
+public class ConflictResolvableTestData {
+    /** Source of request ids for {@link #create()}. */
+    public static final AtomicLong REQUEST_ID = new AtomicLong();
+
+    /** Payload bytes. */
+    private final byte[] payload;
+
+    /** Request id. */
+    private final long reqId;
+
+    /** Stores the given payload array (not copied) and request id. */
+    public ConflictResolvableTestData(byte[] payload, long reqId) {
+        this.payload = payload;
+        this.reqId = reqId;
+    }
+
+    /**
+     * @return New object with 1024 random payload bytes and the next {@link #REQUEST_ID} value.
+     */
+    public static ConflictResolvableTestData create() {
+        byte[] payload = new byte[1024];
+
+        ThreadLocalRandom.current().nextBytes(payload);
+
+        return new ConflictResolvableTestData(payload, REQUEST_ID.incrementAndGet());
+    }
+
+    /** {@inheritDoc} Objects are equal when both the request id and the payload content are equal. */
+    @Override public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+        ConflictResolvableTestData data = (ConflictResolvableTestData)o;
+        return reqId == data.reqId && Arrays.equals(payload, data.payload);
+    }
+
+    /** {@inheritDoc} Combines the request id with the payload content hash. */
+    @Override public int hashCode() {
+        int result = Objects.hash(reqId);
+        result = 31 * result + Arrays.hashCode(payload);
+        return result;
+    }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
new file mode 100644
index 0000000..d02fb1e
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/IgniteCdcTestSuite.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc;
+
+import org.apache.ignite.cdc.kafka.CdcKafkaReplicationAppsTest;
+import org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest;
+import org.apache.ignite.cdc.kafka.KafkaToIgniteLoaderTest;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+/**
+ * CDC test suite: conflict operations, Ignite-to-Ignite replication and Kafka based replication tests.
+ */
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    CacheConflictOperationsTest.class,
+    CacheConflictOperationsWithFieldTest.class,
+    CdcIgniteToIgniteReplicationTest.class,
+    KafkaToIgniteLoaderTest.class,
+    CdcKafkaReplicationTest.class,
+    CdcKafkaReplicationAppsTest.class
+})
+public class IgniteCdcTestSuite {
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
new file mode 100644
index 0000000..459c229
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.startup.cmdline.CdcCommandLineStartup;
+
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
+import static org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+
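+/** Kafka replication tests that start the CDC applications through their command line entry points with generated XML configs. */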
+public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
+    /** Template placeholder for the Ignite instance name. */
+    public static final String INSTANCE_NAME = "INSTANCE_NAME";
+
+    /** Template placeholder for the discovery port. */
+    public static final String DISCO_PORT = "DISCO_PORT";
+
+    /** Template placeholder filled with the discovery port plus {@code DFLT_PORT_RANGE}. */
+    public static final String DISCO_PORT_RANGE = "DISCO_PORT_RANGE";
+
+    /** Template placeholder for the replicated cache name. */
+    public static final String REPLICATED_CACHE = "REPLICATED_CACHE";
+
+    /** Template placeholder for the Kafka topic name. */
+    public static final String TOPIC = "TOPIC";
+
+    /** Template placeholder for the node consistent id. */
+    public static final String CONSISTENT_ID = "CONSISTENT_ID";
+
+    /** Template placeholder filled with {@code DFLT_PARTS}. */
+    public static final String PARTS = "PARTS";
+
+    /** Template placeholder for the {@code partFrom} argument of {@code kafkaToIgnite()}. */
+    public static final String PARTS_FROM = "PARTS_FROM";
+
+    /** Template placeholder for the {@code partTo} argument of {@code kafkaToIgnite()}. */
+    public static final String PARTS_TO = "PARTS_TO";
+
+    /** Template placeholder for the applier thread count. */
+    public static final String THREAD_CNT = "THREAD_CNT";
+
+    /** Template placeholder filled with {@code KEYS_CNT}. */
+    public static final String MAX_BATCH_SIZE = "MAX_BATCH_SIZE";
+
+    /** Template placeholder for the location of the Kafka properties file. */
+    public static final String PROPS_PATH = "PROPS_PATH";
+
+    /** {@code file://} location of the temporary Kafka properties file; created by the first {@code beforeTest()} call. */
+    private String kafkaPropsPath = null;
+
+    /** {@inheritDoc} On the first call also stores {@code kafkaProperties()} into a temporary file deleted on JVM exit. */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        if (kafkaPropsPath == null) {
+            File file = File.createTempFile("kafka", "properties");
+
+            file.deleteOnExit();
+
+            try (FileOutputStream fos = new FileOutputStream(file)) {
+                kafkaProperties().store(fos, null);
+            }
+
+            kafkaPropsPath = "file://" + file.getAbsolutePath();
+        }
+    }
+
+    /** {@inheritDoc} Runs {@link CdcCommandLineStartup} with a config rendered from {@code /replication/ignite-to-kafka.xml}. */
+    @Override protected IgniteInternalFuture<?> igniteToKafka(IgniteConfiguration igniteCfg, String topic, String cache) {
+        Map<String, String> params = new HashMap<>();
+
+        params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
+        params.put(REPLICATED_CACHE, cache);
+        params.put(TOPIC, topic);
+        params.put(CONSISTENT_ID, String.valueOf(igniteCfg.getConsistentId()));
+        params.put(PARTS, Integer.toString(DFLT_PARTS));
+        params.put(MAX_BATCH_SIZE, Integer.toString(KEYS_CNT));
+        params.put(PROPS_PATH, kafkaPropsPath);
+
+        return runAsync(
+            () -> CdcCommandLineStartup.main(new String[] {prepareConfig("/replication/ignite-to-kafka.xml", params)})
+        );
+    }
+
+    /** {@inheritDoc} Runs {@link KafkaToIgniteCommandLineStartup} with a config rendered from {@code /replication/kafka-to-ignite.xml}. */
+    @Override protected IgniteInternalFuture<?> kafkaToIgnite(
+        String cacheName,
+        String topic,
+        IgniteConfiguration igniteCfg,
+        int partFrom,
+        int partTo
+    ) {
+        Map<String, String> params = new HashMap<>();
+
+        int discoPort = getFieldValue(igniteCfg.getDiscoverySpi(), "locPort");
+
+        params.put(INSTANCE_NAME, igniteCfg.getIgniteInstanceName());
+        params.put(DISCO_PORT, Integer.toString(discoPort));
+        params.put(DISCO_PORT_RANGE, Integer.toString(discoPort + DFLT_PORT_RANGE));
+        params.put(REPLICATED_CACHE, cacheName);
+        params.put(TOPIC, topic);
+        params.put(PROPS_PATH, kafkaPropsPath);
+        params.put(PARTS_FROM, Integer.toString(partFrom));
+        params.put(PARTS_TO, Integer.toString(partTo));
+        params.put(THREAD_CNT, Integer.toString((partTo - partFrom) / 3));
+
+        return runAsync(
+            () -> KafkaToIgniteCommandLineStartup.main(new String[] {prepareConfig("/replication/kafka-to-ignite.xml", params)})
+        );
+    }
+
+    /** Renders the classpath template {@code path}, replacing every {@code {KEY}} of {@code params}; returns the temp file path. */
+    private String prepareConfig(String path, Map<String, String> params) {
+        try {
+            byte[] tmpl = Files.readAllBytes(Paths.get(CdcKafkaReplicationAppsTest.class.getResource(path).toURI()));
+
+            // Decode as UTF-8 (the XML default encoding) instead of relying on the platform default charset.
+            String cfg = new String(tmpl, java.nio.charset.StandardCharsets.UTF_8);
+
+            // String#replace substitutes every occurrence, so one pass per parameter is enough and always terminates.
+            for (Map.Entry<String, String> param : params.entrySet())
+                cfg = cfg.replace('{' + param.getKey() + '}', param.getValue());
+
+            File file = File.createTempFile("ignite-config", ".xml");
+
+            file.deleteOnExit();
+
+            try (PrintWriter out = new PrintWriter(file, "UTF-8")) {
+                out.print(cfg);
+            }
+
+            return file.getAbsolutePath();
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
new file mode 100644
index 0000000..74e7719
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.ignite.cdc.AbstractReplicationTest;
+import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_PARTS;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_TOPIC;
+import static org.apache.ignite.testframework.GridTestUtils.runAsync;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests for kafka replication.
+ */
+public class CdcKafkaReplicationTest extends AbstractReplicationTest {
+    /** Topic for changes streamed from the source cluster to the destination cluster in the active-active scenario. */
+    public static final String SRC_DEST_TOPIC = "source-dest";
+
+    /** Topic for changes streamed from the destination cluster to the source cluster in the active-active scenario. */
+    public static final String DEST_SRC_TOPIC = "dest-source";
+
+    /** Embedded Kafka cluster shared by all tests, started on first use; no shutdown is visible in this class. */
+    private static EmbeddedKafkaCluster KAFKA = null;
+
+    /** {@inheritDoc} Starts the embedded Kafka cluster on first use and creates the three test topics. */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        if (KAFKA == null) {
+            KAFKA = new EmbeddedKafkaCluster(1);
+
+            KAFKA.start();
+        }
+
+        KAFKA.createTopic(DFLT_TOPIC, DFLT_PARTS, 1);
+        KAFKA.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
+        KAFKA.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
+    }
+
+    /** {@inheritDoc} Deletes the test topics and waits until they are no longer listed; the result of the wait is not checked. */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        KAFKA.deleteTopic(DFLT_TOPIC);
+        KAFKA.deleteTopic(SRC_DEST_TOPIC);
+        KAFKA.deleteTopic(DEST_SRC_TOPIC);
+
+        waitForCondition(() -> {
+            Set<String> topics = KAFKA.getAllTopicsInCluster();
+
+            return !topics.contains(DFLT_TOPIC) && !topics.contains(SRC_DEST_TOPIC) && !topics.contains(DEST_SRC_TOPIC);
+        }, getTestTimeout());
+    }
+
+    /** {@inheritDoc} Every source node streams to DFLT_TOPIC; destination client {@code i} applies its own DFLT_PARTS/2 wide range. */
+    @Override protected List<IgniteInternalFuture<?>> startActivePassiveCdc() {
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (IgniteEx ex : srcCluster)
+            futs.add(igniteToKafka(ex.configuration(), DFLT_TOPIC, ACTIVE_PASSIVE_CACHE));
+
+        for (int i = 0; i < destCluster.length; i++) {
+            futs.add(kafkaToIgnite(
+                ACTIVE_PASSIVE_CACHE,
+                DFLT_TOPIC,
+                destClusterCliCfg[i],
+                i * (DFLT_PARTS / 2),
+                (i + 1) * (DFLT_PARTS / 2)
+            ));
+        }
+
+
+        return futs;
+    }
+
+    /** {@inheritDoc} Each cluster streams to its own topic; a single applier per direction reads the whole opposite topic. */
+    @Override protected List<IgniteInternalFuture<?>> startActiveActiveCdc() {
+        List<IgniteInternalFuture<?>> futs = new ArrayList<>();
+
+        for (IgniteEx ex : srcCluster)
+            futs.add(igniteToKafka(ex.configuration(), SRC_DEST_TOPIC, ACTIVE_ACTIVE_CACHE));
+
+        for (IgniteEx ex : destCluster)
+            futs.add(igniteToKafka(ex.configuration(), DEST_SRC_TOPIC, ACTIVE_ACTIVE_CACHE));
+
+        futs.add(kafkaToIgnite(ACTIVE_ACTIVE_CACHE, SRC_DEST_TOPIC, destClusterCliCfg[0], 0, DFLT_PARTS));
+        futs.add(kafkaToIgnite(ACTIVE_ACTIVE_CACHE, DEST_SRC_TOPIC, srcClusterCliCfg[0], 0, DFLT_PARTS));
+
+        return futs;
+    }
+
+    /**
+     * @param igniteCfg Ignite configuration of the node whose changes are captured.
+     * @param topic Kafka topic name.
+     * @param cache Cache name to stream to Kafka.
+     * @return Future of the running Change Data Capture application.
+     */
+    protected IgniteInternalFuture<?> igniteToKafka(IgniteConfiguration igniteCfg, String topic, String cache) {
+        return runAsync(() -> {
+            IgniteToKafkaCdcStreamer cdcCnsmr =
+                new IgniteToKafkaCdcStreamer(topic, DFLT_PARTS, Collections.singleton(cache), KEYS_CNT, false, kafkaProperties());
+
+            CdcConfiguration cdcCfg = new CdcConfiguration();
+
+            cdcCfg.setConsumer(cdcCnsmr);
+
+            new CdcMain(igniteCfg, null, cdcCfg).run();
+        });
+    }
+
+    /**
+     * @param cacheName Cache name; {@code topic} is the Kafka topic to read from.
+     * @param igniteCfg Ignite configuration; {@code fromPart} and {@code toPart} are passed to setKafkaPartsFrom/To.
+     * @return Future for the running {@link KafkaToIgniteCdcStreamer}.
+     */
+    protected IgniteInternalFuture<?> kafkaToIgnite(
+        String cacheName,
+        String topic,
+        IgniteConfiguration igniteCfg,
+        int fromPart,
+        int toPart
+    ) {
+        KafkaToIgniteCdcStreamerConfiguration cfg = new KafkaToIgniteCdcStreamerConfiguration();
+
+        cfg.setKafkaPartsFrom(fromPart);
+        cfg.setKafkaPartsTo(toPart);
+        cfg.setThreadCount((toPart - fromPart)/2);
+
+        cfg.setCaches(Collections.singletonList(cacheName));
+        cfg.setTopic(topic);
+
+        return runAsync(new KafkaToIgniteCdcStreamer(igniteCfg, kafkaProperties(), cfg));
+    }
+
+    /** @return Kafka client properties that point to the embedded cluster, with auto commit disabled. */
+    protected Properties kafkaProperties() {
+        Properties props = new Properties();
+
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-ignite-applier");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000");
+
+        return props;
+    }
+}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
new file mode 100644
index 0000000..413d105
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteLoaderTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteLoader.loadKafkaToIgniteStreamer;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/** Tests loading of {@link KafkaToIgniteCdcStreamer} from a Spring XML file. */
+public class KafkaToIgniteLoaderTest extends GridCommonAbstractTest {
+    /** Loader rejects a duplicated Ignite config and missing Kafka properties, and accepts a correct file. */
+    @Test
+    public void testLoadConfig() throws Exception {
+        assertThrows(
+            null,
+            () -> loadKafkaToIgniteStreamer("loader/kafka-to-ignite-double-ignite-cfg.xml"),
+            IgniteCheckedException.class,
+            "Exact 1 IgniteConfiguration should be defined. Found 2"
+        );
+
+        assertThrows(
+            null,
+            () -> loadKafkaToIgniteStreamer("loader/kafka-to-ignite-without-kafka-properties.xml"),
+            IgniteCheckedException.class,
+            "Spring bean with provided name doesn't exist"
+        );
+
+        KafkaToIgniteCdcStreamer streamer = loadKafkaToIgniteStreamer("loader/kafka-to-ignite-correct.xml");
+
+        assertNotNull(streamer);
+    }
+}
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
new file mode 100644
index 0000000..d8a2efa
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-correct.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+                <property name="cdcEnabled" value="true" />
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
+
+    <util:properties id="kafkaProperties" location="loader/kafka.properties" />
+</beans>
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
new file mode 100644
index 0000000..d7e7cb9
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-double-ignite-cfg.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+    <import resource="kafka-to-ignite-correct.xml" />
+
+    <bean id="grid.cfg2" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+                <property name="cdcEnabled" value="true" />
+            </bean>
+        </property>
+    </bean>
+</beans>
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
new file mode 100644
index 0000000..3182771
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka-to-ignite-without-kafka-properties.xml
@@ -0,0 +1,38 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="dataStorageConfiguration">
+            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+                <property name="cdcEnabled" value="true" />
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration" />
+</beans>
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/loader/kafka.properties b/modules/cdc-ext/src/test/resources/loader/kafka.properties
new file mode 100644
index 0000000..58a46b9
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/loader/kafka.properties
@@ -0,0 +1,4 @@
+bootstrap.servers=127.0.0.1
+key.serializer=ru.SomeClass
+value.serializer=ru.SomeOtherClass
+group.id=my-group
\ No newline at end of file
diff --git a/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml b/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
new file mode 100644
index 0000000..964daf0
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/replication/ignite-to-kafka.xml
@@ -0,0 +1,64 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="igniteInstanceName" value="{INSTANCE_NAME}" />
+        <property name="peerClassLoadingEnabled" value="true" />
+        <property name="localHost" value="127.0.0.1" />
+        <property name="consistentId" value="{CONSISTENT_ID}" />
+
+        <property name="dataStorageConfiguration">
+            <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
+                <property name="defaultDataRegionConfiguration">
+                    <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
+                        <property name="persistenceEnabled" value="true" />
+                    </bean>
+                </property>
+                <property name="cdcEnabled" value="true" />
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="cdc.cfg" class="org.apache.ignite.cdc.CdcConfiguration">
+        <property name="consumer">
+            <bean class="org.apache.ignite.cdc.kafka.IgniteToKafkaCdcStreamer">
+                <constructor-arg name="topic" value="{TOPIC}" />
+                <constructor-arg name="kafkaParts" value="{PARTS}" />
+                <constructor-arg name="caches">
+                    <util:list>
+                        <bean class="java.lang.String">
+                            <constructor-arg type="String" value="{REPLICATED_CACHE}" />
+                        </bean>
+                    </util:list>
+                </constructor-arg>
+                <constructor-arg name="maxBatchSize" value="{MAX_BATCH_SIZE}" />
+                <constructor-arg name="onlyPrimary" value="false" />
+                <constructor-arg name="kafkaProps" ref="kafkaProperties" />
+            </bean>
+        </property>
+    </bean>
+
+    <util:properties id="kafkaProperties" location="{PROPS_PATH}" />
+</beans>
diff --git a/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
new file mode 100644
index 0000000..154b1b3
--- /dev/null
+++ b/modules/cdc-ext/src/test/resources/replication/kafka-to-ignite.xml
@@ -0,0 +1,61 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:util="http://www.springframework.org/schema/util"
+       xsi:schemaLocation="
+        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
+        <property name="igniteInstanceName" value="{INSTANCE_NAME}" />
+        <property name="clientMode" value="true" />
+        <property name="peerClassLoadingEnabled" value="true" />
+        <property name="localHost" value="127.0.0.1" />
+
+        <property name="discoverySpi">
+            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
+                <property name="localPort" value="{DISCO_PORT}" />
+                <property name="ipFinder">
+                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
+                        <property name="addresses"
+                                  value="127.0.0.1:{DISCO_PORT}..{DISCO_PORT_RANGE}" />
+                    </bean>
+                </property>
+                <property name="joinTimeout" value="10000" />
+            </bean>
+        </property>
+    </bean>
+
+    <bean id="streamer.cfg" class="org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration">
+        <property name="caches">
+            <util:list>
+                <bean class="java.lang.String">
+                    <constructor-arg type="String" value="{REPLICATED_CACHE}" />
+                </bean>
+            </util:list>
+        </property>
+        <property name="topic" value="{TOPIC}" />
+        <property name="kafkaPartsFrom" value="{PARTS_FROM}"/>
+        <property name="kafkaPartsTo" value="{PARTS_TO}"/>
+        <property name="threadCount" value="{THREAD_CNT}"/>
+    </bean>
+
+    <util:properties id="kafkaProperties" location="{PROPS_PATH}" />
+</beans>
diff --git a/modules/performance-statistics-ext/pom.xml b/modules/performance-statistics-ext/pom.xml
index 01d7135..86c19c4 100644
--- a/modules/performance-statistics-ext/pom.xml
+++ b/modules/performance-statistics-ext/pom.xml
@@ -106,7 +106,6 @@
         <plugins>
             <plugin>
                 <artifactId>maven-assembly-plugin</artifactId>
-                <version>2.4</version>
                 <executions>
                     <execution>
                         <!-- Create zip archive with UI resources. -->
diff --git a/parent/pom.xml b/parent/pom.xml
index 911558a..312d3a6 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -106,6 +106,7 @@
         <lucene.bundle.version>7.4.0_1</lucene.bundle.version>
         <lucene.version>7.4.0</lucene.version>
         <lz4.version>1.5.0</lz4.version>
+        <maven.assembly.plugin.version>2.4</maven.assembly.plugin.version>
         <maven.bundle.plugin.version>3.5.0</maven.bundle.plugin.version>
         <maven.checkstyle.plugin.version>3.0.0</maven.checkstyle.plugin.version>
         <checkstyle.puppycrawl.version>8.19</checkstyle.puppycrawl.version>
@@ -473,6 +474,10 @@
                         </execution>
                     </executions>
                 </plugin>
+                <plugin>
+                    <artifactId>maven-assembly-plugin</artifactId>
+                    <version>${maven.assembly.plugin.version}</version>
+                </plugin>
             </plugins>
         </pluginManagement>
 
diff --git a/pom.xml b/pom.xml
index 27249de..92019f1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -63,6 +63,7 @@
         <module>modules/spring-tx-ext</module>
         <module>modules/spring-cache-ext</module>
         <module>modules/spring-session-ext</module>
+        <module>modules/cdc-ext</module>
     </modules>
 
     <profiles>