You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2023/03/22 10:28:50 UTC
[ignite-extensions] branch master updated: IGNITE-18209 Add end offsets check in KafkaToIgniteMetadataUpdater (#202)
This is an automated email from the ASF dual-hosted git repository.
nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git
The following commit(s) were added to refs/heads/master by this push:
new 9a008cc8 IGNITE-18209 Add end offsets check in KafkaToIgniteMetadataUpdater (#202)
9a008cc8 is described below
commit 9a008cc88443d09c1daf70077571abc9d774c916
Author: Ilya Shishkov <sh...@gmail.com>
AuthorDate: Wed Mar 22 13:28:44 2023 +0300
IGNITE-18209 Add end offsets check in KafkaToIgniteMetadataUpdater (#202)
---
.../cdc/kafka/KafkaToIgniteMetadataUpdater.java | 40 ++++-
.../ignite/cdc/kafka/CdcKafkaReplicationTest.java | 71 ++++++---
.../kafka/KafkaToIgniteMetadataUpdaterTest.java | 171 +++++++++++++++++++++
3 files changed, 260 insertions(+), 22 deletions(-)
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index 581e74c0..15e793aa 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -19,17 +19,23 @@ package org.apache.ignite.cdc.kafka;
import java.time.Duration;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.internal.binary.BinaryContext;
import org.apache.ignite.internal.binary.BinaryMetadata;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.VoidDeserializer;
@@ -56,6 +62,12 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
/** */
private final AtomicLong rcvdEvts = new AtomicLong();
+ /** Offsets from the last successful metadata update. */
+ private Map<TopicPartition, Long> offsets;
+
+ /** Metadata topic partitions. */
+ private final Set<TopicPartition> parts;
+
/**
* @param ctx Binary context.
* @param log Logger.
@@ -83,11 +95,37 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
cnsmr = new KafkaConsumer<>(kafkaProps);
- cnsmr.subscribe(Collections.singletonList(streamerCfg.getMetadataTopic()));
+ String metaTopic = streamerCfg.getMetadataTopic();
+
+ parts = cnsmr.partitionsFor(metaTopic, Duration.ofMillis(kafkaReqTimeout))
+ .stream()
+ .map(pInfo -> new TopicPartition(metaTopic, pInfo.partition()))
+ .collect(Collectors.toSet());
+
+ if (parts.size() != 1) {
+ this.log.warning("Metadata topic '" + metaTopic + "' has " + parts.size() + " partitions. " +
+ "In order to read data with guaranteed order set number of partitions to 1");
+ }
+
+ cnsmr.subscribe(Collections.singletonList(metaTopic));
}
/** Polls all available records from metadata topic and applies it to Ignite. */
public synchronized void updateMetadata() {
+ // If there are no new records in topic, method KafkaConsumer#poll blocks up to the specified timeout.
+ // In order to eliminate this, we compare current offsets with the offsets from the last metadata update
+ // (stored in 'offsets' field). If there are no offsets changes, polling cycle is skipped.
+ Map<TopicPartition, Long> offsets0 = cnsmr.endOffsets(parts, Duration.ofMillis(kafkaReqTimeout));
+
+ if (!F.isEmpty(offsets0) && F.eqNotOrdered(offsets, offsets0)) {
+ if (log.isDebugEnabled())
+ log.debug("Offsets unchanged, poll skipped");
+
+ return;
+ }
+
+ offsets = new HashMap<>(offsets0);
+
while (true) {
ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index cbc2f940..403abdac 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.configuration.ClientConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -64,32 +65,14 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
@Override protected void beforeTest() throws Exception {
super.beforeTest();
- if (KAFKA == null) {
- KAFKA = new EmbeddedKafkaCluster(1);
-
- KAFKA.start();
- }
-
- KAFKA.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
- KAFKA.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
- KAFKA.createTopic(SRC_DEST_META_TOPIC, 1, 1);
- KAFKA.createTopic(DEST_SRC_META_TOPIC, 1, 1);
+ KAFKA = initKafka(KAFKA);
}
/** {@inheritDoc} */
@Override protected void afterTest() throws Exception {
super.afterTest();
- KAFKA.getAllTopicsInCluster().forEach(t -> {
- try {
- KAFKA.deleteTopic(t);
- }
- catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- });
-
- waitForCondition(() -> KAFKA.getAllTopicsInCluster().isEmpty(), getTestTimeout());
+ removeKafkaTopicsAndWait(KAFKA, getTestTimeout());
}
/** {@inheritDoc} */
@@ -239,9 +222,16 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
/** */
protected Properties kafkaProperties() {
+ return kafkaProperties(KAFKA);
+ }
+
+ /**
+ * @param kafka Kafka cluster.
+ */
+ static Properties kafkaProperties(EmbeddedKafkaCluster kafka) {
Properties props = new Properties();
- props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-ignite-applier");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -249,4 +239,43 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
return props;
}
+
+    /**
+     * Returns a Kafka cluster with the replication topics created, starting a new cluster first
+     * when no existing instance is supplied.
+     *
+     * @param curKafka Existing cluster to reuse, or {@code null} to create and start a new one.
+     * @return The reused or the newly started cluster.
+     * @throws Exception If cluster start or topic creation fails.
+     */
+    static EmbeddedKafkaCluster initKafka(EmbeddedKafkaCluster curKafka) throws Exception {
+        EmbeddedKafkaCluster cluster;
+
+        if (curKafka != null)
+            cluster = curKafka;
+        else {
+            cluster = new EmbeddedKafkaCluster(1);
+
+            cluster.start();
+        }
+
+        // Data topics are created with DFLT_PARTS, metadata topics with 1.
+        cluster.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
+        cluster.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
+        cluster.createTopic(SRC_DEST_META_TOPIC, 1, 1);
+        cluster.createTopic(DEST_SRC_META_TOPIC, 1, 1);
+
+        return cluster;
+    }
+
+    /**
+     * Deletes every topic reported by the given Kafka cluster and then waits until the cluster
+     * reports no topics at all.
+     *
+     * @param kafka Kafka cluster.
+     * @param timeout Timeout passed to the wait for the topic list to become empty.
+     * @throws IgniteInterruptedCheckedException Propagated from the wait for the empty topic list.
+     */
+    static void removeKafkaTopicsAndWait(EmbeddedKafkaCluster kafka, long timeout) throws IgniteInterruptedCheckedException {
+        kafka.getAllTopicsInCluster().forEach(t -> {
+            try {
+                kafka.deleteTopic(t);
+            }
+            catch (InterruptedException e) {
+                // Catching InterruptedException clears the interrupt flag; restore it so that
+                // code further up the stack can still observe the interruption.
+                Thread.currentThread().interrupt();
+
+                throw new RuntimeException(e);
+            }
+        });
+
+        waitForCondition(() -> kafka.getAllTopicsInCluster().isEmpty(), timeout);
+    }
}
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
new file mode 100644
index 00000000..fee428b9
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.Collections;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
+import org.apache.ignite.internal.cdc.TypeMappingImpl;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.platform.PlatformType;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.AbstractReplicationTest.ACTIVE_PASSIVE_CACHE;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.DFLT_PARTS;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_META_TOPIC;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_TOPIC;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.logging.log4j.Level.DEBUG;
+
+/**
+ * Checks that {@link KafkaToIgniteMetadataUpdater#updateMetadata()} polls the metadata topic after new
+ * metadata was sent, and skips the poll when it is invoked again without new records in the topic.
+ * The behavior is verified through log messages captured by a {@link ListeningTestLogger}.
+ */
+public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
+    /** Markers sent messages listener. */
+    private static final LogListener MARKERS_LISTENER = LogListener.matches("Meta update markers sent.")
+        .times(1)
+        .build();
+
+    /** Polled from meta topic message listener. */
+    private static final LogListener POLLED_LISTENER = LogListener.matches("Polled from meta topic [rcvdEvts=1]")
+        .times(1)
+        .build();
+
+    /** Poll skip messages listener. */
+    private static final LogListener POLL_SKIP_LISTENER = LogListener.matches("Offsets unchanged, poll skipped")
+        .times(1)
+        .build();
+
+    /**
+     * Kafka cluster.
+     * NOTE(review): nothing in this class stops the cluster, only its topics are removed - confirm this is intended.
+     */
+    private EmbeddedKafkaCluster kafka;
+
+    /** Listening logger. */
+    private ListeningTestLogger listeningLog;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        kafka = initKafka(kafka);
+
+        listeningLog = new ListeningTestLogger(log);
+
+        // The listeners match debug output (e.g. the poll skip message is logged only if debug is enabled).
+        resetLog4j(DEBUG, false, IgniteToKafkaCdcStreamer.class.getName());
+        resetLog4j(DEBUG, false, KafkaToIgniteMetadataUpdater.class.getName());
+
+        listeningLog.registerListener(MARKERS_LISTENER);
+        listeningLog.registerListener(POLLED_LISTENER);
+        listeningLog.registerListener(POLL_SKIP_LISTENER);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        removeKafkaTopicsAndWait(kafka, getTestTimeout());
+
+        // Listeners are static, so their state has to be cleared explicitly.
+        MARKERS_LISTENER.reset();
+        POLLED_LISTENER.reset();
+        POLL_SKIP_LISTENER.reset();
+    }
+
+    /**
+     * Sends a single type mapping through the streamer and invokes the updater twice: the first invocation
+     * is expected to poll the record from the metadata topic, the second one is expected to skip polling.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testUpdateMetadata() throws Exception {
+        try (KafkaToIgniteMetadataUpdater updater = metadataUpdater()) {
+            CdcConsumer cdcCnsmr = igniteToKafkaCdcStreamer();
+
+            try {
+                TypeMapping typeMapping = new TypeMappingImpl(1, "test", PlatformType.JAVA);
+
+                // No messages expected.
+                assertFalse(MARKERS_LISTENER.check());
+                assertFalse(POLLED_LISTENER.check());
+                assertFalse(POLL_SKIP_LISTENER.check());
+
+                // Send metadata (type mapping).
+                cdcCnsmr.onMappings(Collections.singleton(typeMapping).iterator());
+
+                // Sent markers expected.
+                assertTrue(waitForCondition(MARKERS_LISTENER::check, getTestTimeout()));
+                assertFalse(POLLED_LISTENER.check());
+                assertFalse(POLL_SKIP_LISTENER.check());
+
+                // Polled from meta topic expected.
+                updater.updateMetadata();
+                assertTrue(POLLED_LISTENER.check());
+                assertFalse(POLL_SKIP_LISTENER.check());
+
+                // Poll skip expected.
+                updater.updateMetadata();
+                assertTrue(POLL_SKIP_LISTENER.check());
+            }
+            finally {
+                // The streamer is started in igniteToKafkaCdcStreamer(); stop it so it does not outlive the test.
+                cdcCnsmr.stop();
+            }
+        }
+    }
+
+    /**
+     * Creates and starts a streamer that writes to the source-to-destination topics of the test cluster
+     * and logs through the listening logger.
+     *
+     * @return Started streamer.
+     */
+    private IgniteToKafkaCdcStreamer igniteToKafkaCdcStreamer() {
+        IgniteToKafkaCdcStreamer streamer = new IgniteToKafkaCdcStreamer()
+            .setTopic(SRC_DEST_TOPIC)
+            .setMetadataTopic(SRC_DEST_META_TOPIC)
+            .setKafkaPartitions(DFLT_PARTS)
+            .setKafkaProperties(kafkaProperties(kafka))
+            .setCaches(Collections.singleton(ACTIVE_PASSIVE_CACHE))
+            .setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
+
+        // Replace the streamer logger so that its messages reach the registered listeners.
+        GridTestUtils.setFieldValue(streamer, "log", listeningLog.getLogger(IgniteToKafkaCdcStreamer.class));
+
+        streamer.start(new MetricRegistry("test", null, null, log));
+
+        return streamer;
+    }
+
+    /**
+     * Creates a metadata updater that reads from the test cluster and logs through the listening logger.
+     *
+     * @return Metadata updater.
+     */
+    private KafkaToIgniteMetadataUpdater metadataUpdater() {
+        // Binary context stub: every user class name registration is reported as successful.
+        BinaryContext noOpCtx = new BinaryContext(BinaryNoopMetadataHandler.instance(), new IgniteConfiguration(), log) {
+            @Override public boolean registerUserClassName(int typeId, String clsName, boolean failIfUnregistered,
+                boolean onlyLocReg, byte platformId) {
+                return true;
+            }
+        };
+
+        return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(kafka), streamerConfiguration());
+    }
+
+    /**
+     * Builds the Kafka-to-Ignite streamer configuration that points to the source-to-destination topics.
+     *
+     * @return Streamer configuration.
+     */
+    private KafkaToIgniteCdcStreamerConfiguration streamerConfiguration() {
+        KafkaToIgniteCdcStreamerConfiguration cfg = new KafkaToIgniteCdcStreamerConfiguration();
+
+        cfg.setTopic(SRC_DEST_TOPIC);
+        cfg.setMetadataTopic(SRC_DEST_META_TOPIC);
+        cfg.setKafkaPartsFrom(0);
+        cfg.setKafkaPartsTo(DFLT_PARTS);
+        cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
+
+        return cfg;
+    }
+}