Posted to commits@ignite.apache.org by ni...@apache.org on 2022/05/17 05:03:46 UTC

[ignite-extensions] 01/01: IGNITE-16871 Code review fixes.

This is an automated email from the ASF dual-hosted git repository.

nizhikov pushed a commit to branch IGNITE-16757
in repository https://gitbox.apache.org/repos/asf/ignite-extensions.git

commit e487a1c8d1b828ca6b97bb0dc45969c28d39e9ea
Author: Nikolay Izhikov <ni...@apache.org>
AuthorDate: Tue May 17 08:03:37 2022 +0300

    IGNITE-16871 Code review fixes.
---
 .../ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java | 69 ++++++++++++----------
 .../cdc/kafka/KafkaToIgniteCdcStreamerApplier.java |  3 +-
 2 files changed, 39 insertions(+), 33 deletions(-)

diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
index fee991b..d821e2e 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java
@@ -184,61 +184,66 @@ public class IgniteToKafkaCdcStreamer implements CdcConsumer {
             return true;
         });
 
-        while (filtered.hasNext()) {
-            sendLimited(
-                filtered,
-                evt -> new ProducerRecord<>(
-                    evtTopic,
-                    evt.partition() % kafkaParts,
-                    evt.cacheId(),
-                    IgniteUtils.toBytes(evt)
-                ),
-                evtsCnt
-            );
-        }
+        sendAll(
+            filtered,
+            evt -> new ProducerRecord<>(
+                evtTopic,
+                evt.partition() % kafkaParts,
+                evt.cacheId(),
+                IgniteUtils.toBytes(evt)
+            ),
+            evtsCnt
+        );
 
         return true;
     }
 
     /** {@inheritDoc} */
     @Override public void onTypes(Iterator<BinaryType> types) {
-        while (types.hasNext()) {
-            sendLimited(
-                types,
-                t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
-                typesCnt
-            );
-        }
+        sendAll(
+            types,
+            t -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(((BinaryTypeImpl)t).metadata())),
+            typesCnt
+        );
 
         sendMetaUpdatedMarkers();
     }
 
     /** {@inheritDoc} */
     @Override public void onMappings(Iterator<TypeMapping> mappings) {
-        while (mappings.hasNext()) {
-            sendLimited(
-                mappings,
-                m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
-                mappingsCnt
-            );
-        }
+        sendAll(
+            mappings,
+            m -> new ProducerRecord<>(metadataTopic, IgniteUtils.toBytes(m)),
+            mappingsCnt
+        );
 
         sendMetaUpdatedMarkers();
     }
 
     /** Send marker(meta need to be updated) record to each partition of events topic. */
     private void sendMetaUpdatedMarkers() {
-        Iterator<Integer> parts = IntStream.range(0, kafkaParts).iterator();
-
-        while (parts.hasNext())
-            sendLimited(parts, p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER), evtsCnt);
+        sendAll(
+            IntStream.range(0, kafkaParts).iterator(),
+            p -> new ProducerRecord<>(evtTopic, p, null, META_UPDATE_MARKER),
+            evtsCnt
+        );
 
         if (log.isDebugEnabled())
             log.debug("Meta update markers sent.");
     }
 
-    /** Send limited amount of data to Kafka. */
-    private <T> void sendLimited(
+    /** Send all data to Kafka. */
+    private <T> void sendAll(
+        Iterator<T> data,
+        Function<T, ProducerRecord<Integer, byte[]>> toRec,
+        AtomicLongMetric cntr
+    ) {
+        while (data.hasNext())
+            sendOneBatch(data, toRec, cntr);
+    }
+
+    /** Send one batch. */
+    private <T> void sendOneBatch(
         Iterator<T> data,
         Function<T, ProducerRecord<Integer, byte[]>> toRec,
         AtomicLongMetric cntr
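
Note on the change above: the external while (iter.hasNext()) loops are dropped, so callers now invoke sendAll once; sendAll loops internally and hands the iterator to the renamed sendOneBatch until it is drained. The body of sendOneBatch lies outside this hunk, so the following standalone sketch only illustrates the pattern. The class name, field names, batch limit, and flush-based completion are assumptions rather than code from the commit, and the AtomicLongMetric counter parameter is omitted to keep the example self-contained.

    import java.util.Iterator;
    import java.util.function.Function;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /**
     * Standalone sketch of the sendAll / sendOneBatch split.
     * Everything except the two method names is illustrative, not taken from the commit.
     */
    class BatchedKafkaSender {
        /** Kafka producer, configured elsewhere. */
        private final KafkaProducer<Integer, byte[]> producer;

        /** Assumed upper bound on records sent per batch. */
        private final int maxBatchSize;

        BatchedKafkaSender(KafkaProducer<Integer, byte[]> producer, int maxBatchSize) {
            this.producer = producer;
            this.maxBatchSize = maxBatchSize;
        }

        /** Drains the whole iterator, one bounded batch at a time. */
        <T> void sendAll(Iterator<T> data, Function<T, ProducerRecord<Integer, byte[]>> toRec) {
            while (data.hasNext())
                sendOneBatch(data, toRec);
        }

        /** Sends at most maxBatchSize records, then waits for the broker to acknowledge them. */
        private <T> void sendOneBatch(Iterator<T> data, Function<T, ProducerRecord<Integer, byte[]>> toRec) {
            int sent = 0;

            while (data.hasNext() && sent < maxBatchSize) {
                producer.send(toRec.apply(data.next()));

                sent++;
            }

            producer.flush(); // blocks until every record passed to send() above is acknowledged
        }
    }
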
diff --git a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
index 3ed2650..acb2aeb 100644
--- a/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
+++ b/modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/KafkaToIgniteCdcStreamerApplier.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -236,7 +237,7 @@ class KafkaToIgniteCdcStreamerApplier extends CdcEventsApplier implements Runnab
     private boolean filterAndPossiblyUpdateMetadata(ConsumerRecord<Integer, byte[]> rec) {
         byte[] val = rec.value();
 
-        if (val.length == META_UPDATE_MARKER.length && U.bytesEqual(val, 0, META_UPDATE_MARKER, 0, val.length)) {
+        if (rec.key() == null && Arrays.equals(val, META_UPDATE_MARKER)) {
             metaUpdr.updateMetadata();
 
             return false;
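
The second hunk tightens marker detection on the applier side: a record now counts as a meta-update marker only when its key is null and its whole value equals META_UPDATE_MARKER, instead of the previous length check plus U.bytesEqual comparison. Markers are indeed produced with a null key (see sendMetaUpdatedMarkers above). Below is a minimal sketch of both sides of that contract; the marker bytes are a placeholder, not the real constant.

    import java.util.Arrays;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /** Sketch of how the meta-update marker is produced and detected after this change. */
    class MetaUpdateMarkerCheck {
        /** Placeholder marker payload; the real constant is defined in the CDC streamer classes. */
        static final byte[] META_UPDATE_MARKER = new byte[] {0};

        /** Producer side: the marker is sent to a partition of the events topic with a null key. */
        static ProducerRecord<Integer, byte[]> markerRecord(String evtTopic, int part) {
            return new ProducerRecord<>(evtTopic, part, null, META_UPDATE_MARKER);
        }

        /** Consumer side: a record is a marker iff its key is null and its value equals the marker bytes. */
        static boolean isMetaUpdateMarker(ConsumerRecord<Integer, byte[]> rec) {
            return rec.key() == null && Arrays.equals(rec.value(), META_UPDATE_MARKER);
        }
    }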