You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/10/07 12:27:18 UTC

[ignite-extensions] branch master updated: IGNITE-14355 CDC Consumer metrics added (#73)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 8053e3d  IGNITE-14355 CDC Consumer metrics added (#73)
8053e3d is described below

commit 8053e3d8844deff2a40a54339171f90ffb16834f
Author: Nikolay <ni...@apache.org>
AuthorDate: Thu Oct 7 15:27:12 2021 +0300

    IGNITE-14355 CDC Consumer metrics added (#73)
---
 modules/cdc-ext/pom.xml                            |  1 -
 .../org/apache/ignite/cdc/CdcEventsApplier.java    | 32 +++++----
 .../ignite/cdc/IgniteToIgniteCdcStreamer.java      | 36 ++++++++--
 .../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 56 +++++++++++----
 .../apache/ignite/cdc/AbstractReplicationTest.java | 83 ++++++++++++++++++++++
 .../cdc/CdcIgniteToIgniteReplicationTest.java      | 15 +++-
 .../cdc/kafka/CdcKafkaReplicationAppsTest.java     |  5 ++
 .../ignite/cdc/kafka/CdcKafkaReplicationTest.java  | 17 ++++-
 8 files changed, 213 insertions(+), 32 deletions(-)

diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml
index f85b1b8..151acaa 100644
--- a/modules/cdc-ext/pom.xml
+++ b/modules/cdc-ext/pom.xml
@@ -32,7 +32,6 @@
 
     <properties>
         <kafka.version>2.7.0</kafka.version>
-        <test.containers.version>1.15.1</test.containers.version>
         <slf4j.version>1.7.30</slf4j.version>
     </properties>
 
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 9137f2a..777b992 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -19,7 +19,6 @@ package org.apache.ignite.cdc;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
@@ -58,9 +57,6 @@ public abstract class CdcEventsApplier {
     /** */
     private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
 
-    /** */
-    protected final AtomicLong evtsApplied = new AtomicLong();
-
     /**
      * @param maxBatchSize Maximum batch size.
      */
@@ -70,11 +66,14 @@ public abstract class CdcEventsApplier {
 
     /**
      * @param evts Events to process.
+     * @return Number of applied events.
      * @throws IgniteCheckedException If failed.
      */
-    protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
+    protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
         IgniteInternalCache<BinaryObject, BinaryObject> currCache = null;
 
+        int evtsApplied = 0;
+
         for (CdcEvent evt : evts) {
             if (log().isDebugEnabled())
                 log().debug("Event received [key=" + evt.key() + ']');
@@ -93,7 +92,7 @@ public abstract class CdcEventsApplier {
             });
 
             if (cache != currCache) {
-                applyIf(currCache, hasUpdates, hasRemoves);
+                evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);
 
                 currCache = cache;
             }
@@ -103,7 +102,7 @@ public abstract class CdcEventsApplier {
             KeyCacheObject key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
 
             if (evt.value() != null) {
-                applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
+                evtsApplied += applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
 
                 CacheObject val;
 
@@ -116,17 +115,17 @@ public abstract class CdcEventsApplier {
                     new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
             }
             else {
-                applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
+                evtsApplied += applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
 
                 rmvBatch.put(key,
                     new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()));
             }
-
-            evtsApplied.incrementAndGet();
         }
 
         if (currCache != null)
-            applyIf(currCache, hasUpdates, hasRemoves);
+            evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);
+
+        return evtsApplied;
     }
 
     /**
@@ -135,19 +134,24 @@ public abstract class CdcEventsApplier {
      * @param cache Current cache.
      * @param applyUpd Apply update batch flag supplier.
      * @param applyRmv Apply remove batch flag supplier.
+     * @return Number of applied events.
      * @throws IgniteCheckedException In case of error.
      */
-    private void applyIf(
+    private int applyIf(
         IgniteInternalCache<BinaryObject, BinaryObject> cache,
         BooleanSupplier applyUpd,
         BooleanSupplier applyRmv
     ) throws IgniteCheckedException {
+        int evtsApplied = 0;
+
         if (applyUpd.getAsBoolean()) {
             if (log().isDebugEnabled())
                 log().debug("Applying put batch [cache=" + cache.name() + ']');
 
             cache.putAllConflict(updBatch);
 
+            evtsApplied += updBatch.size();
+
             updBatch.clear();
         }
 
@@ -157,8 +161,12 @@ public abstract class CdcEventsApplier {
 
             cache.removeAllConflict(rmvBatch);
 
+            evtsApplied += rmvBatch.size();
+
             rmvBatch.clear();
         }
+
+        return evtsApplied;
     }
 
     /** @return {@code True} if update batch should be applied. */
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
index b8840bb..f53c66a 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -29,6 +29,8 @@ import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.resources.LoggerResource;
@@ -48,6 +50,18 @@ import org.apache.ignite.resources.LoggerResource;
  * @see CacheVersionConflictResolverImpl
  */
 public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
+    /** */
+    public static final String EVTS_CNT = "EventsCount";
+
+    /** */
+    public static final String EVTS_CNT_DESC = "Count of messages applied to destination cluster";
+
+    /** */
+    public static final String LAST_EVT_TIME = "LastEventTime";
+
+    /** */
+    public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";
+
     /** Destination cluster client configuration. */
     private final IgniteConfiguration destIgniteCfg;
 
@@ -57,6 +71,12 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
     /** Destination Ignite cluster client */
     private IgniteEx dest;
 
+    /** Timestamp of last sent message. */
+    private AtomicLongMetric lastEvtTs;
+
+    /** Count of events applied to destination cluster. */
+    protected AtomicLongMetric evtsCnt;
+
     /** Logger. */
     @LoggerResource
     private IgniteLogger log;
@@ -83,17 +103,20 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
     }
 
     /** {@inheritDoc} */
-    @Override public void start() {
+    @Override public void start(MetricRegistry mreg) {
         if (log.isInfoEnabled())
             log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
 
         dest = (IgniteEx)Ignition.start(destIgniteCfg);
+
+        this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
+        this.lastEvtTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
     }
 
     /** {@inheritDoc} */
     @Override public boolean onEvents(Iterator<CdcEvent> evts) {
         try {
-            apply(() -> F.iterator(
+            long msgsSnt = apply(() -> F.iterator(
                 evts,
                 F.identity(),
                 true,
@@ -101,8 +124,13 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
                 evt -> F.isEmpty(cachesIds) || cachesIds.contains(evt.cacheId()),
                 evt -> evt.version().otherClusterVersion() == null));
 
-            if (log.isInfoEnabled())
-                log.info("Events applied [evtsApplied=" + evtsApplied.get() + ']');
+            if (msgsSnt > 0) {
+                evtsCnt.add(msgsSnt);
+                lastEvtTs.value(System.currentTimeMillis());
+
+                if (log.isInfoEnabled())
+                    log.info("Events applied [evtsApplied=" + evtsCnt.value() + ']');
+            }
 
             return true;
         }
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index 3f97ee8..af96169 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -32,6 +32,8 @@ import org.apache.ignite.cdc.CdcConsumer;
 import org.apache.ignite.cdc.CdcEvent;
 import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.resources.LoggerResource;
@@ -41,6 +43,10 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 
+import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT;
+import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT_DESC;
+import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME;
+import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME_DESC;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
 
@@ -64,6 +70,12 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
     /** Default kafka request timeout in seconds. */
     public static final int DFLT_REQ_TIMEOUT = 5;
 
+    /** Bytes sent metric name. */
+    public static final String BYTES_SENT = "BytesSent";
+
+    /** Bytes sent metric description. */
+    public static final String BYTES_SENT_DESCRIPTION = "Count of bytes sent.";
+
     /** Log. */
     @LoggerResource
     private IgniteLogger log;
@@ -89,8 +101,14 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
     /** Max batch size. */
     private final int maxBatchSize;
 
+    /** Timestamp of last sent message. */
+    private AtomicLongMetric lastMsgTs;
+
+    /** Count of bytes sent to the Kafka. */
+    private AtomicLongMetric bytesSnt;
+
     /** Count of sent messages.  */
-    private long msgCnt;
+    private AtomicLongMetric msgsSnt;
 
     /**
      * @param topic Topic name.
@@ -158,35 +176,43 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
                 continue;
             }
 
-            msgCnt++;
+            byte[] bytes = IgniteUtils.toBytes(evt);
+
+            bytesSnt.add(bytes.length);
 
             futs.add(producer.send(new ProducerRecord<>(
                 topic,
                 evt.partition() % kafkaParts,
                 evt.cacheId(),
-                IgniteUtils.toBytes(evt)
+                bytes
             )));
 
             if (log.isDebugEnabled())
                 log.debug("Event sent asynchronously [evt=" + evt + ']');
         }
 
-        try {
-            for (Future<RecordMetadata> fut : futs)
-                fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
-        }
-        catch (InterruptedException | ExecutionException | TimeoutException e) {
-            throw new RuntimeException(e);
-        }
+        if (!futs.isEmpty()) {
+            try {
+                for (Future<RecordMetadata> fut : futs)
+                    fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
+
+                msgsSnt.add(futs.size());
+
+                lastMsgTs.value(System.currentTimeMillis());
+            }
+            catch (InterruptedException | ExecutionException | TimeoutException e) {
+                throw new RuntimeException(e);
+            }
 
-        if (log.isInfoEnabled())
-            log.info("Events processed [sentMessagesCount=" + msgCnt + ']');
+            if (log.isInfoEnabled())
+                log.info("Events processed [sentMessagesCount=" + msgsSnt.value() + ']');
+        }
 
         return true;
     }
 
     /** {@inheritDoc} */
-    @Override public void start() {
+    @Override public void start(MetricRegistry mreg) {
         try {
             producer = new KafkaProducer<>(kafkaProps);
 
@@ -196,6 +222,10 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
         catch (Exception e) {
             throw new RuntimeException(e);
         }
+
+        this.msgsSnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
+        this.lastMsgTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
+        this.bytesSnt = mreg.longMetric(BYTES_SENT, BYTES_SENT_DESCRIPTION);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 7e46dc3..6db6eb3 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.cdc;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -26,7 +27,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.function.Function;
 import java.util.stream.IntStream;
+import javax.management.DynamicMBean;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
 import org.apache.ignite.cache.CacheAtomicityMode;
 import org.apache.ignite.cache.CacheMode;
 import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -38,11 +41,15 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
 import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteBiTuple;
 import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
 import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.ObjectMetric;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -53,7 +60,18 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
 import static org.apache.ignite.cache.CacheMode.PARTITIONED;
 import static org.apache.ignite.cache.CacheMode.REPLICATED;
 import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.cdc.CdcMain.BINARY_META_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.CDC_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_OFFSET;
+import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.LAST_SEG_CONSUMPTION_TIME;
+import static org.apache.ignite.internal.cdc.CdcMain.MARSHALLER_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.cdcInstanceName;
+import static org.apache.ignite.internal.cdc.WalRecordsConsumer.EVTS_CNT;
+import static org.apache.ignite.internal.cdc.WalRecordsConsumer.LAST_EVT_TIME;
 import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
 
@@ -137,6 +155,9 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
     /** */
     private byte clusterId = SRC_CLUSTER_ID;
 
+    /** */
+    protected final List<CdcMain> cdcs = Collections.synchronizedList(new ArrayList<>());
+
     /** {@inheritDoc} */
     @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
@@ -179,6 +200,8 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
     @Override protected void beforeTest() throws Exception {
         cleanPersistenceDir();
 
+        cdcs.clear();
+
         IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> cluster = setupCluster("source", "src-cluster-client", 0);
 
         srcCluster = cluster.get1();
@@ -249,10 +272,14 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
 
             waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
 
+            checkMetrics();
+
             IntStream.range(0, KEYS_CNT).forEach(srcCache::remove);
 
             waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs);
 
+            checkMetrics();
+
             assertFalse(destCluster[0].cacheNames().contains(IGNORED_CACHE));
         }
         finally {
@@ -287,6 +314,7 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
 
             assertTrue(waitForCondition(waitForTblSz.apply(KEYS_CNT), getTestTimeout()));
 
+            checkMetrics();
 
             List<List<?>> data = executeSql(destCluster[0], "SELECT ID, NAME FROM T1 ORDER BY ID");
 
@@ -298,6 +326,8 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
             executeSql(srcCluster[0], deleteQry);
 
             assertTrue(waitForCondition(waitForTblSz.apply(0), getTestTimeout()));
+
+            checkMetrics();
         }
         finally {
             for (IgniteInternalFuture<?> fut : futs)
@@ -408,4 +438,57 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
 
     /** */
     protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdc();
+
+    /** */
+    protected abstract void checkConsumerMetrics(Function<String, Long> longMetric);
+
+    /** */
+    protected void checkMetrics() {
+        for (int i = 0; i < cdcs.size(); i++) {
+            IgniteConfiguration cfg = getFieldValue(cdcs.get(i), "igniteCfg");
+
+            MetricRegistry mreg = getFieldValue(cdcs.get(i), "mreg");
+
+            assertNotNull(mreg);
+
+            checkMetrics(
+                m -> mreg.<LongMetric>findMetric(m).value(),
+                m -> mreg.<ObjectMetric<String>>findMetric(m).value()
+            );
+
+            Function<DynamicMBean, Function<String, ?>> jmxVal = mxBean -> m -> {
+                try {
+                    return mxBean.getAttribute(m);
+                }
+                catch (Exception e) {
+                    throw new IgniteException(e);
+                }
+            };
+
+            DynamicMBean jmxCdcReg = metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), null, "cdc");
+
+            checkMetrics((Function<String, Long>)jmxVal.apply(jmxCdcReg), (Function<String, String>)jmxVal.apply(jmxCdcReg));
+
+            DynamicMBean jmxConsumerReg = metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), "cdc", "consumer");
+
+            checkConsumerMetrics((Function<String, Long>)jmxVal.apply(jmxConsumerReg));
+        }
+    }
+
+    /** */
+    private void checkMetrics(Function<String, Long> longMetric, Function<String, String> strMetric) {
+        long committedSegIdx = longMetric.apply(COMMITTED_SEG_IDX);
+        long curSegIdx = longMetric.apply(CUR_SEG_IDX);
+
+        assertTrue(committedSegIdx <= curSegIdx);
+
+        assertTrue(longMetric.apply(COMMITTED_SEG_OFFSET) >= 0);
+        assertTrue(longMetric.apply(LAST_SEG_CONSUMPTION_TIME) > 0);
+
+        for (String m : new String[] {BINARY_META_DIR, MARSHALLER_DIR, CDC_DIR})
+            assertTrue(new File(strMetric.apply(m)).exists());
+
+        assertNotNull(longMetric.apply(LAST_EVT_TIME));
+        assertNotNull(longMetric.apply(EVTS_CNT));
+    }
 }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 6f87f9d..5a9b5b8 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -20,9 +20,11 @@ package org.apache.ignite.cdc;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Function;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
 
 import static org.apache.ignite.testframework.GridTestUtils.runAsync;
 
@@ -51,6 +53,12 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
         return futs;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
+        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
+        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
+    }
+
     /**
      * @param srcCfg Ignite source node configuration.
      * @param destCfg Ignite destination cluster configuration.
@@ -62,8 +70,13 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
             CdcConfiguration cdcCfg = new CdcConfiguration();
 
             cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer(destCfg, false, Collections.singleton(cache), KEYS_CNT));
+            cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
+
+            CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);
+
+            cdcs.add(cdc);
 
-            new CdcMain(srcCfg, null, cdcCfg).run();
+            cdc.run();
         });
     }
 }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index 459c229..bf51d4f 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -135,6 +135,11 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
         );
     }
 
+    /** {@inheritDoc} */
+    @Override protected void checkMetrics() {
+        // Skip metrics check.
+    }
+
     /** */
     private String prepareConfig(String path, Map<String, String> params) {
         try {
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index c492300..ce51a7b 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -22,12 +22,15 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.function.Function;
 import org.apache.ignite.cdc.AbstractReplicationTest;
 import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.cdc.IgniteToIgniteCdcStreamer;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 
@@ -116,6 +119,13 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
         return futs;
     }
 
+    /** {@inheritDoc} */
+    @Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
+        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
+        assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
+        assertNotNull(longMetric.apply(IgniteToKafkaCdcStreamer.BYTES_SENT));
+    }
+
     /**
      * @param igniteCfg Ignite configuration.
      * @param topic Kafka topic name.
@@ -130,8 +140,13 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
             CdcConfiguration cdcCfg = new CdcConfiguration();
 
             cdcCfg.setConsumer(cdcCnsmr);
+            cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
+
+            CdcMain cdc = new CdcMain(igniteCfg, null, cdcCfg);
+
+            cdcs.add(cdc);
 
-            new CdcMain(igniteCfg, null, cdcCfg).run();
+            cdc.run();
         });
     }