You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2023/01/24 12:30:20 UTC

[ignite] branch master updated: IGNITE-18387 [ducktests] Added basic test for the CDC (#10439)

This is an automated email from the ASF dual-hosted git repository.

ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new e0e2edd5b02 IGNITE-18387 [ducktests] Added basic test for the CDC (#10439)
e0e2edd5b02 is described below

commit e0e2edd5b029701ffbc18d01709da86bcd45405f
Author: Sergey Korotkov <se...@gmail.com>
AuthorDate: Tue Jan 24 19:30:10 2023 +0700

    IGNITE-18387 [ducktests] Added basic test for the CDC (#10439)
---
 .../ducktest/tests/cdc/CountingCdcConsumer.java    |  91 ++++++++++++++++
 .../ignitetest/services/utils/cdc/__init__.py      |  18 +++
 .../ignitetest/services/utils/cdc/ignite_cdc.py    |  86 +++++++++++++++
 .../ignitetest/services/utils/ducktests_service.py |   2 +-
 .../ignitetest/services/utils/ignite_aware.py      |  20 +---
 .../utils/ignite_configuration/__init__.py         |   9 +-
 .../utils/ignite_configuration/data_storage.py     |   3 +
 .../tests/ignitetest/services/utils/ignite_spec.py |   2 +
 .../tests/ignitetest/services/utils/jmx_utils.py   |   2 +-
 .../tests/ignitetest/services/utils/jvm_utils.py   |  17 +++
 .../ignitetest/services/utils/templates/bean.j2    |  22 ++++
 ...config.xml.j2 => client_configuration_macro.j2} |   7 +-
 .../services/utils/templates/ignite.xml.j2         | 121 +--------------------
 ...ignite.xml.j2 => ignite_configuration_macro.j2} |  16 +--
 .../services/utils/templates/misc_macro.j2         |  14 ++-
 .../utils/templates/thin_client_config.xml.j2      |  28 ++---
 .../tests/ignitetest/tests/cdc/__init__.py         |  18 +++
 .../tests/ignitetest/tests/cdc/cdc_test.py         | 101 +++++++++++++++++
 .../tests/control_utility/consistency_test.py      |   2 +-
 .../tests/ignitetest/tests/snapshot_test.py        |   2 +-
 modules/ducktests/tests/ignitetest/utils/bean.py   |  10 ++
 21 files changed, 408 insertions(+), 183 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java
new file mode 100644
index 00000000000..d6b173ae07a
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.cdc;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Simple CDC consumer.
+ *
+ * Just counts and logs every object (events, types, mappings, cache changes, destroyed caches) passed to it.
+ */
+public class CountingCdcConsumer implements CdcConsumer {
+    /** Logger. */
+    @LoggerResource
+    protected IgniteLogger log;
+
+    /** Total count of the objects consumed. */
+    private final AtomicLong objectsConsumed = new AtomicLong();
+
+    /** {@inheritDoc} */
+    @Override public void start(MetricRegistry mreg) {
+        log.info("CountingCdcConsumer started");
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean onEvents(Iterator<CdcEvent> events) {
+        events.forEachRemaining(this::consume);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onTypes(Iterator<BinaryType> types) {
+        types.forEachRemaining(this::consume);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onMappings(Iterator<TypeMapping> mappings) {
+        mappings.forEachRemaining(this::consume);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
+        cacheEvents.forEachRemaining(this::consume);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCacheDestroy(Iterator<Integer> caches) {
+        caches.forEachRemaining(this::consume);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop() {
+        log.info("CountingCdcConsumer stopped, objectsConsumed: " + objectsConsumed.get());
+    }
+
+    /**
+     * Consumes an object: increments the total counter and logs the object.
+     *
+     * @param object Object passed from the CDC engine.
+     */
+    private void consume(Object object) {
+        objectsConsumed.incrementAndGet();
+
+        log.info("Consumed: [object=" + object + "]");
+    }
+}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/cdc/__init__.py b/modules/ducktests/tests/ignitetest/services/utils/cdc/__init__.py
new file mode 100644
index 00000000000..2e0217ffec0
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/__init__.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains utility classes for CDC.
+"""
diff --git a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py
new file mode 100644
index 00000000000..034c464a566
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+This module contains Ignite CDC utility (ignite-cdc.sh) wrapper.
+"""
+import signal
+
+from ignitetest.services.utils.ignite_spec import envs_to_exports
+from ignitetest.services.utils.jvm_utils import JvmProcessMixin
+
+
+class IgniteCdcUtility(JvmProcessMixin):
+    """
+    Ignite CDC utility (ignite-cdc.sh) wrapper: starts and stops the utility on the nodes of the given cluster.
+    """
+    BASE_COMMAND = "ignite-cdc.sh"
+    JAVA_CLASS = "org.apache.ignite.startup.cmdline.CdcCommandLineStartup"
+
+    def __init__(self, cluster):
+        self.cluster = cluster
+        self.logger = cluster.context.logger
+
+    def start(self):
+        """
+        Start ignite-cdc.sh with the cluster config file on all cluster nodes.
+        """
+        def __start(node):
+            self.logger.info(f"{self.__service_node_id(node)}: starting {self.BASE_COMMAND}")
+
+            raw_output = node.account.ssh_capture(
+                self.__form_cmd(f"{self.BASE_COMMAND} -v {self.cluster.config_file}"))
+
+            code, _ = self.__parse_output(raw_output)
+
+            self.logger.debug(f"{self.__service_node_id(node)}: {self.BASE_COMMAND} finished with exit code: {code}")
+
+        self.logger.info(f"{self.cluster.service_id}: starting {self.BASE_COMMAND} ...")
+
+        self.cluster.exec_on_nodes_async(self.cluster.nodes, __start, timeout_sec=1)
+
+    def stop(self, force_stop=False):
+        """
+        Stop ignite-cdc.sh on cluster nodes: SIGKILL if force_stop is set, SIGTERM otherwise.
+        """
+        def __stop(node):
+            self.logger.info(f"{self.__service_node_id(node)}: stopping {self.BASE_COMMAND}")
+
+            pids = self.pids(node, self.JAVA_CLASS)
+
+            for pid in pids:
+                node.account.signal(pid, signal.SIGKILL if force_stop else signal.SIGTERM, allow_fail=False)
+
+        self.logger.info(f"{self.cluster.service_id}: stopping {self.BASE_COMMAND} ...")
+
+        self.cluster.exec_on_nodes_async(self.cluster.nodes, __stop)
+
+    def __service_node_id(self, node):
+        return f"{self.cluster.service_id} node {self.cluster.idx(node)} on {node.account.hostname}"
+
+    def __form_cmd(self, cmd):
+        envs = self.cluster.spec.envs()
+
+        envs["CDC_JVM_OPTS"] = f"\"{' '.join(self.cluster.spec.jvm_opts)}\""
+
+        return f"{envs_to_exports(envs)} bash " + self.cluster.script(cmd)
+
+    @staticmethod
+    def __parse_output(raw_output):
+        exit_code = raw_output.channel_file.channel.recv_exit_status()
+        output = "".join(raw_output)
+
+        return exit_code, output
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ducktests_service.py b/modules/ducktests/tests/ignitetest/services/utils/ducktests_service.py
index 5da4b07b039..115df34902d 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ducktests_service.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ducktests_service.py
@@ -53,4 +53,4 @@ class DucktestsService(Service, metaclass=ABCMeta):
         self.stop(force_stop=True)
 
     def clean_node(self, node, **kwargs):
-        assert self.stopped
+        pass
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 4fe44b29e00..ef5d05b4582 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -38,12 +38,13 @@ from ignitetest.services.utils.background_thread import BackgroundThreadService
 from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue
 from ignitetest.services.utils.ignite_spec import resolve_spec, SHARED_PREPARED_FILE
 from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin, JmxClient
+from ignitetest.services.utils.jvm_utils import JvmProcessMixin
 from ignitetest.services.utils.log_utils import monitor_log
 from ignitetest.services.utils.path import IgnitePathAware
 from ignitetest.utils.enum import constructible
 
 
-class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABCMeta):
+class IgniteAwareService(BackgroundThreadService, IgnitePathAware, JvmProcessMixin, metaclass=ABCMeta):
     """
     The base class to build services aware of Ignite.
     """
@@ -150,7 +151,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
                                (str(node.account), self.shutdown_timeout_sec))
 
     def stop_node(self, node, force_stop=False, **kwargs):
-        pids = self.pids(node)
+        pids = self.pids(node, self.main_java_class)
 
         for pid in pids:
             node.account.signal(pid, signal.SIGKILL if force_stop else signal.SIGTERM, allow_fail=False)
@@ -221,15 +222,6 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
 
         setattr(node, "consistent_id", node.account.externally_routable_ip)
 
-    def pids(self, node):
-        """
-        :param node: Ignite service node.
-        :return: List of service's pids.
-        """
-        cmd = "pgrep -ax java | awk '/%s/ {print $1}'" % self.main_java_class
-
-        return [int(pid) for pid in node.account.ssh_capture(cmd, allow_fail=True)]
-
     def worker(self, idx, node, **kwargs):
         cmd = self.spec.command(node)
 
@@ -242,7 +234,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
         :param node: Ignite service node.
         :return: True if node is alive.
         """
-        return len(self.pids(node)) > 0
+        return len(self.pids(node, self.main_java_class)) > 0
 
     def await_event_on_node(self, evt_message, node, timeout_sec, from_the_beginning=False, backoff_sec=.1,
                             log_file=None):
@@ -525,7 +517,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
         Generate thread dump on node.
         :param node: Ignite service node.
         """
-        for pid in self.pids(node):
+        for pid in self.pids(node, self.main_java_class):
             try:
                 node.account.signal(pid, signal.SIGQUIT, allow_fail=True)
             except RemoteCommandError:
@@ -550,7 +542,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
         snapshot_db = os.path.join(self.snapshots_dir, snapshot_name, "db")
 
         for node in self.nodes:
-            assert len(self.pids(node)) == 0
+            assert len(self.pids(node, self.main_java_class)) == 0
 
             node.account.ssh(f'rm -rf {self.database_dir}', allow_fail=False)
             node.account.ssh(f'cp -r {snapshot_db} {self.work_dir}', allow_fail=False)
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
index 0f8128c7840..a9c84f4d295 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
@@ -73,7 +73,7 @@ class IgniteConfiguration(NamedTuple):
     auto_activation_enabled: bool = None
     transaction_configuration: TransactionConfiguration = None
 
-    def __prepare_ssl(self, test_globals, shared_root):
+    def prepare_ssl(self, test_globals, shared_root):
         """
         Updates ssl configuration from globals.
         """
@@ -112,7 +112,7 @@ class IgniteConfiguration(NamedTuple):
         """
         Updates configuration based on current environment.
         """
-        return self.__prepare_ssl(cluster.globals, cluster.shared_root).__prepare_discovery(cluster, node)
+        return self.__prepare_discovery(cluster, node)
 
     @property
     def service_type(self):
@@ -138,8 +138,9 @@ class IgniteThinClientConfiguration(NamedTuple):
     ssl_params: SslParams = None
     username: str = None
     password: str = None
+    ext_beans: list = []
 
-    def __prepare_ssl(self, test_globals, shared_root):
+    def prepare_ssl(self, test_globals, shared_root):
         """
         Updates ssl configuration from globals.
         """
@@ -154,7 +155,7 @@ class IgniteThinClientConfiguration(NamedTuple):
         """
         Updates configuration based on current environment.
         """
-        return self.__prepare_ssl(cluster.globals, cluster.shared_root)
+        return self
 
     @property
     def service_type(self):
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py
index d797035a277..0f0811bdf77 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py
@@ -24,6 +24,7 @@ class DataRegionConfiguration(NamedTuple):
     """
     Ignite DataRegion Configuration
     """
+    cdc_enabled: bool = False
     name: str = "default"
     persistence_enabled: bool = False
     initial_size: int = 100 * 1024 * 1024
@@ -36,6 +37,7 @@ class DataStorageConfiguration(NamedTuple):
     """
     Ignite DataStorage configuration
     """
+    cdc_wal_path: str = None
     checkpoint_frequency: int = None
     default: DataRegionConfiguration = DataRegionConfiguration()
     max_wal_archive_size: int = None
@@ -44,6 +46,7 @@ class DataStorageConfiguration(NamedTuple):
     regions: list = []
     wal_buffer_size: int = None
     wal_compaction_enabled: bool = None
+    wal_force_archive_timeout: int = None
     wal_history_size: int = None
     wal_mode: str = None
     wal_segment_size: int = None
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 2921e597422..b82551491dc 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -158,6 +158,8 @@ class IgniteSpec(metaclass=ABCMeta):
                     is_jmx_metrics_enabled(self.service)):
                 config = config._replace(ignite_instance_name=self._test_id)
 
+        config = config.prepare_ssl(self.service.globals, self.service.shared_root)
+
         return config
 
     @property
diff --git a/modules/ducktests/tests/ignitetest/services/utils/jmx_utils.py b/modules/ducktests/tests/ignitetest/services/utils/jmx_utils.py
index 61e50bcaadc..462e513ac02 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/jmx_utils.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/jmx_utils.py
@@ -29,7 +29,7 @@ def ignite_jmx_mixin(node, service):
     :param node: Ignite service node.
     :param service: Ignite service.
     """
-    setattr(node, 'pids', service.pids(node))
+    setattr(node, 'pids', service.pids(node, service.main_java_class))
     setattr(node, 'install_root', service.install_root)
     base_cls = node.__class__
     base_cls_name = node.__class__.__name__
diff --git a/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py b/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py
index 51ea81b5aea..0fccd871a75 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py
@@ -96,3 +96,20 @@ def _remove_duplicates(params: dict):
                     del params[param_key]
                 else:
                     duplicates[dup_key] = True
+
+
+class JvmProcessMixin:
+    """
+    Mixin to work with JVM processes.
+    """
+    @staticmethod
+    def pids(node, java_class):
+        """
+        Return pids of the java processes on the service node whose pgrep output line matches this java class.
+        :param node: Service node.
+        :param java_class: Java class name; it is interpolated as-is into the awk regular expression.
+        :return: List of matching pids as ints.
+        """
+        cmd = "pgrep -ax java | awk '/%s/ {print $1}'" % java_class
+
+        return [int(pid) for pid in node.account.ssh_capture(cmd, allow_fail=True)]
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/bean.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/bean.j2
new file mode 100644
index 00000000000..d2e573a2621
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/bean.j2
@@ -0,0 +1,22 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'misc_macro.j2' as misc_utils %}
+
+{% macro apply(config, bean_object) %}
+    {{ misc_utils.bean(bean_object) }}
+{% endmacro %}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2
similarity index 85%
copy from modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
copy to modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2
index 7ec955b5c1b..7e034bfed9f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2
@@ -17,10 +17,7 @@
 
 {% import 'ssl_params_macro.j2' as ssl_params_util %}
 
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
-       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+{% macro apply(config, service) %}
     <bean class="org.apache.ignite.configuration.ClientConfiguration" id="thin.client.cfg">
         <property name="addresses">
             <list>
@@ -41,4 +38,4 @@
         <property name="sslTrustCertificateKeyStorePassword" value="{{ config.ssl_params.trust_store_password }}"/>
         {% endif %}
     </bean>
-</beans>
+{% endmacro %}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
index a63fcc6ff3b..847694b3579 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
@@ -17,133 +17,16 @@
  limitations under the License.
 #}
 
-{% import 'communication_macro.j2' as communication %}
-{% import 'discovery_macro.j2' as disco_utils %}
-{% import 'cache_macro.j2' as cache_utils %}
-{% import 'datastorage_macro.j2' as datastorage_utils %}
+{% import 'ignite_configuration_macro.j2' as ignite_configuration %}
 {% import 'misc_macro.j2' as misc_utils %}
-{% import 'ssl_params_macro.j2' as ssl_params_util %}
-{% import 'connector_configuration.j2' as connector_configuration_util %}
-{% import 'client_connector_configuration.j2' as client_connector_configuration_util %}
-{% import 'transaction_macro.j2' as transaction_util %}
 
 <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
-    <bean class="org.apache.ignite.configuration.IgniteConfiguration">
-        <property name="workDirectory" value="{{ service.work_dir }}" />
-        <property name="gridLogger">
-            <bean class="org.apache.ignite.logger.log4j2.Log4J2Logger">
-                <constructor-arg type="java.lang.String" value="{{ service.config_dir }}/{{ config.log4j_config }}"/>
-            </bean>
-        </property>
 
-        <property name="clientMode" value="{{ config.client_mode or False | lower }}"/>
-        <property name="consistentId" value="{{ config.consistent_id }}"/>
-        <property name="igniteInstanceName" value="{{ config.ignite_instance_name }}"/>
-        <property name="failureDetectionTimeout" value="{{ config.failure_detection_timeout }}"/>
-        <property name="systemWorkerBlockedTimeout" value="{{ config.sys_worker_blocked_timeout }}"/>
-        <property name="peerClassLoadingEnabled" value="{{ config.peer_class_loading_enabled }}"/>
-        <property name="authenticationEnabled" value="{{ config.auth_enabled | lower }}"/>
-        {% if config.rebalance_thread_pool_size %}
-            <property name="rebalanceThreadPoolSize" value="{{ config.rebalance_thread_pool_size }}"/>
-        {% endif %}
-        {% if config.rebalance_batch_size %}
-            <property name="rebalanceBatchSize" value="{{ config.rebalance_batch_size }}"/>
-        {% endif %}
-        {% if config.rebalance_batches_prefetch_count %}
-            <property name="rebalanceBatchesPrefetchCount" value="{{ config.rebalance_batches_prefetch_count }}"/>
-        {% endif %}
-        {% if config.rebalance_throttle %}
-            <property name="rebalanceThrottle" value="{{ config.rebalance_throttle }}"/>
-        {% endif %}
-
-        {% if config.metrics_log_frequency %}
-            <property name="metricsLogFrequency" value="{{ config.metrics_log_frequency }}"/>
-        {% endif %}
-
-        {% if config.metrics_update_frequency %}
-            <property name="metricsUpdateFrequency" value="{{ config.metrics_update_frequency }}"/>
-        {% endif %}
-
-        {% if config.auto_activation_enabled %}
-            <property name="autoActivationEnabled" value="{{ config.auto_activation_enabled }}"/>
-        {% endif %}
-
-        {% if config.metric_exporters | length > 0 %}
-            <property name="metricExporterSpi">
-                <list>
-                {% for exporter in config.metric_exporters %}
-                    {{ misc_utils.bean(exporter) }}
-                {% endfor %}
-                </list>
-            </property>
-        {% endif %}
-
-        {{ misc_utils.cluster_state(config.cluster_state, config.version) }}
-
-        {{ communication.communication_spi(config.communication_spi) }}
-
-        {{ disco_utils.discovery_spi(config.discovery_spi) }}
-
-        {{ datastorage_utils.data_storage(config.data_storage) }}
-
-        {{ cache_utils.cache_configs(config.caches) }}
-
-        {% if config.local_host %}
-            <property name="localHost" value="{{ config.local_host }}"/>
-        {% endif %}
-
-        {% if config.properties %}
-            {{ config.properties }}
-        {% endif %}
-
-        {% if config.ssl_params %}
-            <property name="sslContextFactory">
-                {{ ssl_params_util.ssl_params(config.ssl_params) }}
-            </property>
-        {% endif %}
-
-        {{ connector_configuration_util.connector_configuration(config.connector_configuration) }}
-
-        {{ client_connector_configuration_util.client_connector_configuration(config.client_connector_configuration) }}
-
-        {{ transaction_util.transaction(config.transaction_configuration) }}
-
-        {% if config.binary_configuration %}
-            <property name="binaryConfiguration">
-                <bean class="org.apache.ignite.configuration.BinaryConfiguration">
-                    <property name="compactFooter" value="{{ config.binary_configuration.compact_footer }}"/>
-                </bean>
-            </property>
-        {% endif %}
-
-        {% if config.local_event_listeners %}
-            <property name="localEventListeners" ref="{{ config.local_event_listeners }}"/>
-        {% endif %}
-
-        {% if config.include_event_types | length > 0 %}
-            <property name="includeEventTypes" ref="eventTypes"/>
-        {% endif %}
-
-        {% if config.event_storage_spi %}
-            <property name="eventStorageSpi" ref="{{ config.event_storage_spi }}"/>
-        {% endif %}
-
-        {% if config.sql_schemas | length > 0 %}
-            <property name="sqlSchemas">
-                <list>
-                {% for schema in config.sql_schemas %}
-                    <value>{{ schema }}</value>
-                {% endfor %}
-                </list>
-            </property>
-        {% endif %}
-
-        {{ misc_utils.plugins(config) }}
-    </bean>
+    {{ ignite_configuration.apply(config, service) }}
 
     {{ misc_utils.ext_beans(config) }}
 
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2
similarity index 90%
copy from modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
copy to modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2
index a63fcc6ff3b..d5f7b88012c 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2
@@ -1,5 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
 {#
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
@@ -27,11 +25,7 @@
 {% import 'client_connector_configuration.j2' as client_connector_configuration_util %}
 {% import 'transaction_macro.j2' as transaction_util %}
 
-<beans xmlns="http://www.springframework.org/schema/beans"
-       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="
-        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
-        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+{% macro apply(config, service) %}
     <bean class="org.apache.ignite.configuration.IgniteConfiguration">
         <property name="workDirectory" value="{{ service.work_dir }}" />
         <property name="gridLogger">
@@ -144,10 +138,4 @@
 
         {{ misc_utils.plugins(config) }}
     </bean>
-
-    {{ misc_utils.ext_beans(config) }}
-
-    {% if config.include_event_types | length > 0 %}
-        {{ misc_utils.event_types(config) }}
-    {% endif %}
-</beans>
+{% endmacro %}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
index 1b69ec7534a..4c78f90ca23 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
@@ -60,9 +60,19 @@
         <bean class="{{ bean_object }}">
             {% for name, value in bean_object.properties.items() %}
                 {% if value is string or value is number or value is boolean %}
-                    <property name="{{ name }}" value="{{ value }}"/>
+                    <property name="{{ name  | snake_to_camel }}" value="{{ value }}"/>
+                {% elif value is iterable and value is not mapping %}
+                    <property name="{{ name | snake_to_camel }}">
+                        <list>
+                            {% for v in value %}
+                                <value>{{ v }}</value>
+                            {% endfor %}
+                        </list>
+                    </property>
+                {% elif value.ref | length > 0 %}
+                    <property name="{{ name  | snake_to_camel }}" ref="{{ value.ref }}"/>
                 {% else %}
-                    <property name="{{ name }}">
+                    <property name="{{ name | snake_to_camel }}">
                         {{ bean(value) }}
                     </property>
                 {% endif %}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
index 7ec955b5c1b..527d34f9ae0 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
@@ -15,30 +15,16 @@
  limitations under the License.
 #}
 
-{% import 'ssl_params_macro.j2' as ssl_params_util %}
+{% import 'client_configuration_macro.j2' as client_configuration %}
+{% import 'misc_macro.j2' as misc_utils %}
 
 <?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
-    <bean class="org.apache.ignite.configuration.ClientConfiguration" id="thin.client.cfg">
-        <property name="addresses">
-            <list>
-                <value>{{ config.addresses }}</value>
-            </list>
-        </property>
+       xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="
+       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+       http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+    {{ client_configuration.apply(config, service) }}
 
-        {% if config.username %}
-        <property name="userName" value="{{ config.username }}"/>
-        <property name="userPassword" value="{{ config.password }}"/>
-        {% endif %}
-
-        {% if config.ssl_params %}
-        <property name="sslMode" value="REQUIRED"/>
-        <property name="sslClientCertificateKeyStorePath" value="{{ config.ssl_params.key_store_path }}"/>
-        <property name="sslClientCertificateKeyStorePassword" value="{{ config.ssl_params.key_store_password }}"/>
-        <property name="sslTrustCertificateKeyStorePath" value="{{ config.ssl_params.trust_store_path }}"/>
-        <property name="sslTrustCertificateKeyStorePassword" value="{{ config.ssl_params.trust_store_password }}"/>
-        {% endif %}
-    </bean>
+    {{ misc_utils.ext_beans(config) }}
 </beans>
diff --git a/modules/ducktests/tests/ignitetest/tests/cdc/__init__.py b/modules/ducktests/tests/ignitetest/tests/cdc/__init__.py
new file mode 100644
index 00000000000..34b78cf8898
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/cdc/__init__.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This package contains CDC tests.
+"""
diff --git a/modules/ducktests/tests/ignitetest/tests/cdc/cdc_test.py b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_test.py
new file mode 100644
index 00000000000..4a1cc9b7381
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_test.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+
+from ducktape.mark import parametrize
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.cdc.ignite_cdc import IgniteCdcUtility
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration
+from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration
+from ignitetest.tests.client_test import check_topology
+from ignitetest.utils import cluster, ignite_versions, ignore_if
+from ignitetest.utils.bean import Bean
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import LATEST, IgniteVersion, DEV_BRANCH, V_2_14_0
+
+
+class CdcTest(IgniteTest):
+    """
+    CDC tests.
+    """
+    CACHE_NAME = "cdc-test-cache"
+    JAVA_CLIENT_CLASS_NAME = "org.apache.ignite.internal.ducktest.tests.client_test.IgniteCachePutClient"
+
+    @cluster(num_nodes=5)
+    @ignite_versions(str(DEV_BRANCH), str(LATEST))
+    @ignore_if(lambda version, _: version <= V_2_14_0)
+    @parametrize(num_nodes=5, wal_force_archive_timeout=100, pacing=10, duration_sec=10)
+    def test_cdc_start_stop(self, ignite_version, num_nodes, wal_force_archive_timeout, pacing, duration_sec):
+        """
+        Test for starting and stopping the ignite-cdc.sh utility.
+        """
+        cdc_configuration = Bean("org.apache.ignite.cdc.CdcConfiguration",
+                                 consumer=Bean("org.apache.ignite.internal.ducktest.tests.cdc.CountingCdcConsumer"))
+
+        config = IgniteConfiguration(
+            version=IgniteVersion(ignite_version),
+            data_storage=DataStorageConfiguration(
+                wal_force_archive_timeout=wal_force_archive_timeout,
+                cdc_wal_path="cdc-custom-wal-dir",
+                default=DataRegionConfiguration(
+                    persistence_enabled=True,
+                    cdc_enabled=True
+                )
+            ),
+            ext_beans=[('bean.j2', cdc_configuration)]
+        )
+
+        ignite = IgniteService(self.test_context, config=config, num_nodes=num_nodes-2)
+        ignite.start()
+
+        control_sh = ControlUtility(cluster=ignite)
+        control_sh.activate()
+
+        ignite_cdc = IgniteCdcUtility(ignite)
+        ignite_cdc.start()
+
+        ignite.await_event("CountingCdcConsumer started", timeout_sec=60, from_the_beginning=True,
+                           log_file="ignite-cdc.log")
+
+        client_cfg = config._replace(client_mode=True)
+        client = IgniteApplicationService(self.test_context, client_cfg,
+                                          java_class_name=self.JAVA_CLIENT_CLASS_NAME,
+                                          num_nodes=2,
+                                          params={"cacheName": self.CACHE_NAME, "pacing": pacing})
+
+        client.start()
+
+        check_topology(control_sh, num_nodes)
+
+        ignite.await_event("Consumed", timeout_sec=duration_sec, from_the_beginning=True,
+                           log_file="ignite-cdc.log")
+
+        time.sleep(duration_sec)
+
+        client.stop()
+
+        ignite_cdc.stop()
+
+        ignite.await_event("Stopping Change Data Capture service instance",
+                           timeout_sec=120, from_the_beginning=True,
+                           log_file="ignite-cdc.log")
+
+        ignite.await_event("CountingCdcConsumer stopped",
+                           timeout_sec=1, from_the_beginning=True,
+                           log_file="ignite-cdc.log")
+
+        ignite.stop()
diff --git a/modules/ducktests/tests/ignitetest/tests/control_utility/consistency_test.py b/modules/ducktests/tests/ignitetest/tests/control_utility/consistency_test.py
index de791637368..891ac6e0b0c 100644
--- a/modules/ducktests/tests/ignitetest/tests/control_utility/consistency_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/consistency_test.py
@@ -88,7 +88,7 @@ class ConsistencyTest(IgniteTest):
 
             ignites.exec_command(node, f"sed -i 's/{orig}/{fixed}/g' {cfg_file}")
 
-        ignites.start()
+        ignites.start(clean=False)
 
         control_utility = ControlUtility(ignites)
 
diff --git a/modules/ducktests/tests/ignitetest/tests/snapshot_test.py b/modules/ducktests/tests/ignitetest/tests/snapshot_test.py
index 22f4fd9bdac..0173612ca17 100644
--- a/modules/ducktests/tests/ignitetest/tests/snapshot_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/snapshot_test.py
@@ -87,7 +87,7 @@ class SnapshotTest(IgniteTest):
 
         nodes.stop()
         nodes.restore_from_snapshot(self.SNAPSHOT_NAME)
-        nodes.start()
+        nodes.start(clean=False)
 
         control_utility.activate()
         control_utility.validate_indexes()
diff --git a/modules/ducktests/tests/ignitetest/utils/bean.py b/modules/ducktests/tests/ignitetest/utils/bean.py
index 647019b0f13..f85a661a02f 100644
--- a/modules/ducktests/tests/ignitetest/utils/bean.py
+++ b/modules/ducktests/tests/ignitetest/utils/bean.py
@@ -48,3 +48,13 @@ class Bean:
 
     class_name: str
     properties: {}
+
+
+class BeanRef:
+    """
+    Helper class to represent a property which is a bean reference.
+    """
+    def __init__(self, ref):
+        self.ref = ref
+
+    ref: str