You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2021/10/07 12:27:18 UTC
[ignite-extensions] branch master updated: IGNITE-14355 CDC
Consumer metrics added (#73)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 8053e3d IGNITE-14355 CDC Consumer metrics added (#73)
8053e3d is described below
commit 8053e3d8844deff2a40a54339171f90ffb16834f
Author: Nikolay <ni...@apache.org>
AuthorDate: Thu Oct 7 15:27:12 2021 +0300
IGNITE-14355 CDC Consumer metrics added (#73)
---
modules/cdc-ext/pom.xml | 1 -
.../org/apache/ignite/cdc/CdcEventsApplier.java | 32 +++++----
.../ignite/cdc/IgniteToIgniteCdcStreamer.java | 36 ++++++++--
.../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 56 +++++++++++----
.../apache/ignite/cdc/AbstractReplicationTest.java | 83 ++++++++++++++++++++++
.../cdc/CdcIgniteToIgniteReplicationTest.java | 15 +++-
.../cdc/kafka/CdcKafkaReplicationAppsTest.java | 5 ++
.../ignite/cdc/kafka/CdcKafkaReplicationTest.java | 17 ++++-
8 files changed, 213 insertions(+), 32 deletions(-)
diff --git a/modules/cdc-ext/pom.xml b/modules/cdc-ext/pom.xml
index f85b1b8..151acaa 100644
--- a/modules/cdc-ext/pom.xml
+++ b/modules/cdc-ext/pom.xml
@@ -32,7 +32,6 @@
<properties>
<kafka.version>2.7.0</kafka.version>
- <test.containers.version>1.15.1</test.containers.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
index 9137f2a..777b992 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcEventsApplier.java
@@ -19,7 +19,6 @@ package org.apache.ignite.cdc;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
@@ -58,9 +57,6 @@ public abstract class CdcEventsApplier {
/** */
private final BooleanSupplier hasRemoves = () -> !F.isEmpty(rmvBatch);
- /** */
- protected final AtomicLong evtsApplied = new AtomicLong();
-
/**
* @param maxBatchSize Maximum batch size.
*/
@@ -70,11 +66,14 @@ public abstract class CdcEventsApplier {
/**
* @param evts Events to process.
+ * @return Number of applied events.
* @throws IgniteCheckedException If failed.
*/
- protected void apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
+ protected int apply(Iterable<CdcEvent> evts) throws IgniteCheckedException {
IgniteInternalCache<BinaryObject, BinaryObject> currCache = null;
+ int evtsApplied = 0;
+
for (CdcEvent evt : evts) {
if (log().isDebugEnabled())
log().debug("Event received [key=" + evt.key() + ']');
@@ -93,7 +92,7 @@ public abstract class CdcEventsApplier {
});
if (cache != currCache) {
- applyIf(currCache, hasUpdates, hasRemoves);
+ evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);
currCache = cache;
}
@@ -103,7 +102,7 @@ public abstract class CdcEventsApplier {
KeyCacheObject key = new KeyCacheObjectImpl(evt.key(), null, evt.partition());
if (evt.value() != null) {
- applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
+ evtsApplied += applyIf(currCache, () -> isApplyBatch(updBatch, key), hasRemoves);
CacheObject val;
@@ -116,17 +115,17 @@ public abstract class CdcEventsApplier {
new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId())));
}
else {
- applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
+ evtsApplied += applyIf(currCache, hasUpdates, () -> isApplyBatch(rmvBatch, key));
rmvBatch.put(key,
new GridCacheVersion(order.topologyVersion(), order.order(), order.nodeOrder(), order.clusterId()));
}
-
- evtsApplied.incrementAndGet();
}
if (currCache != null)
- applyIf(currCache, hasUpdates, hasRemoves);
+ evtsApplied += applyIf(currCache, hasUpdates, hasRemoves);
+
+ return evtsApplied;
}
/**
@@ -135,19 +134,24 @@ public abstract class CdcEventsApplier {
* @param cache Current cache.
* @param applyUpd Apply update batch flag supplier.
* @param applyRmv Apply remove batch flag supplier.
+ * @return Number of applied events.
* @throws IgniteCheckedException In case of error.
*/
- private void applyIf(
+ private int applyIf(
IgniteInternalCache<BinaryObject, BinaryObject> cache,
BooleanSupplier applyUpd,
BooleanSupplier applyRmv
) throws IgniteCheckedException {
+ int evtsApplied = 0;
+
if (applyUpd.getAsBoolean()) {
if (log().isDebugEnabled())
log().debug("Applying put batch [cache=" + cache.name() + ']');
cache.putAllConflict(updBatch);
+ evtsApplied += updBatch.size();
+
updBatch.clear();
}
@@ -157,8 +161,12 @@ public abstract class CdcEventsApplier {
cache.removeAllConflict(rmvBatch);
+ evtsApplied += rmvBatch.size();
+
rmvBatch.clear();
}
+
+ return evtsApplied;
}
/** @return {@code True} if update batch should be applied. */
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
index b8840bb..f53c66a 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java
@@ -29,6 +29,8 @@ import org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamer;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.resources.LoggerResource;
@@ -48,6 +50,18 @@ import org.apache.ignite.resources.LoggerResource;
* @see CacheVersionConflictResolverImpl
*/
public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcConsumer {
+ /** */
+ public static final String EVTS_CNT = "EventsCount";
+
+ /** */
+ public static final String EVTS_CNT_DESC = "Count of messages applied to destination cluster";
+
+ /** */
+ public static final String LAST_EVT_TIME = "LastEventTime";
+
+ /** */
+ public static final String LAST_EVT_TIME_DESC = "Timestamp of last applied event";
+
/** Destination cluster client configuration. */
private final IgniteConfiguration destIgniteCfg;
@@ -57,6 +71,12 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
/** Destination Ignite cluster client */
private IgniteEx dest;
+ /** Timestamp of last sent message. */
+ private AtomicLongMetric lastEvtTs;
+
+ /** Count of events applied to destination cluster. */
+ protected AtomicLongMetric evtsCnt;
+
/** Logger. */
@LoggerResource
private IgniteLogger log;
@@ -83,17 +103,20 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
}
/** {@inheritDoc} */
- @Override public void start() {
+ @Override public void start(MetricRegistry mreg) {
if (log.isInfoEnabled())
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');
dest = (IgniteEx)Ignition.start(destIgniteCfg);
+
+ this.evtsCnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
+ this.lastEvtTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
}
/** {@inheritDoc} */
@Override public boolean onEvents(Iterator<CdcEvent> evts) {
try {
- apply(() -> F.iterator(
+ long msgsSnt = apply(() -> F.iterator(
evts,
F.identity(),
true,
@@ -101,8 +124,13 @@ public class IgniteToIgniteCdcStreamer extends CdcEventsApplier implements CdcCo
evt -> F.isEmpty(cachesIds) || cachesIds.contains(evt.cacheId()),
evt -> evt.version().otherClusterVersion() == null));
- if (log.isInfoEnabled())
- log.info("Events applied [evtsApplied=" + evtsApplied.get() + ']');
+ if (msgsSnt > 0) {
+ evtsCnt.add(msgsSnt);
+ lastEvtTs.value(System.currentTimeMillis());
+
+ if (log.isInfoEnabled())
+ log.info("Events applied [evtsApplied=" + evtsCnt.value() + ']');
+ }
return true;
}
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index 3f97ee8..af96169 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -32,6 +32,8 @@ import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.conflictresolve.CacheVersionConflictResolverImpl;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.resources.LoggerResource;
@@ -41,6 +43,10 @@ import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
+import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT;
+import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.EVTS_CNT_DESC;
+import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME;
+import static org.apache.ignite.cdc.IgniteToIgniteCdcStreamer.LAST_EVT_TIME_DESC;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
@@ -64,6 +70,12 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** Default kafka request timeout in seconds. */
public static final int DFLT_REQ_TIMEOUT = 5;
+ /** Bytes sent metric name. */
+ public static final String BYTES_SENT = "BytesSent";
+
+ /** Bytes sent metric description. */
+ public static final String BYTES_SENT_DESCRIPTION = "Count of bytes sent.";
+
/** Log. */
@LoggerResource
private IgniteLogger log;
@@ -89,8 +101,14 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
/** Max batch size. */
private final int maxBatchSize;
+ /** Timestamp of last sent message. */
+ private AtomicLongMetric lastMsgTs;
+
+ /** Count of bytes sent to the Kafka. */
+ private AtomicLongMetric bytesSnt;
+
/** Count of sent messages. */
- private long msgCnt;
+ private AtomicLongMetric msgsSnt;
/**
* @param topic Topic name.
@@ -158,35 +176,43 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
continue;
}
- msgCnt++;
+ byte[] bytes = IgniteUtils.toBytes(evt);
+
+ bytesSnt.add(bytes.length);
futs.add(producer.send(new ProducerRecord<>(
topic,
evt.partition() % kafkaParts,
evt.cacheId(),
- IgniteUtils.toBytes(evt)
+ bytes
)));
if (log.isDebugEnabled())
log.debug("Event sent asynchronously [evt=" + evt + ']');
}
- try {
- for (Future<RecordMetadata> fut : futs)
- fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
- }
- catch (InterruptedException | ExecutionException | TimeoutException e) {
- throw new RuntimeException(e);
- }
+ if (!futs.isEmpty()) {
+ try {
+ for (Future<RecordMetadata> fut : futs)
+ fut.get(DFLT_REQ_TIMEOUT, TimeUnit.SECONDS);
+
+ msgsSnt.add(futs.size());
+
+ lastMsgTs.value(System.currentTimeMillis());
+ }
+ catch (InterruptedException | ExecutionException | TimeoutException e) {
+ throw new RuntimeException(e);
+ }
- if (log.isInfoEnabled())
- log.info("Events processed [sentMessagesCount=" + msgCnt + ']');
+ if (log.isInfoEnabled())
+ log.info("Events processed [sentMessagesCount=" + msgsSnt.value() + ']');
+ }
return true;
}
/** {@inheritDoc} */
- @Override public void start() {
+ @Override public void start(MetricRegistry mreg) {
try {
producer = new KafkaProducer<>(kafkaProps);
@@ -196,6 +222,10 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
catch (Exception e) {
throw new RuntimeException(e);
}
+
+ this.msgsSnt = mreg.longMetric(EVTS_CNT, EVTS_CNT_DESC);
+ this.lastMsgTs = mreg.longMetric(LAST_EVT_TIME, LAST_EVT_TIME_DESC);
+ this.bytesSnt = mreg.longMetric(BYTES_SENT, BYTES_SENT_DESCRIPTION);
}
/** {@inheritDoc} */
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
index 7e46dc3..6db6eb3 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/AbstractReplicationTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.cdc;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -26,7 +27,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;
+import javax.management.DynamicMBean;
import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.query.SqlFieldsQuery;
@@ -38,11 +41,15 @@ import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.spi.metric.LongMetric;
+import org.apache.ignite.spi.metric.ObjectMetric;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -53,7 +60,18 @@ import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
import static org.apache.ignite.cache.CacheMode.PARTITIONED;
import static org.apache.ignite.cache.CacheMode.REPLICATED;
import static org.apache.ignite.cluster.ClusterState.ACTIVE;
+import static org.apache.ignite.internal.cdc.CdcMain.BINARY_META_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.CDC_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.COMMITTED_SEG_OFFSET;
+import static org.apache.ignite.internal.cdc.CdcMain.CUR_SEG_IDX;
+import static org.apache.ignite.internal.cdc.CdcMain.LAST_SEG_CONSUMPTION_TIME;
+import static org.apache.ignite.internal.cdc.CdcMain.MARSHALLER_DIR;
+import static org.apache.ignite.internal.cdc.CdcMain.cdcInstanceName;
+import static org.apache.ignite.internal.cdc.WalRecordsConsumer.EVTS_CNT;
+import static org.apache.ignite.internal.cdc.WalRecordsConsumer.LAST_EVT_TIME;
import static org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi.DFLT_PORT_RANGE;
+import static org.apache.ignite.testframework.GridTestUtils.getFieldValue;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
@@ -137,6 +155,9 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
/** */
private byte clusterId = SRC_CLUSTER_ID;
+ /** */
+ protected final List<CdcMain> cdcs = Collections.synchronizedList(new ArrayList<>());
+
/** {@inheritDoc} */
@Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName)
@@ -179,6 +200,8 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
@Override protected void beforeTest() throws Exception {
cleanPersistenceDir();
+ cdcs.clear();
+
IgniteBiTuple<IgniteEx[], IgniteConfiguration[]> cluster = setupCluster("source", "src-cluster-client", 0);
srcCluster = cluster.get1();
@@ -249,10 +272,14 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.EXISTS, futs);
+ checkMetrics();
+
IntStream.range(0, KEYS_CNT).forEach(srcCache::remove);
waitForSameData(srcCache, destCache, KEYS_CNT, WaitDataMode.REMOVED, futs);
+ checkMetrics();
+
assertFalse(destCluster[0].cacheNames().contains(IGNORED_CACHE));
}
finally {
@@ -287,6 +314,7 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
assertTrue(waitForCondition(waitForTblSz.apply(KEYS_CNT), getTestTimeout()));
+ checkMetrics();
List<List<?>> data = executeSql(destCluster[0], "SELECT ID, NAME FROM T1 ORDER BY ID");
@@ -298,6 +326,8 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
executeSql(srcCluster[0], deleteQry);
assertTrue(waitForCondition(waitForTblSz.apply(0), getTestTimeout()));
+
+ checkMetrics();
}
finally {
for (IgniteInternalFuture<?> fut : futs)
@@ -408,4 +438,57 @@ public abstract class AbstractReplicationTest extends GridCommonAbstractTest {
/** */
protected abstract List<IgniteInternalFuture<?>> startActiveActiveCdc();
+
+ /** */
+ protected abstract void checkConsumerMetrics(Function<String, Long> longMetric);
+
+ /** */
+ protected void checkMetrics() {
+ for (int i = 0; i < cdcs.size(); i++) {
+ IgniteConfiguration cfg = getFieldValue(cdcs.get(i), "igniteCfg");
+
+ MetricRegistry mreg = getFieldValue(cdcs.get(i), "mreg");
+
+ assertNotNull(mreg);
+
+ checkMetrics(
+ m -> mreg.<LongMetric>findMetric(m).value(),
+ m -> mreg.<ObjectMetric<String>>findMetric(m).value()
+ );
+
+ Function<DynamicMBean, Function<String, ?>> jmxVal = mxBean -> m -> {
+ try {
+ return mxBean.getAttribute(m);
+ }
+ catch (Exception e) {
+ throw new IgniteException(e);
+ }
+ };
+
+ DynamicMBean jmxCdcReg = metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), null, "cdc");
+
+ checkMetrics((Function<String, Long>)jmxVal.apply(jmxCdcReg), (Function<String, String>)jmxVal.apply(jmxCdcReg));
+
+ DynamicMBean jmxConsumerReg = metricRegistry(cdcInstanceName(cfg.getIgniteInstanceName()), "cdc", "consumer");
+
+ checkConsumerMetrics((Function<String, Long>)jmxVal.apply(jmxConsumerReg));
+ }
+ }
+
+ /** */
+ private void checkMetrics(Function<String, Long> longMetric, Function<String, String> strMetric) {
+ long committedSegIdx = longMetric.apply(COMMITTED_SEG_IDX);
+ long curSegIdx = longMetric.apply(CUR_SEG_IDX);
+
+ assertTrue(committedSegIdx <= curSegIdx);
+
+ assertTrue(longMetric.apply(COMMITTED_SEG_OFFSET) >= 0);
+ assertTrue(longMetric.apply(LAST_SEG_CONSUMPTION_TIME) > 0);
+
+ for (String m : new String[] {BINARY_META_DIR, MARSHALLER_DIR, CDC_DIR})
+ assertTrue(new File(strMetric.apply(m)).exists());
+
+ assertNotNull(longMetric.apply(LAST_EVT_TIME));
+ assertNotNull(longMetric.apply(EVTS_CNT));
+ }
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
index 6f87f9d..5a9b5b8 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/CdcIgniteToIgniteReplicationTest.java
@@ -20,9 +20,11 @@ package org.apache.ignite.cdc;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.function.Function;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import static org.apache.ignite.testframework.GridTestUtils.runAsync;
@@ -51,6 +53,12 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
return futs;
}
+ /** {@inheritDoc} */
+ @Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
+ assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
+ assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
+ }
+
/**
* @param srcCfg Ignite source node configuration.
* @param destCfg Ignite destination cluster configuration.
@@ -62,8 +70,13 @@ public class CdcIgniteToIgniteReplicationTest extends AbstractReplicationTest {
CdcConfiguration cdcCfg = new CdcConfiguration();
cdcCfg.setConsumer(new IgniteToIgniteCdcStreamer(destCfg, false, Collections.singleton(cache), KEYS_CNT));
+ cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
+
+ CdcMain cdc = new CdcMain(srcCfg, null, cdcCfg);
+
+ cdcs.add(cdc);
- new CdcMain(srcCfg, null, cdcCfg).run();
+ cdc.run();
});
}
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
index 459c229..bf51d4f 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationAppsTest.java
@@ -135,6 +135,11 @@ public class CdcKafkaReplicationAppsTest extends CdcKafkaReplicationTest {
);
}
+ /** {@inheritDoc} */
+ @Override protected void checkMetrics() {
+ // Skip metrics check.
+ }
+
/** */
private String prepareConfig(String path, Map<String, String> params) {
try {
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index c492300..ce51a7b 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -22,12 +22,15 @@ import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Set;
+import java.util.function.Function;
import org.apache.ignite.cdc.AbstractReplicationTest;
import org.apache.ignite.cdc.CdcConfiguration;
+import org.apache.ignite.cdc.IgniteToIgniteCdcStreamer;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cdc.CdcMain;
+import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -116,6 +119,13 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
return futs;
}
+ /** {@inheritDoc} */
+ @Override protected void checkConsumerMetrics(Function<String, Long> longMetric) {
+ assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.LAST_EVT_TIME));
+ assertNotNull(longMetric.apply(IgniteToIgniteCdcStreamer.EVTS_CNT));
+ assertNotNull(longMetric.apply(IgniteToKafkaCdcStreamer.BYTES_SENT));
+ }
+
/**
* @param igniteCfg Ignite configuration.
* @param topic Kafka topic name.
@@ -130,8 +140,13 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
CdcConfiguration cdcCfg = new CdcConfiguration();
cdcCfg.setConsumer(cdcCnsmr);
+ cdcCfg.setMetricExporterSpi(new JmxMetricExporterSpi());
+
+ CdcMain cdc = new CdcMain(igniteCfg, null, cdcCfg);
+
+ cdcs.add(cdc);
- new CdcMain(igniteCfg, null, cdcCfg).run();
+ cdc.run();
});
}