You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ni...@apache.org on 2023/03/22 10:28:50 UTC

[ignite-extensions] branch master updated: IGNITE-18209 Add end offsets check in KafkaToIgniteMetadataUpdater (#202)

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a008cc8 IGNITE-18209 Add end offsets check in KafkaToIgniteMetadataUpdater (#202)
9a008cc8 is described below

commit 9a008cc88443d09c1daf70077571abc9d774c916
Author: Ilya Shishkov <sh...@gmail.com>
AuthorDate: Wed Mar 22 13:28:44 2023 +0300

    IGNITE-18209 Add end offsets check in KafkaToIgniteMetadataUpdater (#202)
---
 .../cdc/kafka/KafkaToIgniteMetadataUpdater.java    |  40 ++++-
 .../ignite/cdc/kafka/CdcKafkaReplicationTest.java  |  71 ++++++---
 .../kafka/KafkaToIgniteMetadataUpdaterTest.java    | 171 +++++++++++++++++++++
 3 files changed, 260 insertions(+), 22 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
index 581e74c0..15e793aa 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdater.java
@@ -19,17 +19,23 @@ package org.apache.ignite.cdc.kafka;
 
 import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.cdc.TypeMapping;
 import org.apache.ignite.internal.binary.BinaryContext;
 import org.apache.ignite.internal.binary.BinaryMetadata;
 import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 import org.apache.kafka.common.serialization.VoidDeserializer;
 
@@ -56,6 +62,12 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
     /** */
     private final AtomicLong rcvdEvts = new AtomicLong();
 
+    /** Offsets from the last successful metadata update. */
+    private Map<TopicPartition, Long> offsets;
+
+    /** Metadata topic partitions. */
+    private final Set<TopicPartition> parts;
+
     /**
      * @param ctx Binary context.
      * @param log Logger.
@@ -83,11 +95,37 @@ public class KafkaToIgniteMetadataUpdater implements AutoCloseable {
 
         cnsmr = new KafkaConsumer<>(kafkaProps);
 
-        cnsmr.subscribe(Collections.singletonList(streamerCfg.getMetadataTopic()));
+        String metaTopic = streamerCfg.getMetadataTopic();
+
+        parts = cnsmr.partitionsFor(metaTopic, Duration.ofMillis(kafkaReqTimeout))
+            .stream()
+            .map(pInfo -> new TopicPartition(metaTopic, pInfo.partition()))
+            .collect(Collectors.toSet());
+
+        if (parts.size() != 1) {
+            this.log.warning("Metadata topic '" + metaTopic + "' has " + parts.size() + " partitions. " +
+                "In order to read data with guaranteed order set number of partitions to 1");
+        }
+
+        cnsmr.subscribe(Collections.singletonList(metaTopic));
     }
 
     /** Polls all available records from metadata topic and applies it to Ignite. */
     public synchronized void updateMetadata() {
+        // If there are no new records in topic, method KafkaConsumer#poll blocks up to the specified timeout.
+        // In order to eliminate this, we compare current offsets with the offsets from the last metadata update
+        // (stored in 'offsets' field). If there are no offsets changes, polling cycle is skipped.
+        Map<TopicPartition, Long> offsets0 = cnsmr.endOffsets(parts, Duration.ofMillis(kafkaReqTimeout));
+
+        if (!F.isEmpty(offsets0) && F.eqNotOrdered(offsets, offsets0)) {
+            if (log.isDebugEnabled())
+                log.debug("Offsets unchanged, poll skipped");
+
+            return;
+        }
+
+        offsets = new HashMap<>(offsets0);
+
         while (true) {
             ConsumerRecords<Void, byte[]> recs = cnsmr.poll(Duration.ofMillis(kafkaReqTimeout));
 
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
index cbc2f940..403abdac 100644
--- a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/CdcKafkaReplicationTest.java
@@ -29,6 +29,7 @@ import org.apache.ignite.configuration.ClientConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.cdc.CdcMain;
 import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -64,32 +65,14 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
     @Override protected void beforeTest() throws Exception {
         super.beforeTest();
 
-        if (KAFKA == null) {
-            KAFKA = new EmbeddedKafkaCluster(1);
-
-            KAFKA.start();
-        }
-
-        KAFKA.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
-        KAFKA.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
-        KAFKA.createTopic(SRC_DEST_META_TOPIC, 1, 1);
-        KAFKA.createTopic(DEST_SRC_META_TOPIC, 1, 1);
+        KAFKA = initKafka(KAFKA); // Reuses an already started cluster, if any, and creates the test topics.
     }
 
     /** {@inheritDoc} */
     @Override protected void afterTest() throws Exception {
         super.afterTest();
 
-        KAFKA.getAllTopicsInCluster().forEach(t -> {
-            try {
-                KAFKA.deleteTopic(t);
-            }
-            catch (InterruptedException e) {
-                throw new RuntimeException(e);
-            }
-        });
-
-        waitForCondition(() -> KAFKA.getAllTopicsInCluster().isEmpty(), getTestTimeout());
+        removeKafkaTopicsAndWait(KAFKA, getTestTimeout()); // Deletes every topic, then waits for the list to be empty.
     }
 
     /** {@inheritDoc} */
@@ -239,9 +222,16 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
 
     /** */
     protected Properties kafkaProperties() {
+        return kafkaProperties(KAFKA); // Delegates to the static overload so other tests can reuse it.
+    }
+
+    /**
+     * @param kafka Kafka cluster.
+     */
+    static Properties kafkaProperties(EmbeddedKafkaCluster kafka) {
         Properties props = new Properties();
 
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA.bootstrapServers());
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers());
         props.put(ConsumerConfig.GROUP_ID_CONFIG, "kafka-to-ignite-applier");
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
@@ -249,4 +239,43 @@ public class CdcKafkaReplicationTest extends AbstractReplicationTest {
 
         return props;
     }
+
+    /**
+     * Creates and starts a new embedded Kafka cluster if {@code curKafka} is {@code null}, then creates the test topics.
+     *
+     * @param curKafka Current Kafka cluster or {@code null}; the cluster to keep using (possibly new) is returned.
+     */
+    static EmbeddedKafkaCluster initKafka(EmbeddedKafkaCluster curKafka) throws Exception {
+        EmbeddedKafkaCluster kafka = curKafka;
+
+        if (kafka == null) {
+            kafka = new EmbeddedKafkaCluster(1);
+
+            kafka.start();
+        }
+
+        kafka.createTopic(SRC_DEST_TOPIC, DFLT_PARTS, 1);
+        kafka.createTopic(DEST_SRC_TOPIC, DFLT_PARTS, 1);
+        kafka.createTopic(SRC_DEST_META_TOPIC, 1, 1); // Single partition: the metadata updater warns otherwise.
+        kafka.createTopic(DEST_SRC_META_TOPIC, 1, 1);
+
+        return kafka;
+    }
+
+    /**
+     * @param kafka Kafka cluster whose topics are all deleted.
+     * @param timeout Timeout passed to {@code waitForCondition} while waiting for the topic list to become empty.
+     */
+    static void removeKafkaTopicsAndWait(EmbeddedKafkaCluster kafka, long timeout) throws IgniteInterruptedCheckedException {
+        kafka.getAllTopicsInCluster().forEach(t -> {
+            try {
+                kafka.deleteTopic(t);
+            }
+            catch (InterruptedException e) {
+                throw new RuntimeException(e); // NOTE(review): interrupt flag is not restored before rethrowing.
+            }
+        });
+
+        waitForCondition(() -> kafka.getAllTopicsInCluster().isEmpty(), timeout); // NOTE(review): result is ignored.
+    }
 }
diff --git a/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
new file mode 100644
index 00000000..fee428b9
--- /dev/null
+++ b/modules/cdc-ext/src/test/java/org/apache/ignite/cdc/kafka/KafkaToIgniteMetadataUpdaterTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cdc.kafka;
+
+import java.util.Collections;
+import org.apache.ignite.cdc.CdcConsumer;
+import org.apache.ignite.cdc.TypeMapping;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.binary.BinaryContext;
+import org.apache.ignite.internal.binary.BinaryNoopMetadataHandler;
+import org.apache.ignite.internal.cdc.TypeMappingImpl;
+import org.apache.ignite.internal.processors.metric.MetricRegistry;
+import org.apache.ignite.platform.PlatformType;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.junit.Test;
+
+import static org.apache.ignite.cdc.AbstractReplicationTest.ACTIVE_PASSIVE_CACHE;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.DFLT_PARTS;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_META_TOPIC;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.SRC_DEST_TOPIC;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.initKafka;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.kafkaProperties;
+import static org.apache.ignite.cdc.kafka.CdcKafkaReplicationTest.removeKafkaTopicsAndWait;
+import static org.apache.ignite.cdc.kafka.KafkaToIgniteCdcStreamerConfiguration.DFLT_KAFKA_REQ_TIMEOUT;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+import static org.apache.logging.log4j.Level.DEBUG;
+
+/**
+ * Tests that {@link KafkaToIgniteMetadataUpdater#updateMetadata()} skips polling when end offsets are unchanged.
+ */
+public class KafkaToIgniteMetadataUpdaterTest extends GridCommonAbstractTest {
+    /** Listener for the "Meta update markers sent." log message, configured with {@code times(1)}. */
+    private static final LogListener MARKERS_LISTENER = LogListener.matches("Meta update markers sent.")
+        .times(1)
+        .build();
+
+    /** Listener for the "Polled from meta topic [rcvdEvts=1]" log message, configured with {@code times(1)}. */
+    private static final LogListener POLLED_LISTENER = LogListener.matches("Polled from meta topic [rcvdEvts=1]")
+        .times(1)
+        .build();
+
+    /** Listener for the "Offsets unchanged, poll skipped" debug message of {@link KafkaToIgniteMetadataUpdater}. */
+    private static final LogListener POLL_SKIP_LISTENER = LogListener.matches("Offsets unchanged, poll skipped")
+        .times(1)
+        .build();
+
+    /** Embedded Kafka cluster, created by {@code initKafka} in {@link #beforeTest()} when still {@code null}. */
+    private EmbeddedKafkaCluster kafka;
+
+    /** Logger the listeners above are registered on; handed to both the updater and the streamer. */
+    private ListeningTestLogger listeningLog;
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        kafka = initKafka(kafka);
+
+        listeningLog = new ListeningTestLogger(log);
+
+        resetLog4j(DEBUG, false, IgniteToKafkaCdcStreamer.class.getName());
+        resetLog4j(DEBUG, false, KafkaToIgniteMetadataUpdater.class.getName()); // Poll-skip message is debug-level.
+
+        listeningLog.registerListener(MARKERS_LISTENER);
+        listeningLog.registerListener(POLLED_LISTENER);
+        listeningLog.registerListener(POLL_SKIP_LISTENER);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        removeKafkaTopicsAndWait(kafka, getTestTimeout());
+
+        MARKERS_LISTENER.reset(); // Listeners are static fields, so they are reset after each test.
+        POLLED_LISTENER.reset();
+        POLL_SKIP_LISTENER.reset();
+    }
+
+    /** Checks the log sequence: markers sent, one record polled, then poll skipped on the second update call. */
+    @Test
+    public void testUpdateMetadata() throws Exception {
+        try (KafkaToIgniteMetadataUpdater updater = metadataUpdater()) {
+            CdcConsumer cdcCnsmr = igniteToKafkaCdcStreamer();
+
+            TypeMapping typeMapping = new TypeMappingImpl(1, "test", PlatformType.JAVA);
+
+            // No messages expected.
+            assertFalse(MARKERS_LISTENER.check());
+            assertFalse(POLLED_LISTENER.check());
+            assertFalse(POLL_SKIP_LISTENER.check());
+
+            // Send metadata (type mapping).
+            cdcCnsmr.onMappings(Collections.singleton(typeMapping).iterator());
+
+            // Sent markers expected.
+            assertTrue(waitForCondition(MARKERS_LISTENER::check, getTestTimeout()));
+            assertFalse(POLLED_LISTENER.check());
+            assertFalse(POLL_SKIP_LISTENER.check());
+
+            // Polled from meta topic expected.
+            updater.updateMetadata();
+            assertTrue(POLLED_LISTENER.check());
+            assertFalse(POLL_SKIP_LISTENER.check());
+
+            // Poll skip expected.
+            updater.updateMetadata();
+            assertTrue(POLL_SKIP_LISTENER.check());
+        }
+    }
+
+    /** @return Started streamer configured for the test topics and logging through {@link #listeningLog}. */
+    private IgniteToKafkaCdcStreamer igniteToKafkaCdcStreamer() {
+        IgniteToKafkaCdcStreamer streamer = new IgniteToKafkaCdcStreamer()
+            .setTopic(SRC_DEST_TOPIC)
+            .setMetadataTopic(SRC_DEST_META_TOPIC)
+            .setKafkaPartitions(DFLT_PARTS)
+            .setKafkaProperties(kafkaProperties(kafka))
+            .setCaches(Collections.singleton(ACTIVE_PASSIVE_CACHE))
+            .setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
+
+        GridTestUtils.setFieldValue(streamer, "log", listeningLog.getLogger(IgniteToKafkaCdcStreamer.class));
+
+        streamer.start(new MetricRegistry("test", null, null, log));
+
+        return streamer;
+    }
+
+    /** @return Metadata updater whose binary context always returns {@code true} from {@code registerUserClassName}. */
+    private KafkaToIgniteMetadataUpdater metadataUpdater() {
+        BinaryContext noOpCtx = new BinaryContext(BinaryNoopMetadataHandler.instance(), new IgniteConfiguration(), log) {
+            @Override public boolean registerUserClassName(int typeId, String clsName, boolean failIfUnregistered,
+                boolean onlyLocReg, byte platformId) {
+                return true;
+            }
+        };
+
+        return new KafkaToIgniteMetadataUpdater(noOpCtx, listeningLog, kafkaProperties(kafka), streamerConfiguration());
+    }
+
+    /** @return Streamer configuration for the test topics with the default Kafka request timeout. */
+    private KafkaToIgniteCdcStreamerConfiguration streamerConfiguration() {
+        KafkaToIgniteCdcStreamerConfiguration cfg = new KafkaToIgniteCdcStreamerConfiguration();
+        cfg.setTopic(SRC_DEST_TOPIC);
+        cfg.setMetadataTopic(SRC_DEST_META_TOPIC);
+        cfg.setKafkaPartsFrom(0);
+        cfg.setKafkaPartsTo(DFLT_PARTS);
+        cfg.setKafkaRequestTimeout(DFLT_KAFKA_REQ_TIMEOUT);
+
+        return cfg;
+    }
+}