You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by iv...@apache.org on 2023/01/24 12:30:20 UTC
[ignite] branch master updated: IGNITE-18387 [ducktests] Added basic test for the CDC (#10439)
This is an automated email from the ASF dual-hosted git repository.
ivandasch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new e0e2edd5b02 IGNITE-18387 [ducktests] Added basic test for the CDC (#10439)
e0e2edd5b02 is described below
commit e0e2edd5b029701ffbc18d01709da86bcd45405f
Author: Sergey Korotkov <se...@gmail.com>
AuthorDate: Tue Jan 24 19:30:10 2023 +0700
IGNITE-18387 [ducktests] Added basic test for the CDC (#10439)
---
.../ducktest/tests/cdc/CountingCdcConsumer.java | 91 ++++++++++++++++
.../ignitetest/services/utils/cdc/__init__.py | 18 +++
.../ignitetest/services/utils/cdc/ignite_cdc.py | 86 +++++++++++++++
.../ignitetest/services/utils/ducktests_service.py | 2 +-
.../ignitetest/services/utils/ignite_aware.py | 20 +---
.../utils/ignite_configuration/__init__.py | 9 +-
.../utils/ignite_configuration/data_storage.py | 3 +
.../tests/ignitetest/services/utils/ignite_spec.py | 2 +
.../tests/ignitetest/services/utils/jmx_utils.py | 2 +-
.../tests/ignitetest/services/utils/jvm_utils.py | 17 +++
.../ignitetest/services/utils/templates/bean.j2 | 22 ++++
...config.xml.j2 => client_configuration_macro.j2} | 7 +-
.../services/utils/templates/ignite.xml.j2 | 121 +--------------------
...ignite.xml.j2 => ignite_configuration_macro.j2} | 16 +--
.../services/utils/templates/misc_macro.j2 | 14 ++-
.../utils/templates/thin_client_config.xml.j2 | 28 ++---
.../tests/ignitetest/tests/cdc/__init__.py | 18 +++
.../tests/ignitetest/tests/cdc/cdc_test.py | 101 +++++++++++++++++
.../tests/control_utility/consistency_test.py | 2 +-
.../tests/ignitetest/tests/snapshot_test.py | 2 +-
modules/ducktests/tests/ignitetest/utils/bean.py | 10 ++
21 files changed, 408 insertions(+), 183 deletions(-)
diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java
new file mode 100644
index 00000000000..d6b173ae07a
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cdc/CountingCdcConsumer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.cdc;
+
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryType;
+import org.apache.ignite.cdc.CdcCacheEvent;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.CdcEvent;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.resources.LoggerResource;
+
+/**
+ * Simple CDC consumer.
+ *
+ * Just counts the total number of the objects consumed.
+ */
+public class CountingCdcConsumer implements CdcConsumer {
+ /** Logger. */
+ @LoggerResource
+ protected IgniteLogger log;
+
+ /** Total count of the object consumed. */
+ private final AtomicLong objectsConsumed = new AtomicLong();
+
+ /** {@inheritDoc} */
+ @Override public void start(MetricRegistry mreg) {
+ log.info("CountingCdcConsumer started");
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onEvents(Iterator<CdcEvent> events) {
+ events.forEachRemaining(this::consume);
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onTypes(Iterator<BinaryType> types) {
+ types.forEachRemaining(this::consume);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMappings(Iterator<TypeMapping> mappings) {
+ mappings.forEachRemaining(this::consume);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCacheChange(Iterator<CdcCacheEvent> cacheEvents) {
+ cacheEvents.forEachRemaining(this::consume);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onCacheDestroy(Iterator<Integer> caches) {
+ caches.forEachRemaining(this::consume);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop() {
+ log.info("CountingCdcConsumer stopped, objectsConsumed: " + objectsConsumed.get());
+ }
+
+ /**
+ * Consume object by counting it.
+ *
+ * @param object object passed from the CDC engine.
+ */
+ private void consume(Object object) {
+ objectsConsumed.incrementAndGet();
+
+ log.info("Consumed: [object=" + object + "]");
+ }
+}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/cdc/__init__.py b/modules/ducktests/tests/ignitetest/services/utils/cdc/__init__.py
new file mode 100644
index 00000000000..2e0217ffec0
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/__init__.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains utility classes for CDC.
+"""
diff --git a/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py
new file mode 100644
index 00000000000..034c464a566
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/cdc/ignite_cdc.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+
+
+"""
+This module contains Ignite CDC utility (ignite-cdc.sh) wrapper.
+"""
+import signal
+
+from ignitetest.services.utils.ignite_spec import envs_to_exports
+from ignitetest.services.utils.jvm_utils import JvmProcessMixin
+
+
+class IgniteCdcUtility(JvmProcessMixin):
+ """
+ Ignite CDC utility (ignite-cdc.sh) wrapper.
+ """
+ BASE_COMMAND = "ignite-cdc.sh"
+ JAVA_CLASS = "org.apache.ignite.startup.cmdline.CdcCommandLineStartup"
+
+ def __init__(self, cluster):
+ self.cluster = cluster
+ self.logger = cluster.context.logger
+
+ def start(self):
+ """
+ Start ignite-cdc.sh on cluster nodes.
+ """
+ def __start(node):
+ self.logger.info(f"{self.__service_node_id(node)}: starting {self.BASE_COMMAND}")
+
+ raw_output = node.account.ssh_capture(
+ self.__form_cmd(f"{self.BASE_COMMAND} -v {self.cluster.config_file}"))
+
+ code, _ = self.__parse_output(raw_output)
+
+ self.logger.debug(f"{self.__service_node_id(node)}: {self.BASE_COMMAND} finished with exit code: {code}")
+
+ self.logger.info(f"{self.cluster.service_id}: starting {self.BASE_COMMAND} ...")
+
+ self.cluster.exec_on_nodes_async(self.cluster.nodes, __start, timeout_sec=1)
+
+ def stop(self, force_stop=False):
+ """
+ Stop ignite-cdc.sh on cluster nodes.
+ """
+ def __stop(node):
+ self.logger.info(f"{self.__service_node_id(node)}: stopping {self.BASE_COMMAND}")
+
+ pids = self.pids(node, self.JAVA_CLASS)
+
+ for pid in pids:
+ node.account.signal(pid, signal.SIGKILL if force_stop else signal.SIGTERM, allow_fail=False)
+
+ self.logger.info(f"{self.cluster.service_id}: stopping {self.BASE_COMMAND} ...")
+
+ self.cluster.exec_on_nodes_async(self.cluster.nodes, __stop)
+
+ def __service_node_id(self, node):
+ return f"{self.cluster.service_id} node {self.cluster.idx(node)} on {node.account.hostname}"
+
+ def __form_cmd(self, cmd):
+ envs = self.cluster.spec.envs()
+
+ envs["CDC_JVM_OPTS"] = f"\"{' '.join(self.cluster.spec.jvm_opts)}\""
+
+ return f"{envs_to_exports(envs)} bash " + self.cluster.script(cmd)
+
+ @staticmethod
+ def __parse_output(raw_output):
+ exit_code = raw_output.channel_file.channel.recv_exit_status()
+ output = "".join(raw_output)
+
+ return exit_code, output
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ducktests_service.py b/modules/ducktests/tests/ignitetest/services/utils/ducktests_service.py
index 5da4b07b039..115df34902d 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ducktests_service.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ducktests_service.py
@@ -53,4 +53,4 @@ class DucktestsService(Service, metaclass=ABCMeta):
self.stop(force_stop=True)
def clean_node(self, node, **kwargs):
- assert self.stopped
+ pass
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
index 4fe44b29e00..ef5d05b4582 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_aware.py
@@ -38,12 +38,13 @@ from ignitetest.services.utils.background_thread import BackgroundThreadService
from ignitetest.services.utils.concurrent import CountDownLatch, AtomicValue
from ignitetest.services.utils.ignite_spec import resolve_spec, SHARED_PREPARED_FILE
from ignitetest.services.utils.jmx_utils import ignite_jmx_mixin, JmxClient
+from ignitetest.services.utils.jvm_utils import JvmProcessMixin
from ignitetest.services.utils.log_utils import monitor_log
from ignitetest.services.utils.path import IgnitePathAware
from ignitetest.utils.enum import constructible
-class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABCMeta):
+class IgniteAwareService(BackgroundThreadService, IgnitePathAware, JvmProcessMixin, metaclass=ABCMeta):
"""
The base class to build services aware of Ignite.
"""
@@ -150,7 +151,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
(str(node.account), self.shutdown_timeout_sec))
def stop_node(self, node, force_stop=False, **kwargs):
- pids = self.pids(node)
+ pids = self.pids(node, self.main_java_class)
for pid in pids:
node.account.signal(pid, signal.SIGKILL if force_stop else signal.SIGTERM, allow_fail=False)
@@ -221,15 +222,6 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
setattr(node, "consistent_id", node.account.externally_routable_ip)
- def pids(self, node):
- """
- :param node: Ignite service node.
- :return: List of service's pids.
- """
- cmd = "pgrep -ax java | awk '/%s/ {print $1}'" % self.main_java_class
-
- return [int(pid) for pid in node.account.ssh_capture(cmd, allow_fail=True)]
-
def worker(self, idx, node, **kwargs):
cmd = self.spec.command(node)
@@ -242,7 +234,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
:param node: Ignite service node.
:return: True if node is alive.
"""
- return len(self.pids(node)) > 0
+ return len(self.pids(node, self.main_java_class)) > 0
def await_event_on_node(self, evt_message, node, timeout_sec, from_the_beginning=False, backoff_sec=.1,
log_file=None):
@@ -525,7 +517,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
Generate thread dump on node.
:param node: Ignite service node.
"""
- for pid in self.pids(node):
+ for pid in self.pids(node, self.main_java_class):
try:
node.account.signal(pid, signal.SIGQUIT, allow_fail=True)
except RemoteCommandError:
@@ -550,7 +542,7 @@ class IgniteAwareService(BackgroundThreadService, IgnitePathAware, metaclass=ABC
snapshot_db = os.path.join(self.snapshots_dir, snapshot_name, "db")
for node in self.nodes:
- assert len(self.pids(node)) == 0
+ assert len(self.pids(node, self.main_java_class)) == 0
node.account.ssh(f'rm -rf {self.database_dir}', allow_fail=False)
node.account.ssh(f'cp -r {snapshot_db} {self.work_dir}', allow_fail=False)
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
index 0f8128c7840..a9c84f4d295 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/__init__.py
@@ -73,7 +73,7 @@ class IgniteConfiguration(NamedTuple):
auto_activation_enabled: bool = None
transaction_configuration: TransactionConfiguration = None
- def __prepare_ssl(self, test_globals, shared_root):
+ def prepare_ssl(self, test_globals, shared_root):
"""
Updates ssl configuration from globals.
"""
@@ -112,7 +112,7 @@ class IgniteConfiguration(NamedTuple):
"""
Updates configuration based on current environment.
"""
- return self.__prepare_ssl(cluster.globals, cluster.shared_root).__prepare_discovery(cluster, node)
+ return self.__prepare_discovery(cluster, node)
@property
def service_type(self):
@@ -138,8 +138,9 @@ class IgniteThinClientConfiguration(NamedTuple):
ssl_params: SslParams = None
username: str = None
password: str = None
+ ext_beans: list = []
- def __prepare_ssl(self, test_globals, shared_root):
+ def prepare_ssl(self, test_globals, shared_root):
"""
Updates ssl configuration from globals.
"""
@@ -154,7 +155,7 @@ class IgniteThinClientConfiguration(NamedTuple):
"""
Updates configuration based on current environment.
"""
- return self.__prepare_ssl(cluster.globals, cluster.shared_root)
+ return self
@property
def service_type(self):
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py
index d797035a277..0f0811bdf77 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_configuration/data_storage.py
@@ -24,6 +24,7 @@ class DataRegionConfiguration(NamedTuple):
"""
Ignite DataRegion Configuration
"""
+ cdc_enabled: bool = False
name: str = "default"
persistence_enabled: bool = False
initial_size: int = 100 * 1024 * 1024
@@ -36,6 +37,7 @@ class DataStorageConfiguration(NamedTuple):
"""
Ignite DataStorage configuration
"""
+ cdc_wal_path: str = None
checkpoint_frequency: int = None
default: DataRegionConfiguration = DataRegionConfiguration()
max_wal_archive_size: int = None
@@ -44,6 +46,7 @@ class DataStorageConfiguration(NamedTuple):
regions: list = []
wal_buffer_size: int = None
wal_compaction_enabled: bool = None
+ wal_force_archive_timeout: int = None
wal_history_size: int = None
wal_mode: str = None
wal_segment_size: int = None
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 2921e597422..b82551491dc 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -158,6 +158,8 @@ class IgniteSpec(metaclass=ABCMeta):
is_jmx_metrics_enabled(self.service)):
config = config._replace(ignite_instance_name=self._test_id)
+ config = config.prepare_ssl(self.service.globals, self.service.shared_root)
+
return config
@property
diff --git a/modules/ducktests/tests/ignitetest/services/utils/jmx_utils.py b/modules/ducktests/tests/ignitetest/services/utils/jmx_utils.py
index 61e50bcaadc..462e513ac02 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/jmx_utils.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/jmx_utils.py
@@ -29,7 +29,7 @@ def ignite_jmx_mixin(node, service):
:param node: Ignite service node.
:param service: Ignite service.
"""
- setattr(node, 'pids', service.pids(node))
+ setattr(node, 'pids', service.pids(node, service.main_java_class))
setattr(node, 'install_root', service.install_root)
base_cls = node.__class__
base_cls_name = node.__class__.__name__
diff --git a/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py b/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py
index 51ea81b5aea..0fccd871a75 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/jvm_utils.py
@@ -96,3 +96,20 @@ def _remove_duplicates(params: dict):
del params[param_key]
else:
duplicates[dup_key] = True
+
+
+class JvmProcessMixin:
+ """
+ Mixin to work with JVM processes
+ """
+ @staticmethod
+ def pids(node, java_class):
+ """
+ Return pids of jvm processes running this java class on service node.
+ :param node: Service node.
+ :param java_class: Java class name
+ :return: List of service's pids.
+ """
+ cmd = "pgrep -ax java | awk '/%s/ {print $1}'" % java_class
+
+ return [int(pid) for pid in node.account.ssh_capture(cmd, allow_fail=True)]
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/bean.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/bean.j2
new file mode 100644
index 00000000000..d2e573a2621
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/bean.j2
@@ -0,0 +1,22 @@
+{#
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+#}
+
+{% import 'misc_macro.j2' as misc_utils %}
+
+{% macro apply(config, bean_object) %}
+ {{ misc_utils.bean(bean_object) }}
+{% endmacro %}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2
similarity index 85%
copy from modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
copy to modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2
index 7ec955b5c1b..7e034bfed9f 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/client_configuration_macro.j2
@@ -17,10 +17,7 @@
{% import 'ssl_params_macro.j2' as ssl_params_util %}
-<?xml version="1.0" encoding="UTF-8"?>
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
+{% macro apply(config, service) %}
<bean class="org.apache.ignite.configuration.ClientConfiguration" id="thin.client.cfg">
<property name="addresses">
<list>
@@ -41,4 +38,4 @@
<property name="sslTrustCertificateKeyStorePassword" value="{{ config.ssl_params.trust_store_password }}"/>
{% endif %}
</bean>
-</beans>
+{% endmacro %}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
index a63fcc6ff3b..847694b3579 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
@@ -17,133 +17,16 @@
limitations under the License.
#}
-{% import 'communication_macro.j2' as communication %}
-{% import 'discovery_macro.j2' as disco_utils %}
-{% import 'cache_macro.j2' as cache_utils %}
-{% import 'datastorage_macro.j2' as datastorage_utils %}
+{% import 'ignite_configuration_macro.j2' as ignite_configuration %}
{% import 'misc_macro.j2' as misc_utils %}
-{% import 'ssl_params_macro.j2' as ssl_params_util %}
-{% import 'connector_configuration.j2' as connector_configuration_util %}
-{% import 'client_connector_configuration.j2' as client_connector_configuration_util %}
-{% import 'transaction_macro.j2' as transaction_util %}
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
- <bean class="org.apache.ignite.configuration.IgniteConfiguration">
- <property name="workDirectory" value="{{ service.work_dir }}" />
- <property name="gridLogger">
- <bean class="org.apache.ignite.logger.log4j2.Log4J2Logger">
- <constructor-arg type="java.lang.String" value="{{ service.config_dir }}/{{ config.log4j_config }}"/>
- </bean>
- </property>
- <property name="clientMode" value="{{ config.client_mode or False | lower }}"/>
- <property name="consistentId" value="{{ config.consistent_id }}"/>
- <property name="igniteInstanceName" value="{{ config.ignite_instance_name }}"/>
- <property name="failureDetectionTimeout" value="{{ config.failure_detection_timeout }}"/>
- <property name="systemWorkerBlockedTimeout" value="{{ config.sys_worker_blocked_timeout }}"/>
- <property name="peerClassLoadingEnabled" value="{{ config.peer_class_loading_enabled }}"/>
- <property name="authenticationEnabled" value="{{ config.auth_enabled | lower }}"/>
- {% if config.rebalance_thread_pool_size %}
- <property name="rebalanceThreadPoolSize" value="{{ config.rebalance_thread_pool_size }}"/>
- {% endif %}
- {% if config.rebalance_batch_size %}
- <property name="rebalanceBatchSize" value="{{ config.rebalance_batch_size }}"/>
- {% endif %}
- {% if config.rebalance_batches_prefetch_count %}
- <property name="rebalanceBatchesPrefetchCount" value="{{ config.rebalance_batches_prefetch_count }}"/>
- {% endif %}
- {% if config.rebalance_throttle %}
- <property name="rebalanceThrottle" value="{{ config.rebalance_throttle }}"/>
- {% endif %}
-
- {% if config.metrics_log_frequency %}
- <property name="metricsLogFrequency" value="{{ config.metrics_log_frequency }}"/>
- {% endif %}
-
- {% if config.metrics_update_frequency %}
- <property name="metricsUpdateFrequency" value="{{ config.metrics_update_frequency }}"/>
- {% endif %}
-
- {% if config.auto_activation_enabled %}
- <property name="autoActivationEnabled" value="{{ config.auto_activation_enabled }}"/>
- {% endif %}
-
- {% if config.metric_exporters | length > 0 %}
- <property name="metricExporterSpi">
- <list>
- {% for exporter in config.metric_exporters %}
- {{ misc_utils.bean(exporter) }}
- {% endfor %}
- </list>
- </property>
- {% endif %}
-
- {{ misc_utils.cluster_state(config.cluster_state, config.version) }}
-
- {{ communication.communication_spi(config.communication_spi) }}
-
- {{ disco_utils.discovery_spi(config.discovery_spi) }}
-
- {{ datastorage_utils.data_storage(config.data_storage) }}
-
- {{ cache_utils.cache_configs(config.caches) }}
-
- {% if config.local_host %}
- <property name="localHost" value="{{ config.local_host }}"/>
- {% endif %}
-
- {% if config.properties %}
- {{ config.properties }}
- {% endif %}
-
- {% if config.ssl_params %}
- <property name="sslContextFactory">
- {{ ssl_params_util.ssl_params(config.ssl_params) }}
- </property>
- {% endif %}
-
- {{ connector_configuration_util.connector_configuration(config.connector_configuration) }}
-
- {{ client_connector_configuration_util.client_connector_configuration(config.client_connector_configuration) }}
-
- {{ transaction_util.transaction(config.transaction_configuration) }}
-
- {% if config.binary_configuration %}
- <property name="binaryConfiguration">
- <bean class="org.apache.ignite.configuration.BinaryConfiguration">
- <property name="compactFooter" value="{{ config.binary_configuration.compact_footer }}"/>
- </bean>
- </property>
- {% endif %}
-
- {% if config.local_event_listeners %}
- <property name="localEventListeners" ref="{{ config.local_event_listeners }}"/>
- {% endif %}
-
- {% if config.include_event_types | length > 0 %}
- <property name="includeEventTypes" ref="eventTypes"/>
- {% endif %}
-
- {% if config.event_storage_spi %}
- <property name="eventStorageSpi" ref="{{ config.event_storage_spi }}"/>
- {% endif %}
-
- {% if config.sql_schemas | length > 0 %}
- <property name="sqlSchemas">
- <list>
- {% for schema in config.sql_schemas %}
- <value>{{ schema }}</value>
- {% endfor %}
- </list>
- </property>
- {% endif %}
-
- {{ misc_utils.plugins(config) }}
- </bean>
+ {{ ignite_configuration.apply(config, service) }}
{{ misc_utils.ext_beans(config) }}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2
similarity index 90%
copy from modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
copy to modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2
index a63fcc6ff3b..d5f7b88012c 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/ignite.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/ignite_configuration_macro.j2
@@ -1,5 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
{#
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@@ -27,11 +25,7 @@
{% import 'client_connector_configuration.j2' as client_connector_configuration_util %}
{% import 'transaction_macro.j2' as transaction_util %}
-<beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="
- http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
- http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+{% macro apply(config, service) %}
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="workDirectory" value="{{ service.work_dir }}" />
<property name="gridLogger">
@@ -144,10 +138,4 @@
{{ misc_utils.plugins(config) }}
</bean>
-
- {{ misc_utils.ext_beans(config) }}
-
- {% if config.include_event_types | length > 0 %}
- {{ misc_utils.event_types(config) }}
- {% endif %}
-</beans>
+{% endmacro %}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
index 1b69ec7534a..4c78f90ca23 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/misc_macro.j2
@@ -60,9 +60,19 @@
<bean class="{{ bean_object }}">
{% for name, value in bean_object.properties.items() %}
{% if value is string or value is number or value is boolean %}
- <property name="{{ name }}" value="{{ value }}"/>
+ <property name="{{ name | snake_to_camel }}" value="{{ value }}"/>
+ {% elif value is iterable and value is not mapping %}
+ <property name="{{ name | snake_to_camel }}">
+ <list>
+ {% for v in value %}
+ <value>{{ v }}</value>
+ {% endfor %}
+ </list>
+ </property>
+ {% elif value.ref | length > 0 %}
+ <property name="{{ name | snake_to_camel }}" ref="{{ value.ref }}"/>
{% else %}
- <property name="{{ name }}">
+ <property name="{{ name | snake_to_camel }}">
{{ bean(value) }}
</property>
{% endif %}
diff --git a/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2 b/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
index 7ec955b5c1b..527d34f9ae0 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
+++ b/modules/ducktests/tests/ignitetest/services/utils/templates/thin_client_config.xml.j2
@@ -15,30 +15,16 @@
limitations under the License.
#}
-{% import 'ssl_params_macro.j2' as ssl_params_util %}
+{% import 'client_configuration_macro.j2' as client_configuration %}
+{% import 'misc_macro.j2' as misc_utils %}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
- <bean class="org.apache.ignite.configuration.ClientConfiguration" id="thin.client.cfg">
- <property name="addresses">
- <list>
- <value>{{ config.addresses }}</value>
- </list>
- </property>
+ xmlns:util="http://www.springframework.org/schema/util" xsi:schemaLocation="
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+ {{ client_configuration.apply(config, service) }}
- {% if config.username %}
- <property name="userName" value="{{ config.username }}"/>
- <property name="userPassword" value="{{ config.password }}"/>
- {% endif %}
-
- {% if config.ssl_params %}
- <property name="sslMode" value="REQUIRED"/>
- <property name="sslClientCertificateKeyStorePath" value="{{ config.ssl_params.key_store_path }}"/>
- <property name="sslClientCertificateKeyStorePassword" value="{{ config.ssl_params.key_store_password }}"/>
- <property name="sslTrustCertificateKeyStorePath" value="{{ config.ssl_params.trust_store_path }}"/>
- <property name="sslTrustCertificateKeyStorePassword" value="{{ config.ssl_params.trust_store_password }}"/>
- {% endif %}
- </bean>
+ {{ misc_utils.ext_beans(config) }}
</beans>
diff --git a/modules/ducktests/tests/ignitetest/tests/cdc/__init__.py b/modules/ducktests/tests/ignitetest/tests/cdc/__init__.py
new file mode 100644
index 00000000000..34b78cf8898
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/cdc/__init__.py
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This package contains CDC tests.
+"""
diff --git a/modules/ducktests/tests/ignitetest/tests/cdc/cdc_test.py b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_test.py
new file mode 100644
index 00000000000..4a1cc9b7381
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/cdc/cdc_test.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License
+import time
+
+from ducktape.mark import parametrize
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.cdc.ignite_cdc import IgniteCdcUtility
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, DataStorageConfiguration
+from ignitetest.services.utils.ignite_configuration.data_storage import DataRegionConfiguration
+from ignitetest.tests.client_test import check_topology
+from ignitetest.utils import cluster, ignite_versions, ignore_if
+from ignitetest.utils.bean import Bean
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import LATEST, IgniteVersion, DEV_BRANCH, V_2_14_0
+
+
+class CdcTest(IgniteTest):
+ """
+ CDC tests.
+ """
+ CACHE_NAME = "cdc-test-cache"
+ JAVA_CLIENT_CLASS_NAME = "org.apache.ignite.internal.ducktest.tests.client_test.IgniteCachePutClient"
+
+ @cluster(num_nodes=5)
+ @ignite_versions(str(DEV_BRANCH), str(LATEST))
+ @ignore_if(lambda version, _: version <= V_2_14_0)
+ @parametrize(num_nodes=5, wal_force_archive_timeout=100, pacing=10, duration_sec=10)
+ def test_cdc_start_stop(self, ignite_version, num_nodes, wal_force_archive_timeout, pacing, duration_sec):
+ """
+ Test for starting and stopping the ignite-cdc.sh
+ """
+ cdc_configuration = Bean("org.apache.ignite.cdc.CdcConfiguration",
+ consumer=Bean("org.apache.ignite.internal.ducktest.tests.cdc.CountingCdcConsumer"))
+
+ config = IgniteConfiguration(
+ version=IgniteVersion(ignite_version),
+ data_storage=DataStorageConfiguration(
+ wal_force_archive_timeout=wal_force_archive_timeout,
+ cdc_wal_path="cdc-custom-wal-dir",
+ default=DataRegionConfiguration(
+ persistence_enabled=True,
+ cdc_enabled=True
+ )
+ ),
+ ext_beans=[('bean.j2', cdc_configuration)]
+ )
+
+ ignite = IgniteService(self.test_context, config=config, num_nodes=num_nodes-2)
+ ignite.start()
+
+ control_sh = ControlUtility(cluster=ignite)
+ control_sh.activate()
+
+ ignite_cdc = IgniteCdcUtility(ignite)
+ ignite_cdc.start()
+
+ ignite.await_event("CountingCdcConsumer started", timeout_sec=60, from_the_beginning=True,
+ log_file="ignite-cdc.log")
+
+ client_cfg = config._replace(client_mode=True)
+ client = IgniteApplicationService(self.test_context, client_cfg,
+ java_class_name=self.JAVA_CLIENT_CLASS_NAME,
+ num_nodes=2,
+ params={"cacheName": self.CACHE_NAME, "pacing": pacing})
+
+ client.start()
+
+ check_topology(control_sh, num_nodes)
+
+ ignite.await_event("Consumed", timeout_sec=duration_sec, from_the_beginning=True,
+ log_file="ignite-cdc.log")
+
+ time.sleep(duration_sec)
+
+ client.stop()
+
+ ignite_cdc.stop()
+
+ ignite.await_event("Stopping Change Data Capture service instance",
+ timeout_sec=120, from_the_beginning=True,
+ log_file="ignite-cdc.log")
+
+ ignite.await_event("CountingCdcConsumer stopped",
+ timeout_sec=1, from_the_beginning=True,
+ log_file="ignite-cdc.log")
+
+ ignite.stop()
diff --git a/modules/ducktests/tests/ignitetest/tests/control_utility/consistency_test.py b/modules/ducktests/tests/ignitetest/tests/control_utility/consistency_test.py
index de791637368..891ac6e0b0c 100644
--- a/modules/ducktests/tests/ignitetest/tests/control_utility/consistency_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/consistency_test.py
@@ -88,7 +88,7 @@ class ConsistencyTest(IgniteTest):
ignites.exec_command(node, f"sed -i 's/{orig}/{fixed}/g' {cfg_file}")
- ignites.start()
+ ignites.start(clean=False)
control_utility = ControlUtility(ignites)
diff --git a/modules/ducktests/tests/ignitetest/tests/snapshot_test.py b/modules/ducktests/tests/ignitetest/tests/snapshot_test.py
index 22f4fd9bdac..0173612ca17 100644
--- a/modules/ducktests/tests/ignitetest/tests/snapshot_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/snapshot_test.py
@@ -87,7 +87,7 @@ class SnapshotTest(IgniteTest):
nodes.stop()
nodes.restore_from_snapshot(self.SNAPSHOT_NAME)
- nodes.start()
+ nodes.start(clean=False)
control_utility.activate()
control_utility.validate_indexes()
diff --git a/modules/ducktests/tests/ignitetest/utils/bean.py b/modules/ducktests/tests/ignitetest/utils/bean.py
index 647019b0f13..f85a661a02f 100644
--- a/modules/ducktests/tests/ignitetest/utils/bean.py
+++ b/modules/ducktests/tests/ignitetest/utils/bean.py
@@ -48,3 +48,13 @@ class Bean:
class_name: str
properties: {}
+
+
+class BeanRef:
+ """
+ Helper class to represent property which is a bean reference.
+ """
+ def __init__(self, ref):
+ self.ref = ref
+
+ ref: str